support for sharing streams accross isolates

This commit is contained in:
Igor Zinkovsky 2012-01-19 16:52:23 -08:00
Родитель 33b7fc250f
Коммит db3c4efd1d
4 изменённых файлов: 165 добавлений и 48 удалений

Просмотреть файл

@ -68,29 +68,13 @@ function mergeOptions(target, overrides) {
function setupChannel(target, channel) {
var isWindows = process.platform === 'win32';
target._channel = channel;
var jsonBuffer = '';
if (isWindows) {
var setSimultaneousAccepts = function(handle) {
var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
}
channel.buffering = false;
channel.onread = function(pool, offset, length, recvHandle) {
if (recvHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(recvHandle);
}
net._setSimultaneousAccepts(recvHandle);
if (pool) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
@ -140,10 +124,8 @@ function setupChannel(target, channel) {
var buffer = Buffer(JSON.stringify(message) + '\n');
if (sendHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(sendHandle);
}
net._setSimultaneousAccepts(sendHandle);
var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);
@ -582,9 +564,13 @@ Isolate.prototype.spawn = function(options) {
self._handle = isolates.create(options.args, options.options);
if (!self._handle) throw new Error('Cannot create isolate.');
self._handle.onmessage = function(msg) {
self._handle.onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
self.emit('message', msg);
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);
self.emit('message', msg, recvHandle);
};
self._handle.onexit = function() {
@ -600,10 +586,14 @@ Isolate.prototype.kill = function(sig) {
};
Isolate.prototype.send = function(msg) {
Isolate.prototype.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
if (!this._handle) throw new Error('Isolate not running.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return this._handle.send(msg);
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);
return this._handle.send(msg, sendHandle);
};

Просмотреть файл

@ -942,3 +942,26 @@ exports.isIPv4 = function(input) {
exports.isIPv6 = function(input) {
return exports.isIP(input) === 6;
};
if (process.platform === 'win32') {
var simultaneousAccepts;
exports._setSimultaneousAccepts = function(handle) {
if (typeof handle === 'undefined') {
return;
}
if (typeof simultaneousAccepts === 'undefined') {
simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
}
if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
} else {
exports._setSimultaneousAccepts = function(handle) {}
}

Просмотреть файл

@ -123,17 +123,27 @@
if (process.tid === 1) return;
var net = NativeModule.require('net');
// isolate initialization
process.send = function(msg) {
process.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return process._send(msg);
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);
return process._send(msg, sendHandle);
};
process._onmessage = function(msg) {
process._onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
process.emit('message', msg);
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);
process.emit('message', msg, recvHandle);
};
process.exit = process._exit;
@ -439,10 +449,15 @@
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap')
process.binding('tcp_wrap');
cp._forkChild();
assert(process.send);
} else if (process.tid !== 1) {
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap');
}
}

Просмотреть файл

@ -26,6 +26,7 @@
#include <node_isolate.h>
#include <node_internals.h>
#include <node_object_wrap.h>
#include <tcp_wrap.h>
#include <stdlib.h>
#include <string.h>
@ -34,6 +35,8 @@
#define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor)
#define ISOLATEMESSAGE_SHARED_STREAM 0x0001
namespace node {
@ -166,23 +169,35 @@ private:
struct IsolateMessage {
int flags;
struct {
size_t size_;
char* data_;
char* buffer_;
} data_;
uv_stream_info_t shared_stream_info_;
IsolateMessage(const char* buffer, size_t size,
uv_stream_info_t* shared_stream_info) {
flags = 0;
IsolateMessage(const char* data, size_t size) {
// make a copy for now
size_ = size;
data_ = new char[size];
memcpy(data_, data, size);
data_.size_ = size;
data_.buffer_ = new char[size];
memcpy(data_.buffer_, buffer, size);
if (shared_stream_info) {
flags |= ISOLATEMESSAGE_SHARED_STREAM;
shared_stream_info_ = *shared_stream_info;
}
}
~IsolateMessage() {
delete[] data_;
delete[] data_.buffer_;
}
static void Free(char* data, void* arg) {
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
assert(data == msg->data_);
assert(data == msg->data_.buffer_);
delete msg;
}
};
@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);
IsolateMessage* msg = new IsolateMessage(data, size);
IsolateMessage* msg;
if (args[1]->IsObject()) {
uv_stream_info_t stream_info;
Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}
isolate->send_channel_->Send(msg);
return Undefined();
@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
Isolate* self = static_cast<Isolate*>(arg);
NODE_ISOLATE_CHECK(self);
Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_,
IsolateMessage::Free, msg);
int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};
if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();
// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);
argv[1] = pending_obj;
argc++;
}
MakeCallback(self->globals_.process, "_onmessage", argc, argv);
}
@ -442,9 +495,30 @@ private:
NODE_ISOLATE_CHECK(parent_isolate_);
HandleScope scope;
Buffer* buf = Buffer::New(
msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg);
int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};
if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();
// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);
argv[1] = pending_obj;
argc++;
}
MakeCallback(handle_, "onmessage", argc, argv);
}
// TODO merge with Isolate::Send(), it's almost identical
@ -457,9 +531,24 @@ private:
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);
IsolateMessage* msg = new IsolateMessage(data, size);
self->send_channel_->Send(msg);
IsolateMessage* msg;
if (args[1]->IsObject()) {
uv_stream_info_t stream_info;
Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}
self->send_channel_->Send(msg);
return Undefined();
}