Add support for AbortSignal to cancel requests (#539)

Thx @jnields @FrogTheFrog @TimothyGu for their work!
This commit is contained in:
Joseph Nields 2018-11-12 20:40:11 -08:00 committed by David Frank
parent 1daae67e9e
commit ecd3d52c55
9 changed files with 469 additions and 19 deletions

View File

@ -6,7 +6,17 @@ Because `window.fetch` isn't designed to be transparent about the cause of reque
The basics:
- All [operational errors][joyent-guide] are rejected with a [FetchError](https://github.com/bitinn/node-fetch/blob/master/README.md#class-fetcherror). You can handle them all through the promise `catch` clause.
- A cancelled request is rejected with an [`AbortError`](https://github.com/bitinn/node-fetch/blob/master/README.md#class-aborterror). You can check if the reason for rejection was that the request was aborted by checking the `Error`'s `name` is `AbortError`.
```js
fetch(url, { signal }).catch(err => {
if (err.name === 'AbortError') {
// request was aborted
}
})
```
- All [operational errors][joyent-guide] *other than aborted requests* are rejected with a [FetchError](https://github.com/bitinn/node-fetch/blob/master/README.md#class-fetcherror). You can handle them all through the promise `catch` clause.
- All errors come with an `err.message` detailing the cause of errors.

View File

@ -118,7 +118,7 @@ fetch('https://httpbin.org/post', { method: 'POST', body: 'a=1' })
```js
const body = { a: 1 };
fetch('https://httpbin.org/post', {
fetch('https://httpbin.org/post', {
method: 'post',
body: JSON.stringify(body),
headers: { 'Content-Type': 'application/json' },
@ -275,16 +275,51 @@ The default values are shown after each option key.
headers: {}, // request headers. format is the identical to that accepted by the Headers constructor (see below)
body: null, // request body. can be null, a string, a Buffer, a Blob, or a Node.js Readable stream
redirect: 'follow', // set to `manual` to extract redirect headers, `error` to reject redirect
signal: null, // pass an instance of AbortSignal to optionally abort requests
// The following properties are node-fetch extensions
follow: 20, // maximum redirect count. 0 to not follow redirect
timeout: 0, // req/res timeout in ms, it resets on redirect. 0 to disable (OS limit applies)
timeout: 0, // req/res timeout in ms, it resets on redirect. 0 to disable (OS limit applies). Signal is recommended instead.
compress: true, // support gzip/deflate content encoding. false to disable
size: 0, // maximum response body size in bytes. 0 to disable
agent: null // http(s).Agent instance, allows custom proxy, certificate, dns lookup etc.
}
```
#### Request cancellation with AbortController:
> NOTE: You may only cancel streamed requests on Node >= v8.0.0
You may cancel requests with `AbortController`. A suggested implementation is [`abort-controller`](https://www.npmjs.com/package/abort-controller).
An example of timing out a request after 150ms could be achieved as follows:
```js
import AbortContoller from 'abort-controller';
const controller = new AbortController();
const timeout = setTimeout(
() => { controller.abort(); },
150,
);
fetch(url, { signal: controller.signal })
.then(res => res.json())
.then(
data => {
useData(data)
},
err => {
if (err.name === 'AbortError') {
// request was aborted
}
},
)
.finally(() => {
clearTimeout(timeout);
});
```
##### Default Headers
If no values are set, the following request headers will be sent automatically:
@ -463,6 +498,13 @@ Identical to `body.text()`, except instead of always converting to UTF-8, encodi
An operational error in the fetching process. See [ERROR-HANDLING.md][] for more info.
<a id="class-aborterror"></a>
### Class: AbortError
<small>*(node-fetch extension)*</small>
An Error thrown when the request is aborted in response to an `AbortSignal`'s `abort` event. It has a `name` property of `AbortError`. See [ERROR-HANDLING.MD][] for more info.
## Acknowledgement
Thanks to [github/fetch](https://github.com/github/fetch) for providing a solid implementation reference.
@ -487,4 +529,4 @@ MIT
[mdn-headers]: https://developer.mozilla.org/en-US/docs/Web/API/Headers
[LIMITS.md]: https://github.com/bitinn/node-fetch/blob/master/LIMITS.md
[ERROR-HANDLING.md]: https://github.com/bitinn/node-fetch/blob/master/ERROR-HANDLING.md
[UPGRADE-GUIDE.md]: https://github.com/bitinn/node-fetch/blob/master/UPGRADE-GUIDE.md
[UPGRADE-GUIDE.md]: https://github.com/bitinn/node-fetch/blob/master/UPGRADE-GUIDE.md

View File

@ -37,6 +37,8 @@
},
"homepage": "https://github.com/bitinn/node-fetch",
"devDependencies": {
"abort-controller": "^1.0.2",
"abortcontroller-polyfill": "^1.1.9",
"babel-core": "^6.26.0",
"babel-plugin-istanbul": "^4.1.5",
"babel-preset-env": "^1.6.1",

25
src/abort-error.js Normal file
View File

@ -0,0 +1,25 @@
/**
* abort-error.js
*
* AbortError interface for cancelled requests
*/
/**
* Create AbortError instance
*
* @param String message Error message for human
* @return AbortError
*/
export default function AbortError(message) {
Error.call(this, message);
this.type = 'aborted';
this.message = message;
// hide custom error implementation details from end-users
Error.captureStackTrace(this, this.constructor);
}
AbortError.prototype = Object.create(Error.prototype);
AbortError.prototype.constructor = AbortError;
AbortError.prototype.name = 'AbortError';

View File

@ -63,7 +63,10 @@ export default function Body(body, {
if (body instanceof Stream) {
body.on('error', err => {
this[INTERNALS].error = new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err);
const error = err.name === 'AbortError'
? err
: new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err);
this[INTERNALS].error = error;
});
}
}
@ -240,9 +243,16 @@ function consumeBody() {
}, this.timeout);
}
// handle stream error, such as incorrect content-encoding
// handle stream errors
this.body.on('error', err => {
reject(new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err));
if (err.name === 'AbortError') {
// if the request was aborted, reject with this Error
abort = true;
reject(err);
} else {
// other errors, such as incorrect content-encoding
reject(new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err));
}
});
this.body.on('data', chunk => {

View File

@ -18,6 +18,7 @@ import Response from './response';
import Headers, { createHeadersLenient } from './headers';
import Request, { getNodeRequestOptions } from './request';
import FetchError from './fetch-error';
import AbortError from './abort-error';
// fix an issue where "PassThrough", "resolve" aren't a named export for node <10
const PassThrough = Stream.PassThrough;
@ -46,13 +47,40 @@ export default function fetch(url, opts) {
const options = getNodeRequestOptions(request);
const send = (options.protocol === 'https:' ? https : http).request;
const { signal } = request;
let response = null;
const abort = () => {
let error = new AbortError('The user aborted a request.');
reject(error);
if (request.body && request.body instanceof Stream.Readable) {
request.body.destroy(error);
}
if (!response || !response.body) return;
response.body.emit('error', error);
}
if (signal && signal.aborted) {
abort();
return;
}
const abortAndFinalize = () => {
abort();
finalize();
}
// send request
const req = send(options);
let reqTimeout;
if (signal) {
signal.addEventListener('abort', abortAndFinalize);
}
function finalize() {
req.abort();
if (signal) signal.removeEventListener('abort', abortAndFinalize);
clearTimeout(reqTimeout);
}
@ -117,7 +145,8 @@ export default function fetch(url, opts) {
agent: request.agent,
compress: request.compress,
method: request.method,
body: request.body
body: request.body,
signal: request.signal,
};
// HTTP-redirect fetch step 9
@ -142,7 +171,11 @@ export default function fetch(url, opts) {
}
// prepare response
res.once('end', () => {
if (signal) signal.removeEventListener('abort', abortAndFinalize);
});
let body = res.pipe(new PassThrough());
const response_options = {
url: request.url,
status: res.statusCode,
@ -164,7 +197,8 @@ export default function fetch(url, opts) {
// 4. no content response (204)
// 5. content not modified response (304)
if (!request.compress || request.method === 'HEAD' || codings === null || res.statusCode === 204 || res.statusCode === 304) {
resolve(new Response(body, response_options));
response = new Response(body, response_options);
resolve(response);
return;
}
@ -181,7 +215,8 @@ export default function fetch(url, opts) {
// for gzip
if (codings == 'gzip' || codings == 'x-gzip') {
body = body.pipe(zlib.createGunzip(zlibOptions));
resolve(new Response(body, response_options));
response = new Response(body, response_options);
resolve(response);
return;
}
@ -197,13 +232,15 @@ export default function fetch(url, opts) {
} else {
body = body.pipe(zlib.createInflateRaw());
}
resolve(new Response(body, response_options));
response = new Response(body, response_options);
resolve(response);
});
return;
}
// otherwise, use response as-is
resolve(new Response(body, response_options));
response = new Response(body, response_options);
resolve(response);
});
writeToStream(req, request);

View File

@ -8,7 +8,7 @@
*/
import Url from 'url';
import Stream from 'stream';
import Headers, { exportNodeCompatibleHeaders } from './headers.js';
import Body, { clone, extractContentType, getTotalBytes } from './body';
@ -18,6 +18,8 @@ const INTERNALS = Symbol('Request internals');
const parse_url = Url.parse;
const format_url = Url.format;
const streamDestructionSupported = 'destroy' in Stream.Readable.prototype;
/**
* Check if a value is an instance of Request.
*
@ -31,6 +33,15 @@ function isRequest(input) {
);
}
function isAbortSignal(signal) {
const proto = (
signal
&& typeof signal === 'object'
&& Object.getPrototypeOf(signal)
);
return !!(proto && proto.constructor.name === 'AbortSignal');
}
/**
* Request class
*
@ -86,11 +97,21 @@ export default class Request {
}
}
let signal = isRequest(input)
? input.signal
: null;
if ('signal' in init) signal = init.signal
if (signal != null && !isAbortSignal(signal)) {
throw new TypeError('Expected signal to be an instanceof AbortSignal');
}
this[INTERNALS] = {
method,
redirect: init.redirect || input.redirect || 'follow',
headers,
parsedURL
parsedURL,
signal,
};
// node-fetch-only options
@ -120,6 +141,10 @@ export default class Request {
return this[INTERNALS].redirect;
}
get signal() {
return this[INTERNALS].signal;
}
/**
* Clone this request
*
@ -144,7 +169,8 @@ Object.defineProperties(Request.prototype, {
url: { enumerable: true },
headers: { enumerable: true },
redirect: { enumerable: true },
clone: { enumerable: true }
clone: { enumerable: true },
signal: { enumerable: true },
});
/**
@ -171,6 +197,14 @@ export function getNodeRequestOptions(request) {
throw new TypeError('Only HTTP(S) protocols are supported');
}
if (
request.signal
&& request.body instanceof Stream.Readable
&& !streamDestructionSupported
) {
throw new Error('Cancellation of streamed requests with AbortSignal is not supported in node < 8');
}
// HTTP-network-or-cache fetch steps 2.4-2.7
let contentLengthValue = null;
if (request.body == null && /^(POST|PUT)$/i.test(request.method)) {

View File

@ -269,6 +269,20 @@ export default class TestServer {
res.end();
}
if (p === '/redirect/slow') {
res.statusCode = 301;
res.setHeader('Location', '/redirect/301');
setTimeout(function() {
res.end();
}, 1000);
}
if (p === '/redirect/slow-stream') {
res.statusCode = 301;
res.setHeader('Location', '/slow');
res.end();
}
if (p === '/error/400') {
res.statusCode = 400;
res.setHeader('Content-Type', 'text/plain');

View File

@ -10,6 +10,8 @@ import FormData from 'form-data';
import stringToArrayBuffer from 'string-to-arraybuffer';
import URLSearchParams_Polyfill from 'url-search-params';
import { URL } from 'whatwg-url';
import { AbortController } from 'abortcontroller-polyfill/dist/abortcontroller';
import AbortController2 from 'abort-controller';
const { spawn } = require('child_process');
const http = require('http');
@ -53,6 +55,8 @@ const supportToString = ({
[Symbol.toStringTag]: 'z'
}).toString() === '[object z]';
const supportStreamDestroy = 'destroy' in stream.Readable.prototype;
const local = new TestServer();
const base = `http://${local.hostname}:${local.port}/`;
@ -793,6 +797,247 @@ describe('node-fetch', () => {
});
});
it('should support request cancellation with signal', function () {
this.timeout(500);
const controller = new AbortController();
const controller2 = new AbortController2();
const fetches = [
fetch(`${base}timeout`, { signal: controller.signal }),
fetch(`${base}timeout`, { signal: controller2.signal }),
fetch(
`${base}timeout`,
{
method: 'POST',
signal: controller.signal,
headers: {
'Content-Type': 'application/json',
body: JSON.stringify({ hello: 'world' })
}
}
)
];
setTimeout(() => {
controller.abort();
controller2.abort();
}, 100);
return Promise.all(fetches.map(fetched => expect(fetched)
.to.eventually.be.rejected
.and.be.an.instanceOf(Error)
.and.include({
type: 'aborted',
name: 'AbortError',
})
));
});
it('should reject immediately if signal has already been aborted', function () {
const url = `${base}timeout`;
const controller = new AbortController();
const opts = {
signal: controller.signal
};
controller.abort();
const fetched = fetch(url, opts);
return expect(fetched).to.eventually.be.rejected
.and.be.an.instanceOf(Error)
.and.include({
type: 'aborted',
name: 'AbortError',
});
});
it('should clear internal timeout when request is cancelled with an AbortSignal', function(done) {
this.timeout(2000);
const script = `
var AbortController = require('abortcontroller-polyfill/dist/cjs-ponyfill').AbortController;
var controller = new AbortController();
require('./')(
'${base}timeout',
{ signal: controller.signal, timeout: 10000 }
);
setTimeout(function () { controller.abort(); }, 100);
`
spawn('node', ['-e', script])
.on('exit', () => {
done();
});
});
it('should remove internal AbortSignal event listener after request is aborted', function () {
const controller = new AbortController();
const { signal } = controller;
const promise = fetch(
`${base}timeout`,
{ signal }
);
const result = expect(promise).to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError')
.then(() => {
expect(signal.listeners.abort.length).to.equal(0);
});
controller.abort();
return result;
});
it('should allow redirects to be aborted', function() {
const abortController = new AbortController();
const request = new Request(`${base}redirect/slow`, {
signal: abortController.signal
});
setTimeout(() => {
abortController.abort();
}, 50);
return expect(fetch(request)).to.be.eventually.rejected
.and.be.an.instanceOf(Error)
.and.have.property('name', 'AbortError');
});
it('should allow redirected response body to be aborted', function() {
const abortController = new AbortController();
const request = new Request(`${base}redirect/slow-stream`, {
signal: abortController.signal
});
return expect(fetch(request).then(res => {
expect(res.headers.get('content-type')).to.equal('text/plain');
const result = res.text();
abortController.abort();
return result;
})).to.be.eventually.rejected
.and.be.an.instanceOf(Error)
.and.have.property('name', 'AbortError');
});
it('should remove internal AbortSignal event listener after request and response complete without aborting', () => {
const controller = new AbortController();
const { signal } = controller;
const fetchHtml = fetch(`${base}html`, { signal })
.then(res => res.text());
const fetchResponseError = fetch(`${base}error/reset`, { signal });
const fetchRedirect = fetch(`${base}redirect/301`, { signal }).then(res => res.json());
return Promise.all([
expect(fetchHtml).to.eventually.be.fulfilled.and.equal('<html></html>'),
expect(fetchResponseError).to.be.eventually.rejected,
expect(fetchRedirect).to.eventually.be.fulfilled,
]).then(() => {
expect(signal.listeners.abort.length).to.equal(0)
});
});
it('should reject response body with AbortError when aborted before stream has been read completely', () => {
const controller = new AbortController();
return expect(fetch(
`${base}slow`,
{ signal: controller.signal }
))
.to.eventually.be.fulfilled
.then((res) => {
const promise = res.text();
controller.abort();
return expect(promise)
.to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError');
});
});
it('should reject response body methods immediately with AbortError when aborted before stream is disturbed', () => {
const controller = new AbortController();
return expect(fetch(
`${base}slow`,
{ signal: controller.signal }
))
.to.eventually.be.fulfilled
.then((res) => {
controller.abort();
return expect(res.text())
.to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError');
});
});
it('should emit error event to response body with an AbortError when aborted before underlying stream is closed', (done) => {
const controller = new AbortController();
expect(fetch(
`${base}slow`,
{ signal: controller.signal }
))
.to.eventually.be.fulfilled
.then((res) => {
res.body.on('error', (err) => {
expect(err)
.to.be.an.instanceof(Error)
.and.have.property('name', 'AbortError');
done();
});
controller.abort();
});
});
(supportStreamDestroy ? it : it.skip)('should cancel request body of type Stream with AbortError when aborted', () => {
const controller = new AbortController();
const body = new stream.Readable({ objectMode: true });
body._read = () => {};
const promise = fetch(
`${base}slow`,
{ signal: controller.signal, body, method: 'POST' }
);
const result = Promise.all([
new Promise((resolve, reject) => {
body.on('error', (error) => {
try {
expect(error).to.be.an.instanceof(Error).and.have.property('name', 'AbortError')
resolve();
} catch (err) {
reject(err);
}
});
}),
expect(promise).to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError')
]);
controller.abort();
return result;
});
(supportStreamDestroy ? it.skip : it)('should immediately reject when attempting to cancel streamed Requests in node < 8', () => {
const controller = new AbortController();
const body = new stream.Readable({ objectMode: true });
body._read = () => {};
const promise = fetch(
`${base}slow`,
{ signal: controller.signal, body, method: 'POST' }
);
return expect(promise).to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('message').includes('not supported');
});
it('should throw a TypeError if a signal is not of type AbortSignal', () => {
return Promise.all([
expect(fetch(`${base}inspect`, { signal: {} }))
.to.be.eventually.rejected
.and.be.an.instanceof(TypeError)
.and.have.property('message').includes('AbortSignal'),
expect(fetch(`${base}inspect`, { signal: '' }))
.to.be.eventually.rejected
.and.be.an.instanceof(TypeError)
.and.have.property('message').includes('AbortSignal'),
expect(fetch(`${base}inspect`, { signal: Object.create(null) }))
.to.be.eventually.rejected
.and.be.an.instanceof(TypeError)
.and.have.property('message').includes('AbortSignal'),
]);
});
it('should set default User-Agent', function () {
const url = `${base}inspect`;
return fetch(url).then(res => res.json()).then(res => {
@ -2016,12 +2261,12 @@ describe('Request', function () {
}
for (const toCheck of [
'body', 'bodyUsed', 'arrayBuffer', 'blob', 'json', 'text',
'method', 'url', 'headers', 'redirect', 'clone'
'method', 'url', 'headers', 'redirect', 'clone', 'signal',
]) {
expect(enumerableProperties).to.contain(toCheck);
}
for (const toCheck of [
'body', 'bodyUsed', 'method', 'url', 'headers', 'redirect'
'body', 'bodyUsed', 'method', 'url', 'headers', 'redirect', 'signal',
]) {
expect(() => {
req[toCheck] = 'abc';
@ -2034,11 +2279,13 @@ describe('Request', function () {
const form = new FormData();
form.append('a', '1');
const { signal } = new AbortController();
const r1 = new Request(url, {
method: 'POST',
follow: 1,
body: form
body: form,
signal,
});
const r2 = new Request(r1, {
follow: 2
@ -2046,6 +2293,7 @@ describe('Request', function () {
expect(r2.url).to.equal(url);
expect(r2.method).to.equal('POST');
expect(r2.signal).to.equal(signal);
// note that we didn't clone the body
expect(r2.body).to.equal(form);
expect(r1.follow).to.equal(1);
@ -2054,6 +2302,31 @@ describe('Request', function () {
expect(r2.counter).to.equal(0);
});
it('should override signal on derived Request instances', function() {
const parentAbortController = new AbortController();
const derivedAbortController = new AbortController();
const parentRequest = new Request(`test`, {
signal: parentAbortController.signal
});
const derivedRequest = new Request(parentRequest, {
signal: derivedAbortController.signal
});
expect(parentRequest.signal).to.equal(parentAbortController.signal);
expect(derivedRequest.signal).to.equal(derivedAbortController.signal);
});
it('should allow removing signal on derived Request instances', function() {
const parentAbortController = new AbortController();
const parentRequest = new Request(`test`, {
signal: parentAbortController.signal
});
const derivedRequest = new Request(parentRequest, {
signal: null
});
expect(parentRequest.signal).to.equal(parentAbortController.signal);
expect(derivedRequest.signal).to.equal(null);
});
it('should throw error with GET/HEAD requests with body', function() {
expect(() => new Request('.', { body: '' }))
.to.throw(TypeError);
@ -2161,6 +2434,7 @@ describe('Request', function () {
let body = resumer().queue('a=1').end();
body = body.pipe(new stream.PassThrough());
const agent = new http.Agent();
const { signal } = new AbortController();
const req = new Request(url, {
body,
method: 'POST',
@ -2170,7 +2444,8 @@ describe('Request', function () {
},
follow: 3,
compress: false,
agent
agent,
signal,
});
const cl = req.clone();
expect(cl.url).to.equal(url);
@ -2182,6 +2457,7 @@ describe('Request', function () {
expect(cl.method).to.equal('POST');
expect(cl.counter).to.equal(0);
expect(cl.agent).to.equal(agent);
expect(cl.signal).to.equal(signal);
// clone body shouldn't be the same body
expect(cl.body).to.not.equal(body);
return Promise.all([cl.text(), req.text()]).then(results => {