Fix premature close with chunked transfer encoding and for async iterators in Node 12 (#1064)
Co-authored-by: Irakli Gozalishvili <contact@gozala.io>
This commit is contained in:
parent
6ee9d3186f
commit
8eeeec18c1
49
README.md
49
README.md
|
@@ -285,6 +285,55 @@ if (!response.ok) throw new Error(`unexpected response ${response.statusText}`);
|
||||||
await streamPipeline(response.body, createWriteStream('./octocat.png'));
|
await streamPipeline(response.body, createWriteStream('./octocat.png'));
|
||||||
```
|
```
|
||||||
|
|
||||||
|
In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch
|
||||||
|
errors -- the longer a response runs, the more likely it is to encounter an error.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const fetch = require('node-fetch');
|
||||||
|
|
||||||
|
const response = await fetch('https://httpbin.org/stream/3');
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const chunk of response.body) {
|
||||||
|
console.dir(JSON.parse(chunk.toString()));
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error(err.stack);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams
|
||||||
|
did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors
|
||||||
|
directly from the stream and wait on the response to fully close.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const fetch = require('node-fetch');
|
||||||
|
|
||||||
|
const read = async body => {
|
||||||
|
let error;
|
||||||
|
body.on('error', err => {
|
||||||
|
error = err;
|
||||||
|
});
|
||||||
|
|
||||||
|
for await (const chunk of body) {
|
||||||
|
console.dir(JSON.parse(chunk.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
body.on('close', () => {
|
||||||
|
error ? reject(error) : resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fetch('https://httpbin.org/stream/3');
|
||||||
|
await read(response.body);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(err.stack);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### Buffer
|
### Buffer
|
||||||
|
|
||||||
If you prefer to cache binary data in full, use buffer(). (NOTE: buffer() is a `node-fetch` only API)
|
If you prefer to cache binary data in full, use buffer(). (NOTE: buffer() is a `node-fetch` only API)
|
||||||
|
|
52
src/index.js
52
src/index.js
|
@@ -95,6 +95,30 @@ export default async function fetch(url, options_) {
|
||||||
finalize();
|
finalize();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
fixResponseChunkedTransferBadEnding(request_, err => {
|
||||||
|
response.body.destroy(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
/* c8 ignore next 18 */
|
||||||
|
if (process.version < 'v14') {
|
||||||
|
// Before Node.js 14, pipeline() does not fully support async iterators and does not always
|
||||||
|
// properly handle when the socket close/end events are out of order.
|
||||||
|
request_.on('socket', s => {
|
||||||
|
let endedWithEventsCount;
|
||||||
|
s.prependListener('end', () => {
|
||||||
|
endedWithEventsCount = s._eventsCount;
|
||||||
|
});
|
||||||
|
s.prependListener('close', hadError => {
|
||||||
|
// if end happened before close but the socket didn't emit an error, do it now
|
||||||
|
if (response && endedWithEventsCount < s._eventsCount && !hadError) {
|
||||||
|
const err = new Error('Premature close');
|
||||||
|
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
|
||||||
|
response.body.emit('error', err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
request_.on('response', response_ => {
|
request_.on('response', response_ => {
|
||||||
request_.setTimeout(0);
|
request_.setTimeout(0);
|
||||||
const headers = fromRawHeaders(response_.rawHeaders);
|
const headers = fromRawHeaders(response_.rawHeaders);
|
||||||
|
@@ -265,3 +289,31 @@ export default async function fetch(url, options_) {
|
||||||
writeToStream(request_, request);
|
writeToStream(request_, request);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function fixResponseChunkedTransferBadEnding(request, errorCallback) {
|
||||||
|
const LAST_CHUNK = Buffer.from('0\r\n');
|
||||||
|
let socket;
|
||||||
|
|
||||||
|
request.on('socket', s => {
|
||||||
|
socket = s;
|
||||||
|
});
|
||||||
|
|
||||||
|
request.on('response', response => {
|
||||||
|
const {headers} = response;
|
||||||
|
if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) {
|
||||||
|
let properLastChunkReceived = false;
|
||||||
|
|
||||||
|
socket.on('data', buf => {
|
||||||
|
properLastChunkReceived = Buffer.compare(buf.slice(-3), LAST_CHUNK) === 0;
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.prependListener('close', () => {
|
||||||
|
if (!properLastChunkReceived) {
|
||||||
|
const err = new Error('Premature close');
|
||||||
|
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
|
||||||
|
errorCallback(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
68
test/main.js
68
test/main.js
|
@@ -613,6 +613,74 @@ describe('node-fetch', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should handle network-error in chunked response', () => {
|
||||||
|
const url = `${base}error/premature/chunked`;
|
||||||
|
return fetch(url).then(res => {
|
||||||
|
expect(res.status).to.equal(200);
|
||||||
|
expect(res.ok).to.be.true;
|
||||||
|
|
||||||
|
return expect(new Promise((resolve, reject) => {
|
||||||
|
res.body.on('error', reject);
|
||||||
|
res.body.on('close', resolve);
|
||||||
|
})).to.eventually.be.rejectedWith(Error, 'Premature close')
|
||||||
|
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle network-error in chunked response async iterator', () => {
|
||||||
|
const url = `${base}error/premature/chunked`;
|
||||||
|
return fetch(url).then(res => {
|
||||||
|
expect(res.status).to.equal(200);
|
||||||
|
expect(res.ok).to.be.true;
|
||||||
|
|
||||||
|
const read = async body => {
|
||||||
|
const chunks = [];
|
||||||
|
|
||||||
|
if (process.version < 'v14') {
|
||||||
|
// In Node.js 12, some errors don't come out in the async iterator; we have to pick
|
||||||
|
// them up from the event-emitter and then throw them after the async iterator
|
||||||
|
let error;
|
||||||
|
body.on('error', err => {
|
||||||
|
error = err;
|
||||||
|
});
|
||||||
|
|
||||||
|
for await (const chunk of body) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (error) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Promise(resolve => {
|
||||||
|
body.on('close', () => resolve(chunks));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for await (const chunk of body) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
return chunks;
|
||||||
|
};
|
||||||
|
|
||||||
|
return expect(read(res.body))
|
||||||
|
.to.eventually.be.rejectedWith(Error, 'Premature close')
|
||||||
|
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle network-error in chunked response in consumeBody', () => {
|
||||||
|
const url = `${base}error/premature/chunked`;
|
||||||
|
return fetch(url).then(res => {
|
||||||
|
expect(res.status).to.equal(200);
|
||||||
|
expect(res.ok).to.be.true;
|
||||||
|
|
||||||
|
return expect(res.text())
|
||||||
|
.to.eventually.be.rejectedWith(Error, 'Premature close');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should handle DNS-error response', () => {
|
it('should handle DNS-error response', () => {
|
||||||
const url = 'http://domain.invalid';
|
const url = 'http://domain.invalid';
|
||||||
return expect(fetch(url)).to.eventually.be.rejected
|
return expect(fetch(url)).to.eventually.be.rejected
|
||||||
|
|
|
@@ -323,6 +323,23 @@ export default class TestServer {
|
||||||
}, 100);
|
}, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (p === '/error/premature/chunked') {
|
||||||
|
res.writeHead(200, {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'Transfer-Encoding': 'chunked'
|
||||||
|
});
|
||||||
|
|
||||||
|
res.write(`${JSON.stringify({data: 'hi'})}\n`);
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
res.write(`${JSON.stringify({data: 'bye'})}\n`);
|
||||||
|
}, 200);
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
res.destroy();
|
||||||
|
}, 400);
|
||||||
|
}
|
||||||
|
|
||||||
if (p === '/error/json') {
|
if (p === '/error/json') {
|
||||||
res.statusCode = 200;
|
res.statusCode = 200;
|
||||||
res.setHeader('Content-Type', 'application/json');
|
res.setHeader('Content-Type', 'application/json');
|
||||||
|
|
Loading…
Reference in New Issue