From 9bee03aaf2d8137a5e490150e759750ccdc65202 Mon Sep 17 00:00:00 2001 From: Brian White Date: Mon, 11 Jan 2016 23:41:01 -0500 Subject: [PATCH] http: allow async createConnection() This commit adds support for async createConnection() implementations and is still backwards compatible with synchronous createConnection() implementations. This commit also makes the http client more friendly with generic stream objects produced by createConnection() by checking stream.writable instead of stream.destroyed as the latter is currently a net.Socket-ism and not set by the core stream implementations. PR-URL: https://github.com/nodejs/node/pull/4638 Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- doc/api/http.markdown | 18 ++++ lib/_http_agent.js | 110 ++++++++++++-------- lib/_http_client.js | 41 +++++++- test/parallel/test-http-createConnection.js | 68 +++++++++--- 4 files changed, 173 insertions(+), 64 deletions(-) diff --git a/doc/api/http.markdown b/doc/api/http.markdown index 8dfde77524ffb7..4108080a50957a 100644 --- a/doc/api/http.markdown +++ b/doc/api/http.markdown @@ -117,6 +117,18 @@ options.agent = keepAliveAgent; http.request(options, onResponseCallback); ``` +### agent.createConnection(options[, callback]) + +Produces a socket/stream to be used for HTTP requests. + +By default, this function is the same as [`net.createConnection()`][]. However, +custom Agents may override this method in case greater flexibility is desired. + +A socket/stream can be supplied in one of two ways: by returning the +socket/stream from this function, or by passing the socket/stream to `callback`. + +`callback` has a signature of `(err, stream)`. + ### agent.destroy() Destroy any sockets that are currently in use by the agent. @@ -1117,6 +1129,10 @@ Options: - `Agent` object: explicitly use the passed in `Agent`. - `false`: opts out of connection pooling with an Agent, defaults request to `Connection: close`. +- `createConnection`: A function that produces a socket/stream to use for the + request when the `agent` option is not used. This can be used to avoid + creating a custom Agent class just to override the default `createConnection` + function. See [`agent.createConnection()`][] for more details. The optional `callback` parameter will be added as a one time listener for the `'response'` event. @@ -1192,6 +1208,7 @@ There are a few special headers that should be noted. [`'listening'`]: net.html#net_event_listening [`'response'`]: #http_event_response [`Agent`]: #http_class_http_agent +[`agent.createConnection`]: #http_agent_createconnection [`Buffer`]: buffer.html#buffer_buffer [`destroy()`]: #http_agent_destroy [`EventEmitter`]: events.html#events_class_events_eventemitter @@ -1203,6 +1220,7 @@ There are a few special headers that should be noted. [`http.Server`]: #http_class_http_server [`http.ServerResponse`]: #http_class_http_serverresponse [`message.headers`]: #http_message_headers +[`net.createConnection`]: net.html#net_net_createconnection_options_connectlistener [`net.Server`]: net.html#net_class_net_server [`net.Server.close()`]: net.html#net_server_close_callback [`net.Server.listen()`]: net.html#net_server_listen_handle_callback diff --git a/lib/_http_agent.js b/lib/_http_agent.js index ddb1c5bfff9b63..5828927786288f 100644 --- a/lib/_http_agent.js +++ b/lib/_http_agent.js @@ -44,7 +44,7 @@ function Agent(options) { var name = self.getName(options); debug('agent.on(free)', name); - if (!socket.destroyed && + if (socket.writable && self.requests[name] && self.requests[name].length) { self.requests[name].shift().onSocket(socket); if (self.requests[name].length === 0) { @@ -57,7 +57,7 @@ function Agent(options) { var req = socket._httpMessage; if (req && req.shouldKeepAlive && - !socket.destroyed && + socket.writable && self.keepAlive) { var freeSockets = self.freeSockets[name]; var freeLen = freeSockets ? freeSockets.length : 0; @@ -138,7 +138,15 @@ Agent.prototype.addRequest = function(req, options) { } else if (sockLen < this.maxSockets) { debug('call onSocket', sockLen, freeLen); // If we are under maxSockets create a new one. - req.onSocket(this.createSocket(req, options)); + this.createSocket(req, options, function(err, newSocket) { + if (err) { + process.nextTick(function() { + req.emit('error', err); + }); + return; + } + req.onSocket(newSocket); + }); } else { debug('wait for socket'); // We are over limit so we'll add it to the queue. @@ -149,18 +157,16 @@ Agent.prototype.addRequest = function(req, options) { } }; -Agent.prototype.createSocket = function(req, options) { +Agent.prototype.createSocket = function(req, options, cb) { var self = this; options = util._extend({}, options); options = util._extend(options, self.options); if (!options.servername) { options.servername = options.host; - if (req) { - var hostHeader = req.getHeader('host'); - if (hostHeader) { - options.servername = hostHeader.replace(/:.*$/, ''); - } + const hostHeader = req.getHeader('host'); + if (hostHeader) { + options.servername = hostHeader.replace(/:.*$/, ''); } } @@ -169,48 +175,58 @@ Agent.prototype.createSocket = function(req, options) { debug('createConnection', name, options); options.encoding = null; - var s = self.createConnection(options); - if (!self.sockets[name]) { - self.sockets[name] = []; - } - this.sockets[name].push(s); - debug('sockets', name, this.sockets[name].length); + var called = false; + const newSocket = self.createConnection(options, oncreate); + if (newSocket) + oncreate(null, newSocket); + function oncreate(err, s) { + if (called) + return; + called = true; + if (err) + return cb(err); + if (!self.sockets[name]) { + self.sockets[name] = []; + } + self.sockets[name].push(s); + debug('sockets', name, self.sockets[name].length); - function onFree() { - self.emit('free', s, options); - } - s.on('free', onFree); - - function onClose(err) { - debug('CLIENT socket onClose'); - // This is the only place where sockets get removed from the Agent. - // If you want to remove a socket from the pool, just close it. - // All socket errors end in a close event anyway. - self.removeSocket(s, options); - } - s.on('close', onClose); - - function onRemove() { - // We need this function for cases like HTTP 'upgrade' - // (defined by WebSockets) where we need to remove a socket from the - // pool because it'll be locked up indefinitely - debug('CLIENT socket onRemove'); - self.removeSocket(s, options); - s.removeListener('close', onClose); - s.removeListener('free', onFree); - s.removeListener('agentRemove', onRemove); + function onFree() { + self.emit('free', s, options); + } + s.on('free', onFree); + + function onClose(err) { + debug('CLIENT socket onClose'); + // This is the only place where sockets get removed from the Agent. + // If you want to remove a socket from the pool, just close it. + // All socket errors end in a close event anyway. + self.removeSocket(s, options); + } + s.on('close', onClose); + + function onRemove() { + // We need this function for cases like HTTP 'upgrade' + // (defined by WebSockets) where we need to remove a socket from the + // pool because it'll be locked up indefinitely + debug('CLIENT socket onRemove'); + self.removeSocket(s, options); + s.removeListener('close', onClose); + s.removeListener('free', onFree); + s.removeListener('agentRemove', onRemove); + } + s.on('agentRemove', onRemove); + cb(null, s); } - s.on('agentRemove', onRemove); - return s; }; Agent.prototype.removeSocket = function(s, options) { var name = this.getName(options); - debug('removeSocket', name, 'destroyed:', s.destroyed); + debug('removeSocket', name, 'writable:', s.writable); var sets = [this.sockets]; // If the socket was destroyed, remove it from the free buffers too. - if (s.destroyed) + if (!s.writable) sets.push(this.freeSockets); for (var sk = 0; sk < sets.length; sk++) { @@ -231,7 +247,15 @@ Agent.prototype.removeSocket = function(s, options) { debug('removeSocket, have a request, make a socket'); var req = this.requests[name][0]; // If we have pending requests and a socket gets closed make a new one - this.createSocket(req, options).emit('free'); + this.createSocket(req, options, function(err, newSocket) { + if (err) { + process.nextTick(function() { + req.emit('error', err); + }); + return; + } + newSocket.emit('free'); + }); } }; diff --git a/lib/_http_client.js b/lib/_http_client.js index 34072f8f996840..81fa5dabd4fac1 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -33,7 +33,7 @@ function ClientRequest(options, cb) { if (agent === false) { agent = new defaultAgent.constructor(); } else if ((agent === null || agent === undefined) && - !options.createConnection) { + typeof options.createConnection !== 'function') { agent = defaultAgent; } self.agent = agent; @@ -118,10 +118,20 @@ function ClientRequest(options, cb) { self._renderHeaders()); } + var called = false; if (self.socketPath) { self._last = true; self.shouldKeepAlive = false; - self.onSocket(self.agent.createConnection({ path: self.socketPath })); + const optionsPath = { + path: self.socketPath + }; + const newSocket = self.agent.createConnection(optionsPath, oncreate); + if (newSocket && !called) { + called = true; + self.onSocket(newSocket); + } else { + return; + } } else if (self.agent) { // If there is an agent we should default to Connection:keep-alive, // but only if the Agent will actually reuse the connection! @@ -139,14 +149,37 @@ function ClientRequest(options, cb) { // No agent, default to Connection:close. self._last = true; self.shouldKeepAlive = false; - if (options.createConnection) { - self.onSocket(options.createConnection(options)); + if (typeof options.createConnection === 'function') { + const newSocket = options.createConnection(options, oncreate); + if (newSocket && !called) { + called = true; + self.onSocket(newSocket); + } else { + return; + } } else { debug('CLIENT use net.createConnection', options); self.onSocket(net.createConnection(options)); } } + function oncreate(err, socket) { + if (called) + return; + called = true; + if (err) { + process.nextTick(function() { + self.emit('error', err); + }); + return; + } + self.onSocket(socket); + self._deferToConnect(null, null, function() { + self._flush(); + self = null; + }); + } + self._deferToConnect(null, null, function() { self._flush(); self = null; diff --git a/test/parallel/test-http-createConnection.js b/test/parallel/test-http-createConnection.js index 48a7d7dbe68ea3..1b7376d1287dd6 100644 --- a/test/parallel/test-http-createConnection.js +++ b/test/parallel/test-http-createConnection.js @@ -1,27 +1,61 @@ 'use strict'; -var common = require('../common'); -var assert = require('assert'); -var http = require('http'); -var net = require('net'); +const common = require('../common'); +const http = require('http'); +const net = require('net'); +const assert = require('assert'); -var create = 0; -var response = 0; -process.on('exit', function() { - assert.equal(1, create, 'createConnection() http option was not called'); - assert.equal(1, response, 'http server "request" callback was not called'); -}); - -var server = http.createServer(function(req, res) { +const server = http.createServer(common.mustCall(function(req, res) { res.end(); - response++; -}).listen(common.PORT, '127.0.0.1', function() { - http.get({ createConnection: createConnection }, function(res) { +}, 4)).listen(common.PORT, '127.0.0.1', function() { + let fn = common.mustCall(createConnection); + http.get({ createConnection: fn }, function(res) { res.resume(); - server.close(); + fn = common.mustCall(createConnectionAsync); + http.get({ createConnection: fn }, function(res) { + res.resume(); + fn = common.mustCall(createConnectionBoth1); + http.get({ createConnection: fn }, function(res) { + res.resume(); + fn = common.mustCall(createConnectionBoth2); + http.get({ createConnection: fn }, function(res) { + res.resume(); + fn = common.mustCall(createConnectionError); + http.get({ createConnection: fn }, function(res) { + assert.fail(null, null, 'Unexpected response callback'); + }).on('error', common.mustCall(function(err) { + assert.equal(err.message, 'Could not create socket'); + server.close(); + })); + }); + }); + }); }); }); function createConnection() { - create++; return net.createConnection(common.PORT, '127.0.0.1'); } + +function createConnectionAsync(options, cb) { + setImmediate(function() { + cb(null, net.createConnection(common.PORT, '127.0.0.1')); + }); +} + +function createConnectionBoth1(options, cb) { + const socket = net.createConnection(common.PORT, '127.0.0.1'); + setImmediate(function() { + cb(null, socket); + }); + return socket; +} + +function createConnectionBoth2(options, cb) { + const socket = net.createConnection(common.PORT, '127.0.0.1'); + cb(null, socket); + return socket; +} + +function createConnectionError(options, cb) { + process.nextTick(cb, new Error('Could not create socket')); +}