Skip to content

Commit

Permalink
dgram: implement socket.bind({ fd })
Browse files Browse the repository at this point in the history
dgram: Implement binding an existing `fd`. Allow pass a `fd` property
to `socket.bind()` in dgram.
src: Add `UDPWrap::Open`

PR-URL: #21745
Fixes: #14961
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
  • Loading branch information
oyyd authored and mcollina committed Aug 6, 2018
1 parent 214844e commit 2bea9ce
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 39 deletions.
6 changes: 6 additions & 0 deletions doc/api/dgram.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ added: v0.11.14
* `port` {integer}
* `address` {string}
* `exclusive` {boolean}
* `fd` {integer}
* `callback` {Function}

For UDP sockets, causes the `dgram.Socket` to listen for datagram
Expand All @@ -177,6 +178,11 @@ system will attempt to listen on all addresses. Once binding is
complete, a `'listening'` event is emitted and the optional `callback`
function is called.

The `options` object may contain a `fd` property. When a `fd` greater
than `0` is set, it will wrap around an existing socket with the given
file descriptor. In this case, the properties of `port` and `address`
will be ignored.

Note that specifying both a `'listening'` event listener and passing a
`callback` to the `socket.bind()` method is not harmful but not very
useful.
Expand Down
91 changes: 72 additions & 19 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const errors = require('internal/errors');
const {
kStateSymbol,
_createSocketHandle,
newHandle
newHandle,
guessHandleType,
} = require('internal/dgram');
const {
ERR_INVALID_ARG_TYPE,
Expand All @@ -35,7 +36,8 @@ const {
ERR_SOCKET_BAD_PORT,
ERR_SOCKET_BUFFER_SIZE,
ERR_SOCKET_CANNOT_SEND,
ERR_SOCKET_DGRAM_NOT_RUNNING
ERR_SOCKET_DGRAM_NOT_RUNNING,
ERR_INVALID_FD_TYPE
} = errors.codes;
const { Buffer } = require('buffer');
const util = require('util');
Expand All @@ -45,6 +47,7 @@ const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol, owner_symbol }
} = require('internal/async_hooks');
const { isInt32 } = require('internal/validators');
const { UV_UDP_REUSEADDR } = process.binding('constants').os;

const { UDP, SendWrap } = process.binding('udp_wrap');
Expand Down Expand Up @@ -151,6 +154,28 @@ function bufferSize(self, size, buffer) {
return ret;
}

// Query master process to get the server handle and utilize it.
function bindServerHandle(self, options, errCb) {
if (!cluster)
cluster = require('cluster');

const state = self[kStateSymbol];
cluster._getServer(self, options, (err, handle) => {
if (err) {
errCb(err);
return;
}

if (!state.handle) {
// Handle has been closed in the mean time.
return handle.close();
}

replaceHandle(self, handle);
startListening(self);
});
}

Socket.prototype.bind = function(port_, address_ /* , callback */) {
let port = port_;

Expand All @@ -171,6 +196,44 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
return this;
}

// Open an existing fd instead of creating a new one.
if (port !== null && typeof port === 'object' &&
isInt32(port.fd) && port.fd > 0) {
const fd = port.fd;
const exclusive = !!port.exclusive;
const state = this[kStateSymbol];

if (!cluster)
cluster = require('cluster');

if (cluster.isWorker && !exclusive) {
bindServerHandle(this, {
address: null,
port: null,
addressType: this.type,
fd,
flags: null
}, (err) => {
// Callback to handle error.
const ex = errnoException(err, 'open');
this.emit('error', ex);
state.bindState = BIND_STATE_UNBOUND;
});
return this;
}

const type = guessHandleType(fd);
if (type !== 'UDP')
throw new ERR_INVALID_FD_TYPE(type);
const err = state.handle.open(fd);

if (err)
throw errnoException(err, 'open');

startListening(this);
return this;
}

var address;
var exclusive;

Expand Down Expand Up @@ -207,28 +270,18 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
flags |= UV_UDP_REUSEADDR;

if (cluster.isWorker && !exclusive) {
const onHandle = (err, handle) => {
if (err) {
var ex = exceptionWithHostPort(err, 'bind', ip, port);
this.emit('error', ex);
state.bindState = BIND_STATE_UNBOUND;
return;
}

if (!state.handle)
// handle has been closed in the mean time.
return handle.close();

replaceHandle(this, handle);
startListening(this);
};
cluster._getServer(this, {
bindServerHandle(this, {
address: ip,
port: port,
addressType: this.type,
fd: -1,
flags: flags
}, onHandle);
}, (err) => {
// Callback to handle error.
const ex = exceptionWithHostPort(err, 'bind', ip, port);
this.emit('error', ex);
state.bindState = BIND_STATE_UNBOUND;
});
} else {
if (!state.handle)
return; // handle has been closed in the mean time
Expand Down
39 changes: 27 additions & 12 deletions lib/internal/dgram.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
'use strict';
const assert = require('assert');
const { codes } = require('internal/errors');
const { UDP } = process.binding('udp_wrap');
const { isInt32 } = require('internal/validators');
const TTYWrap = process.binding('tty_wrap');
const { UV_EINVAL } = process.binding('uv');
const { ERR_INVALID_ARG_TYPE, ERR_SOCKET_BAD_TYPE } = codes;
const kStateSymbol = Symbol('state symbol');
let dns; // Lazy load for startup performance.
Expand All @@ -17,6 +19,9 @@ function lookup6(lookup, address, callback) {
}


const guessHandleType = TTYWrap.guessHandleType;


function newHandle(type, lookup) {
if (lookup === undefined) {
if (dns === undefined) {
Expand Down Expand Up @@ -49,22 +54,32 @@ function newHandle(type, lookup) {


function _createSocketHandle(address, port, addressType, fd, flags) {
// Opening an existing fd is not supported for UDP handles.
assert(typeof fd !== 'number' || fd < 0);

const handle = newHandle(addressType);

if (port || address) {
const err = handle.bind(address, port || 0, flags);

if (err) {
handle.close();
return err;
let err;

if (isInt32(fd) && fd > 0) {
const type = guessHandleType(fd);
if (type !== 'UDP') {
err = UV_EINVAL;
} else {
err = handle.open(fd);
}
} else if (port || address) {
err = handle.bind(address, port || 0, flags);
}

if (err) {
handle.close();
return err;
}

return handle;
}


module.exports = { kStateSymbol, _createSocketHandle, newHandle };
module.exports = {
kStateSymbol,
_createSocketHandle,
newHandle,
guessHandleType,
};
13 changes: 13 additions & 0 deletions src/udp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);

env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "send", Send);
env->SetProtoMethod(t, "bind6", Bind6);
Expand Down Expand Up @@ -206,6 +207,18 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
}


void UDPWrap::Open(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int fd = static_cast<int>(args[0]->IntegerValue());
int err = uv_udp_open(&wrap->handle_, fd);

args.GetReturnValue().Set(err);
}


void UDPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
DoBind(args, AF_INET);
}
Expand Down
1 change: 1 addition & 0 deletions src/udp_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class UDPWrap: public HandleWrap {
v8::Local<v8::Context> context);
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind6(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
108 changes: 108 additions & 0 deletions test/parallel/test-cluster-dgram-bind-fd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict';
const common = require('../common');
if (common.isWindows)
common.skip('dgram clustering is currently not supported on Windows.');

const NUM_WORKERS = 4;
const PACKETS_PER_WORKER = 10;

const assert = require('assert');
const cluster = require('cluster');
const dgram = require('dgram');
const { UDP } = process.binding('udp_wrap');

if (cluster.isMaster)
master();
else
worker();


function master() {
// Create a handle and use its fd.
const rawHandle = new UDP();
const err = rawHandle.bind(common.localhostIPv4, 0, 0);
assert(err >= 0, String(err));
assert.notStrictEqual(rawHandle.fd, -1);

const fd = rawHandle.fd;

let listening = 0;

// Fork 4 workers.
for (let i = 0; i < NUM_WORKERS; i++)
cluster.fork();

// Wait until all workers are listening.
cluster.on('listening', common.mustCall((worker, address) => {
if (++listening < NUM_WORKERS)
return;

// Start sending messages.
const buf = Buffer.from('hello world');
const socket = dgram.createSocket('udp4');
let sent = 0;
doSend();

function doSend() {
socket.send(buf, 0, buf.length, address.port, address.address, afterSend);
}

function afterSend() {
sent++;
if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
doSend();
} else {
socket.close();
}
}
}, NUM_WORKERS));

// Set up event handlers for every worker. Each worker sends a message when
// it has received the expected number of packets. After that it disconnects.
for (const key in cluster.workers) {
if (cluster.workers.hasOwnProperty(key))
setupWorker(cluster.workers[key]);
}

function setupWorker(worker) {
let received = 0;

worker.send({
fd,
});

worker.on('message', common.mustCall((msg) => {
received = msg.received;
worker.disconnect();
}));

worker.on('exit', common.mustCall(() => {
assert.strictEqual(received, PACKETS_PER_WORKER);
}));
}
}


function worker() {
let received = 0;

process.on('message', common.mustCall((data) => {
const { fd } = data;
// Create udp socket and start listening.
const socket = dgram.createSocket('udp4');

socket.on('message', common.mustCall((data, info) => {
received++;

// Every 10 messages, notify the master.
if (received === PACKETS_PER_WORKER) {
process.send({ received });
socket.close();
}
}, PACKETS_PER_WORKER));

socket.bind({
fd,
});
}));
}
Loading

0 comments on commit 2bea9ce

Please sign in to comment.