From ecd3d52c55b26d2d350e2c9e0347675a02b3d2cc Mon Sep 17 00:00:00 2001 From: Joseph Nields Date: Mon, 12 Nov 2018 20:40:11 -0800 Subject: [PATCH] Add support for AbortSignal to cancel requests (#539) Thx @jnields @FrogTheFrog @TimothyGu for their work! --- ERROR-HANDLING.md | 12 +- README.md | 48 +++++++- package.json | 2 + src/abort-error.js | 25 ++++ src/body.js | 16 ++- src/index.js | 47 +++++++- src/request.js | 40 ++++++- test/server.js | 14 +++ test/test.js | 284 ++++++++++++++++++++++++++++++++++++++++++++- 9 files changed, 469 insertions(+), 19 deletions(-) create mode 100644 src/abort-error.js diff --git a/ERROR-HANDLING.md b/ERROR-HANDLING.md index 7ff8f54..89d5691 100644 --- a/ERROR-HANDLING.md +++ b/ERROR-HANDLING.md @@ -6,7 +6,17 @@ Because `window.fetch` isn't designed to be transparent about the cause of reque The basics: -- All [operational errors][joyent-guide] are rejected with a [FetchError](https://github.com/bitinn/node-fetch/blob/master/README.md#class-fetcherror). You can handle them all through the promise `catch` clause. +- A cancelled request is rejected with an [`AbortError`](https://github.com/bitinn/node-fetch/blob/master/README.md#class-aborterror). You can check if the reason for rejection was that the request was aborted by checking the `Error`'s `name` is `AbortError`. + +```js +fetch(url, { signal }).catch(err => { + if (err.name === 'AbortError') { + // request was aborted + } +}) +``` + +- All [operational errors][joyent-guide] *other than aborted requests* are rejected with a [FetchError](https://github.com/bitinn/node-fetch/blob/master/README.md#class-fetcherror). You can handle them all through the promise `catch` clause. - All errors come with an `err.message` detailing the cause of errors. diff --git a/README.md b/README.md index 3b20230..15fd3f7 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ fetch('https://httpbin.org/post', { method: 'POST', body: 'a=1' }) ```js const body = { a: 1 }; -fetch('https://httpbin.org/post', { +fetch('https://httpbin.org/post', { method: 'post', body: JSON.stringify(body), headers: { 'Content-Type': 'application/json' }, @@ -275,16 +275,51 @@ The default values are shown after each option key. headers: {}, // request headers. format is the identical to that accepted by the Headers constructor (see below) body: null, // request body. can be null, a string, a Buffer, a Blob, or a Node.js Readable stream redirect: 'follow', // set to `manual` to extract redirect headers, `error` to reject redirect + signal: null, // pass an instance of AbortSignal to optionally abort requests // The following properties are node-fetch extensions follow: 20, // maximum redirect count. 0 to not follow redirect - timeout: 0, // req/res timeout in ms, it resets on redirect. 0 to disable (OS limit applies) + timeout: 0, // req/res timeout in ms, it resets on redirect. 0 to disable (OS limit applies). Signal is recommended instead. compress: true, // support gzip/deflate content encoding. false to disable size: 0, // maximum response body size in bytes. 0 to disable agent: null // http(s).Agent instance, allows custom proxy, certificate, dns lookup etc. } ``` +#### Request cancellation with AbortController: + +> NOTE: You may only cancel streamed requests on Node >= v8.0.0 + +You may cancel requests with `AbortController`. A suggested implementation is [`abort-controller`](https://www.npmjs.com/package/abort-controller). + +An example of timing out a request after 150ms could be achieved as follows: + +```js +import AbortContoller from 'abort-controller'; + +const controller = new AbortController(); +const timeout = setTimeout( + () => { controller.abort(); }, + 150, +); + +fetch(url, { signal: controller.signal }) + .then(res => res.json()) + .then( + data => { + useData(data) + }, + err => { + if (err.name === 'AbortError') { + // request was aborted + } + }, + ) + .finally(() => { + clearTimeout(timeout); + }); +``` + ##### Default Headers If no values are set, the following request headers will be sent automatically: @@ -463,6 +498,13 @@ Identical to `body.text()`, except instead of always converting to UTF-8, encodi An operational error in the fetching process. See [ERROR-HANDLING.md][] for more info. + +### Class: AbortError + +*(node-fetch extension)* + +An Error thrown when the request is aborted in response to an `AbortSignal`'s `abort` event. It has a `name` property of `AbortError`. See [ERROR-HANDLING.MD][] for more info. + ## Acknowledgement Thanks to [github/fetch](https://github.com/github/fetch) for providing a solid implementation reference. @@ -487,4 +529,4 @@ MIT [mdn-headers]: https://developer.mozilla.org/en-US/docs/Web/API/Headers [LIMITS.md]: https://github.com/bitinn/node-fetch/blob/master/LIMITS.md [ERROR-HANDLING.md]: https://github.com/bitinn/node-fetch/blob/master/ERROR-HANDLING.md -[UPGRADE-GUIDE.md]: https://github.com/bitinn/node-fetch/blob/master/UPGRADE-GUIDE.md \ No newline at end of file +[UPGRADE-GUIDE.md]: https://github.com/bitinn/node-fetch/blob/master/UPGRADE-GUIDE.md diff --git a/package.json b/package.json index 577d467..a491f2a 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,8 @@ }, "homepage": "https://github.com/bitinn/node-fetch", "devDependencies": { + "abort-controller": "^1.0.2", + "abortcontroller-polyfill": "^1.1.9", "babel-core": "^6.26.0", "babel-plugin-istanbul": "^4.1.5", "babel-preset-env": "^1.6.1", diff --git a/src/abort-error.js b/src/abort-error.js new file mode 100644 index 0000000..cbb13ca --- /dev/null +++ b/src/abort-error.js @@ -0,0 +1,25 @@ +/** + * abort-error.js + * + * AbortError interface for cancelled requests + */ + +/** + * Create AbortError instance + * + * @param String message Error message for human + * @return AbortError + */ +export default function AbortError(message) { + Error.call(this, message); + + this.type = 'aborted'; + this.message = message; + + // hide custom error implementation details from end-users + Error.captureStackTrace(this, this.constructor); +} + +AbortError.prototype = Object.create(Error.prototype); +AbortError.prototype.constructor = AbortError; +AbortError.prototype.name = 'AbortError'; diff --git a/src/body.js b/src/body.js index d39149f..6efe52d 100644 --- a/src/body.js +++ b/src/body.js @@ -63,7 +63,10 @@ export default function Body(body, { if (body instanceof Stream) { body.on('error', err => { - this[INTERNALS].error = new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err); + const error = err.name === 'AbortError' + ? err + : new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err); + this[INTERNALS].error = error; }); } } @@ -240,9 +243,16 @@ function consumeBody() { }, this.timeout); } - // handle stream error, such as incorrect content-encoding + // handle stream errors this.body.on('error', err => { - reject(new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err)); + if (err.name === 'AbortError') { + // if the request was aborted, reject with this Error + abort = true; + reject(err); + } else { + // other errors, such as incorrect content-encoding + reject(new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err)); + } }); this.body.on('data', chunk => { diff --git a/src/index.js b/src/index.js index dca44aa..3c25c75 100644 --- a/src/index.js +++ b/src/index.js @@ -18,6 +18,7 @@ import Response from './response'; import Headers, { createHeadersLenient } from './headers'; import Request, { getNodeRequestOptions } from './request'; import FetchError from './fetch-error'; +import AbortError from './abort-error'; // fix an issue where "PassThrough", "resolve" aren't a named export for node <10 const PassThrough = Stream.PassThrough; @@ -46,13 +47,40 @@ export default function fetch(url, opts) { const options = getNodeRequestOptions(request); const send = (options.protocol === 'https:' ? https : http).request; + const { signal } = request; + let response = null; + + const abort = () => { + let error = new AbortError('The user aborted a request.'); + reject(error); + if (request.body && request.body instanceof Stream.Readable) { + request.body.destroy(error); + } + if (!response || !response.body) return; + response.body.emit('error', error); + } + + if (signal && signal.aborted) { + abort(); + return; + } + + const abortAndFinalize = () => { + abort(); + finalize(); + } // send request const req = send(options); let reqTimeout; + if (signal) { + signal.addEventListener('abort', abortAndFinalize); + } + function finalize() { req.abort(); + if (signal) signal.removeEventListener('abort', abortAndFinalize); clearTimeout(reqTimeout); } @@ -117,7 +145,8 @@ export default function fetch(url, opts) { agent: request.agent, compress: request.compress, method: request.method, - body: request.body + body: request.body, + signal: request.signal, }; // HTTP-redirect fetch step 9 @@ -142,7 +171,11 @@ export default function fetch(url, opts) { } // prepare response + res.once('end', () => { + if (signal) signal.removeEventListener('abort', abortAndFinalize); + }); let body = res.pipe(new PassThrough()); + const response_options = { url: request.url, status: res.statusCode, @@ -164,7 +197,8 @@ export default function fetch(url, opts) { // 4. no content response (204) // 5. content not modified response (304) if (!request.compress || request.method === 'HEAD' || codings === null || res.statusCode === 204 || res.statusCode === 304) { - resolve(new Response(body, response_options)); + response = new Response(body, response_options); + resolve(response); return; } @@ -181,7 +215,8 @@ export default function fetch(url, opts) { // for gzip if (codings == 'gzip' || codings == 'x-gzip') { body = body.pipe(zlib.createGunzip(zlibOptions)); - resolve(new Response(body, response_options)); + response = new Response(body, response_options); + resolve(response); return; } @@ -197,13 +232,15 @@ export default function fetch(url, opts) { } else { body = body.pipe(zlib.createInflateRaw()); } - resolve(new Response(body, response_options)); + response = new Response(body, response_options); + resolve(response); }); return; } // otherwise, use response as-is - resolve(new Response(body, response_options)); + response = new Response(body, response_options); + resolve(response); }); writeToStream(req, request); diff --git a/src/request.js b/src/request.js index c4d2959..5023aa5 100644 --- a/src/request.js +++ b/src/request.js @@ -8,7 +8,7 @@ */ import Url from 'url'; - +import Stream from 'stream'; import Headers, { exportNodeCompatibleHeaders } from './headers.js'; import Body, { clone, extractContentType, getTotalBytes } from './body'; @@ -18,6 +18,8 @@ const INTERNALS = Symbol('Request internals'); const parse_url = Url.parse; const format_url = Url.format; +const streamDestructionSupported = 'destroy' in Stream.Readable.prototype; + /** * Check if a value is an instance of Request. * @@ -31,6 +33,15 @@ function isRequest(input) { ); } +function isAbortSignal(signal) { + const proto = ( + signal + && typeof signal === 'object' + && Object.getPrototypeOf(signal) + ); + return !!(proto && proto.constructor.name === 'AbortSignal'); +} + /** * Request class * @@ -86,11 +97,21 @@ export default class Request { } } + let signal = isRequest(input) + ? input.signal + : null; + if ('signal' in init) signal = init.signal + + if (signal != null && !isAbortSignal(signal)) { + throw new TypeError('Expected signal to be an instanceof AbortSignal'); + } + this[INTERNALS] = { method, redirect: init.redirect || input.redirect || 'follow', headers, - parsedURL + parsedURL, + signal, }; // node-fetch-only options @@ -120,6 +141,10 @@ export default class Request { return this[INTERNALS].redirect; } + get signal() { + return this[INTERNALS].signal; + } + /** * Clone this request * @@ -144,7 +169,8 @@ Object.defineProperties(Request.prototype, { url: { enumerable: true }, headers: { enumerable: true }, redirect: { enumerable: true }, - clone: { enumerable: true } + clone: { enumerable: true }, + signal: { enumerable: true }, }); /** @@ -171,6 +197,14 @@ export function getNodeRequestOptions(request) { throw new TypeError('Only HTTP(S) protocols are supported'); } + if ( + request.signal + && request.body instanceof Stream.Readable + && !streamDestructionSupported + ) { + throw new Error('Cancellation of streamed requests with AbortSignal is not supported in node < 8'); + } + // HTTP-network-or-cache fetch steps 2.4-2.7 let contentLengthValue = null; if (request.body == null && /^(POST|PUT)$/i.test(request.method)) { diff --git a/test/server.js b/test/server.js index 6641b16..4028f0c 100644 --- a/test/server.js +++ b/test/server.js @@ -269,6 +269,20 @@ export default class TestServer { res.end(); } + if (p === '/redirect/slow') { + res.statusCode = 301; + res.setHeader('Location', '/redirect/301'); + setTimeout(function() { + res.end(); + }, 1000); + } + + if (p === '/redirect/slow-stream') { + res.statusCode = 301; + res.setHeader('Location', '/slow'); + res.end(); + } + if (p === '/error/400') { res.statusCode = 400; res.setHeader('Content-Type', 'text/plain'); diff --git a/test/test.js b/test/test.js index ef57940..e9b5db2 100644 --- a/test/test.js +++ b/test/test.js @@ -10,6 +10,8 @@ import FormData from 'form-data'; import stringToArrayBuffer from 'string-to-arraybuffer'; import URLSearchParams_Polyfill from 'url-search-params'; import { URL } from 'whatwg-url'; +import { AbortController } from 'abortcontroller-polyfill/dist/abortcontroller'; +import AbortController2 from 'abort-controller'; const { spawn } = require('child_process'); const http = require('http'); @@ -53,6 +55,8 @@ const supportToString = ({ [Symbol.toStringTag]: 'z' }).toString() === '[object z]'; +const supportStreamDestroy = 'destroy' in stream.Readable.prototype; + const local = new TestServer(); const base = `http://${local.hostname}:${local.port}/`; @@ -793,6 +797,247 @@ describe('node-fetch', () => { }); }); + it('should support request cancellation with signal', function () { + this.timeout(500); + const controller = new AbortController(); + const controller2 = new AbortController2(); + + const fetches = [ + fetch(`${base}timeout`, { signal: controller.signal }), + fetch(`${base}timeout`, { signal: controller2.signal }), + fetch( + `${base}timeout`, + { + method: 'POST', + signal: controller.signal, + headers: { + 'Content-Type': 'application/json', + body: JSON.stringify({ hello: 'world' }) + } + } + ) + ]; + setTimeout(() => { + controller.abort(); + controller2.abort(); + }, 100); + + return Promise.all(fetches.map(fetched => expect(fetched) + .to.eventually.be.rejected + .and.be.an.instanceOf(Error) + .and.include({ + type: 'aborted', + name: 'AbortError', + }) + )); + }); + + it('should reject immediately if signal has already been aborted', function () { + const url = `${base}timeout`; + const controller = new AbortController(); + const opts = { + signal: controller.signal + }; + controller.abort(); + const fetched = fetch(url, opts); + return expect(fetched).to.eventually.be.rejected + .and.be.an.instanceOf(Error) + .and.include({ + type: 'aborted', + name: 'AbortError', + }); + }); + + it('should clear internal timeout when request is cancelled with an AbortSignal', function(done) { + this.timeout(2000); + const script = ` + var AbortController = require('abortcontroller-polyfill/dist/cjs-ponyfill').AbortController; + var controller = new AbortController(); + require('./')( + '${base}timeout', + { signal: controller.signal, timeout: 10000 } + ); + setTimeout(function () { controller.abort(); }, 100); + ` + spawn('node', ['-e', script]) + .on('exit', () => { + done(); + }); + }); + + it('should remove internal AbortSignal event listener after request is aborted', function () { + const controller = new AbortController(); + const { signal } = controller; + const promise = fetch( + `${base}timeout`, + { signal } + ); + const result = expect(promise).to.eventually.be.rejected + .and.be.an.instanceof(Error) + .and.have.property('name', 'AbortError') + .then(() => { + expect(signal.listeners.abort.length).to.equal(0); + }); + controller.abort(); + return result; + }); + + it('should allow redirects to be aborted', function() { + const abortController = new AbortController(); + const request = new Request(`${base}redirect/slow`, { + signal: abortController.signal + }); + setTimeout(() => { + abortController.abort(); + }, 50); + return expect(fetch(request)).to.be.eventually.rejected + .and.be.an.instanceOf(Error) + .and.have.property('name', 'AbortError'); + }); + + it('should allow redirected response body to be aborted', function() { + const abortController = new AbortController(); + const request = new Request(`${base}redirect/slow-stream`, { + signal: abortController.signal + }); + return expect(fetch(request).then(res => { + expect(res.headers.get('content-type')).to.equal('text/plain'); + const result = res.text(); + abortController.abort(); + return result; + })).to.be.eventually.rejected + .and.be.an.instanceOf(Error) + .and.have.property('name', 'AbortError'); + }); + + it('should remove internal AbortSignal event listener after request and response complete without aborting', () => { + const controller = new AbortController(); + const { signal } = controller; + const fetchHtml = fetch(`${base}html`, { signal }) + .then(res => res.text()); + const fetchResponseError = fetch(`${base}error/reset`, { signal }); + const fetchRedirect = fetch(`${base}redirect/301`, { signal }).then(res => res.json()); + return Promise.all([ + expect(fetchHtml).to.eventually.be.fulfilled.and.equal(''), + expect(fetchResponseError).to.be.eventually.rejected, + expect(fetchRedirect).to.eventually.be.fulfilled, + ]).then(() => { + expect(signal.listeners.abort.length).to.equal(0) + }); + }); + + it('should reject response body with AbortError when aborted before stream has been read completely', () => { + const controller = new AbortController(); + return expect(fetch( + `${base}slow`, + { signal: controller.signal } + )) + .to.eventually.be.fulfilled + .then((res) => { + const promise = res.text(); + controller.abort(); + return expect(promise) + .to.eventually.be.rejected + .and.be.an.instanceof(Error) + .and.have.property('name', 'AbortError'); + }); + }); + + it('should reject response body methods immediately with AbortError when aborted before stream is disturbed', () => { + const controller = new AbortController(); + return expect(fetch( + `${base}slow`, + { signal: controller.signal } + )) + .to.eventually.be.fulfilled + .then((res) => { + controller.abort(); + return expect(res.text()) + .to.eventually.be.rejected + .and.be.an.instanceof(Error) + .and.have.property('name', 'AbortError'); + }); + }); + + it('should emit error event to response body with an AbortError when aborted before underlying stream is closed', (done) => { + const controller = new AbortController(); + expect(fetch( + `${base}slow`, + { signal: controller.signal } + )) + .to.eventually.be.fulfilled + .then((res) => { + res.body.on('error', (err) => { + expect(err) + .to.be.an.instanceof(Error) + .and.have.property('name', 'AbortError'); + done(); + }); + controller.abort(); + }); + }); + + (supportStreamDestroy ? it : it.skip)('should cancel request body of type Stream with AbortError when aborted', () => { + const controller = new AbortController(); + const body = new stream.Readable({ objectMode: true }); + body._read = () => {}; + const promise = fetch( + `${base}slow`, + { signal: controller.signal, body, method: 'POST' } + ); + + const result = Promise.all([ + new Promise((resolve, reject) => { + body.on('error', (error) => { + try { + expect(error).to.be.an.instanceof(Error).and.have.property('name', 'AbortError') + resolve(); + } catch (err) { + reject(err); + } + }); + }), + expect(promise).to.eventually.be.rejected + .and.be.an.instanceof(Error) + .and.have.property('name', 'AbortError') + ]); + + controller.abort(); + + return result; + }); + + (supportStreamDestroy ? it.skip : it)('should immediately reject when attempting to cancel streamed Requests in node < 8', () => { + const controller = new AbortController(); + const body = new stream.Readable({ objectMode: true }); + body._read = () => {}; + const promise = fetch( + `${base}slow`, + { signal: controller.signal, body, method: 'POST' } + ); + + return expect(promise).to.eventually.be.rejected + .and.be.an.instanceof(Error) + .and.have.property('message').includes('not supported'); + }); + + it('should throw a TypeError if a signal is not of type AbortSignal', () => { + return Promise.all([ + expect(fetch(`${base}inspect`, { signal: {} })) + .to.be.eventually.rejected + .and.be.an.instanceof(TypeError) + .and.have.property('message').includes('AbortSignal'), + expect(fetch(`${base}inspect`, { signal: '' })) + .to.be.eventually.rejected + .and.be.an.instanceof(TypeError) + .and.have.property('message').includes('AbortSignal'), + expect(fetch(`${base}inspect`, { signal: Object.create(null) })) + .to.be.eventually.rejected + .and.be.an.instanceof(TypeError) + .and.have.property('message').includes('AbortSignal'), + ]); + }); + it('should set default User-Agent', function () { const url = `${base}inspect`; return fetch(url).then(res => res.json()).then(res => { @@ -2016,12 +2261,12 @@ describe('Request', function () { } for (const toCheck of [ 'body', 'bodyUsed', 'arrayBuffer', 'blob', 'json', 'text', - 'method', 'url', 'headers', 'redirect', 'clone' + 'method', 'url', 'headers', 'redirect', 'clone', 'signal', ]) { expect(enumerableProperties).to.contain(toCheck); } for (const toCheck of [ - 'body', 'bodyUsed', 'method', 'url', 'headers', 'redirect' + 'body', 'bodyUsed', 'method', 'url', 'headers', 'redirect', 'signal', ]) { expect(() => { req[toCheck] = 'abc'; @@ -2034,11 +2279,13 @@ describe('Request', function () { const form = new FormData(); form.append('a', '1'); + const { signal } = new AbortController(); const r1 = new Request(url, { method: 'POST', follow: 1, - body: form + body: form, + signal, }); const r2 = new Request(r1, { follow: 2 @@ -2046,6 +2293,7 @@ describe('Request', function () { expect(r2.url).to.equal(url); expect(r2.method).to.equal('POST'); + expect(r2.signal).to.equal(signal); // note that we didn't clone the body expect(r2.body).to.equal(form); expect(r1.follow).to.equal(1); @@ -2054,6 +2302,31 @@ describe('Request', function () { expect(r2.counter).to.equal(0); }); + it('should override signal on derived Request instances', function() { + const parentAbortController = new AbortController(); + const derivedAbortController = new AbortController(); + const parentRequest = new Request(`test`, { + signal: parentAbortController.signal + }); + const derivedRequest = new Request(parentRequest, { + signal: derivedAbortController.signal + }); + expect(parentRequest.signal).to.equal(parentAbortController.signal); + expect(derivedRequest.signal).to.equal(derivedAbortController.signal); + }); + + it('should allow removing signal on derived Request instances', function() { + const parentAbortController = new AbortController(); + const parentRequest = new Request(`test`, { + signal: parentAbortController.signal + }); + const derivedRequest = new Request(parentRequest, { + signal: null + }); + expect(parentRequest.signal).to.equal(parentAbortController.signal); + expect(derivedRequest.signal).to.equal(null); + }); + it('should throw error with GET/HEAD requests with body', function() { expect(() => new Request('.', { body: '' })) .to.throw(TypeError); @@ -2161,6 +2434,7 @@ describe('Request', function () { let body = resumer().queue('a=1').end(); body = body.pipe(new stream.PassThrough()); const agent = new http.Agent(); + const { signal } = new AbortController(); const req = new Request(url, { body, method: 'POST', @@ -2170,7 +2444,8 @@ describe('Request', function () { }, follow: 3, compress: false, - agent + agent, + signal, }); const cl = req.clone(); expect(cl.url).to.equal(url); @@ -2182,6 +2457,7 @@ describe('Request', function () { expect(cl.method).to.equal('POST'); expect(cl.counter).to.equal(0); expect(cl.agent).to.equal(agent); + expect(cl.signal).to.equal(signal); // clone body shouldn't be the same body expect(cl.body).to.not.equal(body); return Promise.all([cl.text(), req.text()]).then(results => {