Relatively large update to TCP API. No more "protocol".

Instead servers are passed a function which gets called on connection (like
in the original design) which has one argument, the connecting socket. The
user sets up callbacks on that. It's pretty much how I had it originally.

Encoding is now set via v8 getter/setter and can be changed dynamically.

The timeout for all sockets is fixed at 60 seconds for now. Need to fix
that.
This commit is contained in:
Ryan 2009-05-14 23:47:21 +02:00
Родитель 31ba3cde17
Коммит 73fb24f48d
6 изменённых файлов: 152 добавлений и 137 удалений

Просмотреть файл

@ -30,7 +30,7 @@ body {
}
#toc a { color: #777; }
h1, h2, h3 { color: #aaf; }
h1, h2, h3, h4 { color: #bbb; }
h1 {
margin: 2em 0;
@ -42,22 +42,30 @@ h1 {
h1 a { color: inherit; }
h2 {
font-size: 45px;
font-size: 30px;
line-height: inherit;
font-weight: bold;
margin: 2em 0;
}
h3 {
margin: 2em 0;
font-size: 20px;
line-height: inherit;
font-weight: bold;
}
h3 {
h4 {
margin: 1em 0;
font-size: 30px;
font-size: inherit;
line-height: inherit;
font-weight: inherit;
font-weight: bold;
}
pre, code {
font-family: monospace;
font-size: 13pt;
color: #eae;
color: #aaf;
}
pre {
@ -137,31 +145,33 @@ Check out <a href="#api">the documentation</a> for more examples.
<h2 id="motivation">Motivation</h2>
<ol>
<li>Evented programming makes sense
<ol>
<li>difference between blocking/non-blocking design
<p> There are many methods to write internet servers but they can
fundamentally be divided into two camps: evented and threaded; non-blocking
and blocking. A blocking server accepts a connection and launches a new
thread to handle the connection. Because the concurrency is handled by
the thread scheduler, a blocking server can make function calls which
preform full network requests.
<h3>Evented Programming Makes More Sense</h3>
difference between blocking/non-blocking design
<p> There are many methods to write internet servers but they can
fundamentally be divided into two camps: evented and threaded; non-blocking
and blocking. A blocking server accepts a connection and launches a new
thread to handle the connection. Because the concurrency is handled by
the thread scheduler, a blocking server can make function calls which
preform full network requests.
<pre class="sh_javascript">var response = db.execute("SELECT * FROM table");
// do something</pre>
<p> An evented server manages its concurrency itself. All connections
are handled in a single thread and callbacks are executed on certain
events: "socket 23 is has data to read", "socket 65's write buffer is
empty". An evented server executes small bits of code but never
<i>blocks</i> the process. In the evented world callbacks are used
instead of functions
<p> An evented server manages its concurrency itself. All connections
are handled in a single thread and callbacks are executed on certain
events: "socket 23 is has data to read", "socket 65's write buffer is
empty". An evented server executes small bits of code but never
<i>blocks</i> the process. In the evented world callbacks are used
instead of functions
<pre class="sh_javascript">db.execute("SELECT * FROM table", function (response) {
// do something
// do something
});</pre>
<li><a href="http://duartes.org/gustavo/blog/post/what-your-computer-does-while-you-wait">I/O latency</a>
<p><a href="http://duartes.org/gustavo/blog/post/what-your-computer-does-while-you-wait">I/O latency</a>
<pre>
l1 cache ~ 3
l2 cache ~ 14
@ -169,9 +179,10 @@ l2 cache ~ 14
disk ~ 41000000
network ~ 240000000
</pre>
<li>purely evented interfaces rule out a lot of stupidity
</ol>
<li>Evented programs are more efficient
<p>purely evented interfaces rule out a lot of stupidity
<h3>Evented programs are more efficient</h3>
<ol>
<li>pthread stack size
2mb default stack size on linux (1mb on windows, 64kb on FreeBSD)
@ -180,7 +191,7 @@ l2 cache ~ 14
<li>Apache vs. Nginx
<li>event machine vs mongrel (neverblock)
</ol>
<li>The appropriateness of Javascript
<h3>The appropriateness of Javascript</h3>
<ol>
<li>No I/O
<p> Javascript is without I/O. In the browser the DOM provides I/O,
@ -200,7 +211,6 @@ l2 cache ~ 14
systems tend to be written in C and portable web-level systems are
written in Javascript.
</ol>
</ol>
<h2 id="benchmarks">Benchmarks</h2>

Просмотреть файл

@ -60,7 +60,7 @@ HTTPConnection::v8NewClient (const Arguments& args)
if (args[0]->IsFunction() == false)
return ThrowException(String::New("Must pass a class as the first argument."));
Local<Function> protocol_class = Local<Function>::Cast(args[0]);
new HTTPConnection(args.This(), protocol_class, HTTP_RESPONSE);
new HTTPConnection(args.This(), HTTP_RESPONSE);
return args.This();
}
@ -71,7 +71,7 @@ HTTPConnection::v8NewServer (const Arguments& args)
if (args[0]->IsFunction() == false)
return ThrowException(String::New("Must pass a class as the first argument."));
Local<Function> protocol_class = Local<Function>::Cast(args[0]);
new HTTPConnection(args.This(), protocol_class, HTTP_REQUEST);
new HTTPConnection(args.This(), HTTP_REQUEST);
return args.This();
}
@ -90,8 +90,7 @@ HTTPConnection::on_message_begin (http_parser *parser)
HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
HandleScope scope;
Local<Object> protocol = connection->GetProtocol();
Local<Value> on_message_v = protocol->Get(ON_MESSAGE_SYMBOL);
Local<Value> on_message_v = connection->handle_->Get(ON_MESSAGE_SYMBOL);
if (!on_message_v->IsFunction()) return -1;
Handle<Function> on_message = Handle<Function>::Cast(on_message_v);
@ -272,8 +271,8 @@ HTTPConnection::on_message_complete (http_parser *parser)
return 0;
}
HTTPConnection::HTTPConnection (Handle<Object> handle, Handle<Function> protocol_class, enum http_parser_type type)
: Connection(handle, protocol_class)
HTTPConnection::HTTPConnection (Handle<Object> handle, enum http_parser_type type)
: Connection(handle)
{
http_parser_init (&parser_, type);
parser_.on_message_begin = on_message_begin;
@ -330,15 +329,14 @@ HTTPServer::OnConnection (struct sockaddr *addr, socklen_t len)
{
HandleScope scope;
Local<Function> protocol_class = GetProtocolClass();
if (protocol_class.IsEmpty()) {
Local<Function> connection_handler = GetConnectionHandler ();
if (connection_handler.IsEmpty()) {
Close();
return NULL;
}
Handle<Value> argv[] = { protocol_class };
Local<Object> connection_handle =
HTTPConnection::server_constructor_template->GetFunction()->NewInstance(1, argv);
HTTPConnection::server_constructor_template->GetFunction()->NewInstance(0, NULL);
HTTPConnection *connection = NODE_UNWRAP(HTTPConnection, connection_handle);
connection->SetAcceptor(handle_);

Просмотреть файл

@ -19,7 +19,6 @@ protected:
static v8::Handle<v8::Value> v8NewServer (const v8::Arguments& args);
HTTPConnection (v8::Handle<v8::Object> handle,
v8::Handle<v8::Function> protocol_class,
enum http_parser_type type);
void OnReceive (const void *buf, size_t len);

Просмотреть файл

@ -13,6 +13,9 @@
using namespace v8;
using namespace node;
#define UTF8_SYMBOL String::NewSymbol("utf8")
#define RAW_SYMBOL String::NewSymbol("raw")
#define ON_RECEIVE_SYMBOL String::NewSymbol("onReceive")
#define ON_DISCONNECT_SYMBOL String::NewSymbol("onDisconnect")
#define ON_CONNECT_SYMBOL String::NewSymbol("onConnect")
@ -26,7 +29,7 @@ using namespace node;
#define SERVER_SYMBOL String::NewSymbol("server")
#define PROTOCOL_SYMBOL String::NewSymbol("protocol")
#define PROTOCOL_CLASS_SYMBOL String::NewSymbol("protocol_class")
#define CONNECTION_HANDLER_SYMBOL String::NewSymbol("connection_handler")
static const struct addrinfo tcp_hints =
/* ai_flags */ { AI_PASSIVE
@ -57,36 +60,50 @@ Connection::Initialize (v8::Handle<v8::Object> target)
NODE_SET_PROTOTYPE_METHOD(constructor_template, "fullClose", v8FullClose);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "forceClose", v8ForceClose);
constructor_template->PrototypeTemplate()->SetAccessor(
String::NewSymbol("encoding"),
EncodingGetter,
EncodingSetter);
target->Set(String::NewSymbol("Connection"), constructor_template->GetFunction());
}
Connection::Connection (Handle<Object> handle, Handle<Function> protocol_class)
: ObjectWrap(handle)
Handle<Value>
Connection::EncodingGetter (Local<String> _, const AccessorInfo& info)
{
Connection *connection = NODE_UNWRAP(Connection, info.This());
HandleScope scope;
// Instanciate the protocol object
Handle<Value> argv[] = { handle_ };
Local<Object> protocol = protocol_class->NewInstance(1, argv);
handle_->Set(PROTOCOL_SYMBOL, protocol);
if (connection->encoding_ == UTF8)
return scope.Close(UTF8_SYMBOL);
else
return scope.Close(RAW_SYMBOL);
}
// TODO use SetNamedPropertyHandler (or whatever) for encoding and timeout
// instead of just reading it once?
encoding_ = RAW;
Local<Value> encoding_v = protocol->Get(ENCODING_SYMBOL);
if (encoding_v->IsString()) {
Local<String> encoding_string = encoding_v->ToString();
char buf[5]; // need enough room for "utf8" or "raw"
encoding_string->WriteAscii(buf, 0, 4);
buf[4] = '\0';
if(strcasecmp(buf, "utf8") == 0) encoding_ = UTF8;
void
Connection::EncodingSetter (Local<String> _, Local<Value> value, const AccessorInfo& info)
{
Connection *connection = NODE_UNWRAP(Connection, info.This());
if (!value->IsString()) {
connection->encoding_ = RAW;
return;
}
HandleScope scope;
Local<String> encoding = value->ToString();
char buf[5]; // need enough room for "utf8" or "raw"
encoding->WriteAscii(buf, 0, 4);
buf[4] = '\0';
if(strcasecmp(buf, "utf8") == 0)
connection->encoding_ = UTF8;
else
connection->encoding_ = RAW;
}
double timeout = 0.0; // default
Local<Value> timeout_v = protocol->Get(TIMEOUT_SYMBOL);
if (encoding_v->IsInt32())
timeout = timeout_v->Int32Value() / 1000.0;
Connection::Connection (Handle<Object> handle)
: ObjectWrap(handle)
{
encoding_ = RAW;
double timeout = 60.0; // default
host_ = NULL;
port_ = NULL;
@ -111,20 +128,6 @@ Connection::~Connection ()
ForceClose();
}
Local<Object>
Connection::GetProtocol (void)
{
HandleScope scope;
Local<Value> protocol_v = handle_->Get(PROTOCOL_SYMBOL);
if (protocol_v->IsObject()) {
Local<Object> protocol = protocol_v->ToObject();
return scope.Close(protocol);
}
return Local<Object>();
}
void
Connection::SetAcceptor (Handle<Object> acceptor_handle)
{
@ -138,10 +141,7 @@ Handle<Value>
Connection::v8New (const Arguments& args)
{
HandleScope scope;
if (args[0]->IsFunction() == false)
return ThrowException(String::New("Must pass a class as the first argument."));
Handle<Function> protocol_class = Handle<Function>::Cast(args[0]);
new Connection(args.This(), protocol_class);
new Connection(args.This());
return args.This();
}
@ -290,8 +290,7 @@ Connection::OnReceive (const void *buf, size_t len)
{
HandleScope scope;
Local<Object> protocol = GetProtocol();
Handle<Value> callback_v = protocol->Get(ON_RECEIVE_SYMBOL);
Handle<Value> callback_v = handle_->Get(ON_RECEIVE_SYMBOL);
if (!callback_v->IsFunction()) return;
Handle<Function> callback = Handle<Function>::Cast(callback_v);
@ -319,7 +318,7 @@ Connection::OnReceive (const void *buf, size_t len)
}
TryCatch try_catch;
callback->Call(protocol, argc, argv);
callback->Call(handle_, argc, argv);
if (try_catch.HasCaught())
fatal_exception(try_catch); // XXX is this the right action to take?
@ -329,11 +328,10 @@ Connection::OnReceive (const void *buf, size_t len)
void name () \
{ \
HandleScope scope; \
Local<Object> protocol = GetProtocol(); \
Local<Value> callback_v = protocol->Get(symbol); \
Local<Value> callback_v = handle_->Get(symbol); \
if (!callback_v->IsFunction()) return; \
Handle<Function> callback = Handle<Function>::Cast(callback_v); \
callback->Call(protocol, 0, NULL); \
callback->Call(handle_, 0, NULL); \
}
DEFINE_SIMPLE_CALLBACK(Connection::OnConnect, ON_CONNECT_SYMBOL)
@ -360,12 +358,12 @@ Acceptor::Initialize (Handle<Object> target)
target->Set(String::NewSymbol("Server"), constructor_template->GetFunction());
}
Acceptor::Acceptor (Handle<Object> handle, Handle<Function> protocol_class, Handle<Object> options)
Acceptor::Acceptor (Handle<Object> handle, Handle<Function> connection_handler, Handle<Object> options)
: ObjectWrap(handle)
{
HandleScope scope;
handle_->SetHiddenValue(PROTOCOL_CLASS_SYMBOL, protocol_class);
handle_->SetHiddenValue(CONNECTION_HANDLER_SYMBOL, connection_handler);
int backlog = 1024; // default value
Local<Value> backlog_v = options->Get(String::NewSymbol("backlog"));
@ -388,20 +386,27 @@ Acceptor::OnConnection (struct sockaddr *addr, socklen_t len)
{
HandleScope scope;
Local<Function> protocol_class = GetProtocolClass();
if (protocol_class.IsEmpty()) {
printf("protocol class was empty!");
Local<Function> connection_handler = GetConnectionHandler();
if (connection_handler.IsEmpty()) {
printf("Connection handler was empty!");
Close();
return NULL;
}
Handle<Value> argv[] = { protocol_class };
Local<Object> connection_handle =
Connection::constructor_template->GetFunction()->NewInstance(1, argv);
Connection::constructor_template->GetFunction()->NewInstance(0, NULL);
Connection *connection = NODE_UNWRAP(Connection, connection_handle);
connection->SetAcceptor(handle_);
Handle<Value> argv[1] = { connection_handle };
TryCatch try_catch;
Local<Value> ret = connection_handler->Call(handle_, 1, argv);
if (ret.IsEmpty())
fatal_exception(try_catch);
return connection;
}
@ -413,7 +418,7 @@ Acceptor::v8New (const Arguments& args)
if (args.Length() < 1 || args[0]->IsFunction() == false)
return ThrowException(String::New("Must at give connection handler as the first argument"));
Local<Function> protocol_class = Local<Function>::Cast(args[0]);
Local<Function> connection_handler = Local<Function>::Cast(args[0]);
Local<Object> options;
if (args.Length() > 1 && args[1]->IsObject()) {
@ -422,7 +427,7 @@ Acceptor::v8New (const Arguments& args)
options = Object::New();
}
new Acceptor(args.This(), protocol_class, options);
new Acceptor(args.This(), connection_handler, options);
return args.This();
}
@ -468,15 +473,16 @@ Acceptor::v8Close (const Arguments& args)
}
Local<v8::Function>
Acceptor::GetProtocolClass (void)
Acceptor::GetConnectionHandler (void)
{
HandleScope scope;
Local<Value> protocol_class_v = handle_->GetHiddenValue(PROTOCOL_CLASS_SYMBOL);
if (protocol_class_v->IsFunction()) {
Local<Function> protocol_class = Local<Function>::Cast(protocol_class_v);
return scope.Close(protocol_class);
Local<Value> connection_handler_v = handle_->GetHiddenValue(CONNECTION_HANDLER_SYMBOL);
if (connection_handler_v->IsFunction()) {
Local<Function> connection_handler = Local<Function>::Cast(connection_handler_v);
return scope.Close(connection_handler);
}
return Local<Function>();
}

Просмотреть файл

@ -22,8 +22,10 @@ protected:
static v8::Handle<v8::Value> v8Close (const v8::Arguments& args);
static v8::Handle<v8::Value> v8FullClose (const v8::Arguments& args);
static v8::Handle<v8::Value> v8ForceClose (const v8::Arguments& args);
static v8::Handle<v8::Value> EncodingGetter (v8::Local<v8::String> _, const v8::AccessorInfo& info);
static void EncodingSetter (v8::Local<v8::String> _, v8::Local<v8::Value> value, const v8::AccessorInfo& info);
Connection (v8::Handle<v8::Object> handle, v8::Handle<v8::Function> protocol_class);
Connection (v8::Handle<v8::Object> handle);
virtual ~Connection ();
int Connect (struct addrinfo *address) { return oi_socket_connect (&socket_, address); }
@ -105,11 +107,11 @@ protected:
static v8::Handle<v8::Value> v8Close (const v8::Arguments& args);
Acceptor (v8::Handle<v8::Object> handle,
v8::Handle<v8::Function> protocol_class,
v8::Handle<v8::Function> connection_handler,
v8::Handle<v8::Object> options);
virtual ~Acceptor () { Close(); puts("acceptor gc'd!");}
v8::Local<v8::Function> GetProtocolClass (void);
v8::Local<v8::Function> GetConnectionHandler (void);
int Listen (struct addrinfo *address) {
int r = oi_server_listen (&server_, address);

Просмотреть файл

@ -5,14 +5,13 @@ var N = 1000;
var count = 0;
function Ponger (socket) {
this.encoding = "UTF8";
this.timeout = 0;
socket.encoding = "UTF8";
socket.timeout = 0;
this.onConnect = function () {
puts("got socket.");
};
puts("got socket.");
this.onReceive = function (data) {
socket.onReceive = function (data) {
//puts("server recved data: " + JSON.stringify(data));
assertTrue(count <= N);
stdout.print("-");
if (/PING/.exec(data)) {
@ -20,46 +19,47 @@ function Ponger (socket) {
}
};
this.onEOF = function () {
socket.onEOF = function () {
puts("ponger: onEOF");
socket.close();
};
this.onDisconnect = function () {
socket.onDisconnect = function () {
puts("ponger: onDisconnect");
socket.server.close();
};
}
function Pinger (socket) {
this.encoding = "UTF8";
this.onConnect = function () {
socket.send("PING");
};
this.onReceive = function (data) {
stdout.print(".");
assertEquals("PONG", data);
count += 1;
if (count < N) {
socket.send("PING");
} else {
puts("sending FIN");
socket.close();
}
};
this.onEOF = function () {
puts("pinger: onEOF");
assertEquals(N, count);
};
}
function onLoad() {
var server = new node.tcp.Server(Ponger);
server.listen(port);
var client = new node.tcp.Connection(Pinger);
var client = new node.tcp.Connection();
client.encoding = "UTF8";
client.onConnect = function () {
puts("client is connected.");
client.send("PING");
};
client.onReceive = function (data) {
//puts("client recved data: " + JSON.stringify(data));
stdout.print(".");
assertEquals("PONG", data);
count += 1;
if (count < N) {
client.send("PING");
} else {
puts("sending FIN");
client.close();
}
};
client.onEOF = function () {
puts("pinger: onEOF");
assertEquals(N, count);
};
client.connect(port);
}