Get stdin/stdout working. Add process->Close().

This commit is contained in:
Ryan 2009-06-21 13:10:00 +02:00
Родитель 83cb156b6f
Коммит 03c5772ce4
2 изменённых файлов: 247 добавлений и 26 удалений

Просмотреть файл

@ -2,6 +2,8 @@
#include "process.h"
#include <assert.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
@ -9,6 +11,10 @@
using namespace v8;
using namespace node;
#define ON_ERROR_SYMBOL String::NewSymbol("onError")
#define ON_OUTPUT_SYMBOL String::NewSymbol("onOutput")
#define ON_EXIT_SYMBOL String::NewSymbol("onExit")
Persistent<FunctionTemplate> Process::constructor_template;
void
@ -20,10 +26,8 @@ Process::Initialize (Handle<Object> target)
constructor_template = Persistent<FunctionTemplate>::New(t);
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
#if 0
NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", Timer::Start);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", Timer::Stop);
#endif
NODE_SET_PROTOTYPE_METHOD(constructor_template, "write", Process::Write);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close);
target->Set(String::NewSymbol("Process"), constructor_template->GetFunction());
}
@ -48,13 +52,113 @@ Process::New (const Arguments& args)
return args.This();
}
static void
free_buf (oi_buf *b)
{
V8::AdjustAmountOfExternalAllocatedMemory(-b->len);
free(b);
}
static oi_buf *
new_buf (size_t size)
{
size_t total = sizeof(oi_buf) + size;
void *p = malloc(total);
if (p == NULL) return NULL;
oi_buf *b = static_cast<oi_buf*>(p);
b->base = static_cast<char*>(p) + sizeof(oi_buf);
b->len = size;
b->release = free_buf;
V8::AdjustAmountOfExternalAllocatedMemory(total);
return b;
}
Handle<Value>
Process::Write (const Arguments& args)
{
HandleScope scope;
Process *process = NODE_UNWRAP(Process, args.Holder());
assert(process);
#if 0
if ( connection->ReadyState() != OPEN
&& connection->ReadyState() != WRITE_ONLY
)
return ThrowException(String::New("Socket is not open for writing"));
#endif
// XXX
// A lot of improvement can be made here. First of all we're allocating
// oi_bufs for every send which is clearly inefficent - it should use a
// memory pool or ring buffer. Of course, expressing binary data as an
// array of integers is extremely inefficent. This can improved when v8
// bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been
// addressed.
oi_buf *buf;
size_t len;
if (args[0]->IsString()) {
enum encoding enc = ParseEncoding(args[1]);
Local<String> s = args[0]->ToString();
len = s->Utf8Length();
buf = new_buf(len);
switch (enc) {
case RAW:
case ASCII:
s->WriteAscii(buf->base, 0, len);
break;
case UTF8:
s->WriteUtf8(buf->base, len);
break;
default:
assert(0 && "unhandled string encoding");
}
} else if (args[0]->IsArray()) {
Handle<Array> array = Handle<Array>::Cast(args[0]);
len = array->Length();
buf = new_buf(len);
for (size_t i = 0; i < len; i++) {
Local<Value> int_value = array->Get(Integer::New(i));
buf->base[i] = int_value->IntegerValue();
}
} else return ThrowException(String::New("Bad argument"));
if (process->Write(buf) != 0) {
return ThrowException(String::New("Pipe already closed"));
}
return Undefined();
}
Handle<Value>
Process::Close (const Arguments& args)
{
HandleScope scope;
Process *process = NODE_UNWRAP(Process, args.Holder());
assert(process);
if (process->Close() != 0) {
return ThrowException(String::New("Pipe already closed."));
}
return Undefined();
}
Process::Process (Handle<Object> handle)
: ObjectWrap(handle)
{
ev_init(&stdout_watcher_, Process::OnOutput);
stdout_watcher_.data = this;
ev_init(&stderr_watcher_, Process::OnError);
ev_init(&stderr_watcher_, Process::OnOutput);
stderr_watcher_.data = this;
ev_init(&stdin_watcher_, Process::OnWritable);
@ -70,7 +174,11 @@ Process::Process (Handle<Object> handle)
stdin_pipe_[0] = -1;
stdin_pipe_[1] = -1;
got_close_ = false;
pid_ = 0;
oi_queue_init(&out_stream_);
}
Process::~Process ()
@ -108,7 +216,7 @@ Process::Shutdown ()
Detach();
}
static int
static inline int
SetNonBlocking (int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
@ -152,6 +260,8 @@ Process::Spawn (const char *command)
return -4;
case 0: // Child.
//printf("child process!\n");
close(stdout_pipe_[0]); // close read end
dup2(stdout_pipe_[1], STDOUT_FILENO);
@ -161,7 +271,10 @@ Process::Spawn (const char *command)
close(stdin_pipe_[1]); // close write end
dup2(stdin_pipe_[0], STDIN_FILENO);
execl("/bin/sh", "-c", command, (char *)NULL);
//printf("child process!\n");
execl("/bin/sh", "sh", "-c", command, (char *)NULL);
//execl(_PATH_BSHELL, "sh", "-c", program, (char *)NULL);
_exit(127);
}
@ -171,23 +284,21 @@ Process::Spawn (const char *command)
ev_child_start(EV_DEFAULT_UC_ &child_watcher_);
SetNonBlocking(stdout_pipe_[0]);
SetNonBlocking(stderr_pipe_[0]);
SetNonBlocking(stdin_pipe_[1]);
ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ);
ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ);
ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE);
ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_);
ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_);
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
close(stdout_pipe_[1]); // close write end
close(stderr_pipe_[1]); // close write end
close(stdin_pipe_[0]); // close read end
stdout_pipe_[1] = -1;
SetNonBlocking(stderr_pipe_[0]);
ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ);
ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_);
close(stderr_pipe_[1]); // close write end
stderr_pipe_[1] = -1;
SetNonBlocking(stdin_pipe_[1]);
ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE);
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
close(stdin_pipe_[0]); // close read end
stdin_pipe_[0] = -1;
Attach();
@ -198,22 +309,96 @@ Process::Spawn (const char *command)
void
Process::OnOutput (EV_P_ ev_io *watcher, int revents)
{
Process *process = static_cast<Process*>(watcher->data);
assert(revents == EV_READ);
}
int r;
char buf[16*1024];
size_t buf_size = 16*1024;
void
Process::OnError (EV_P_ ev_io *watcher, int revents)
{
Process *process = static_cast<Process*>(watcher->data);
bool is_stdout = (&process->stdout_watcher_ == watcher);
int fd = is_stdout ? process->stdout_pipe_[0] : process->stderr_pipe_[0];
assert(revents == EV_READ);
assert(fd >= 0);
HandleScope scope;
Handle<Value> callback_v =
process->handle_->Get(is_stdout ? ON_OUTPUT_SYMBOL : ON_ERROR_SYMBOL);
Handle<Function> callback;
if (callback_v->IsFunction()) {
callback = Handle<Function>::Cast(callback_v);
}
Handle<Value> argv[1];
for (;;) {
r = read(fd, buf, buf_size);
if (r < 0) {
if (errno != EAGAIN) perror("IPC pipe read error");
break;
}
if (!callback.IsEmpty()) {
if (r == 0) {
argv[0] = Null();
} else {
// TODO multiple encodings
argv[0] = String::New((const char*)buf, r);
}
TryCatch try_catch;
callback->Call(process->handle_, 1, argv);
if (try_catch.HasCaught()) {
FatalException(try_catch);
return;
}
}
if (r == 0) {
ev_io_stop(EV_DEFAULT_UC_ watcher);
break;
}
}
}
void
Process::OnWritable (EV_P_ ev_io *watcher, int revents)
{
Process *process = static_cast<Process*>(watcher->data);
int sent;
assert(revents == EV_WRITE);
assert(process->stdin_pipe_[1] >= 0);
while (!oi_queue_empty(&process->out_stream_)) {
oi_queue *q = oi_queue_last(&process->out_stream_);
oi_buf *to_write = (oi_buf*) oi_queue_data(q, oi_buf, queue);
sent = write( process->stdin_pipe_[1]
, to_write->base + to_write->written
, to_write->len - to_write->written
);
if (sent < 0) {
if (errno == EAGAIN) break;
perror("IPC pipe write error");
break;
}
to_write->written += sent;
if (to_write->written == to_write->len) {
oi_queue_remove(q);
if (to_write->release) to_write->release(to_write);
}
}
if (oi_queue_empty(&process->out_stream_)) {
ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_);
if (process->got_close_) {
close(process->stdin_pipe_[1]);
process->stdin_pipe_[1] = -1;
}
}
}
void
@ -221,5 +406,33 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents)
{
ev_child_stop(EV_A_ watcher);
Process *process = static_cast<Process*>(watcher->data);
assert(revents == EV_CHILD);
assert(process->pid_ == watcher->rpid);
assert(&process->child_watcher_ == watcher);
// Call onExit ( watcher->rstatus )
printf("OnCHLD with status %d\n", watcher->rstatus);
}
int
Process::Write (oi_buf *buf)
{
if (stdin_pipe_[1] < 0 || got_close_)
return -1;
oi_queue_insert_head(&out_stream_, &buf->queue);
buf->written = 0;
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0;
}
int
Process::Close ()
{
if (stdin_pipe_[1] < 0 || got_close_)
return -1;
got_close_ = true;
ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_);
return 0;
}

Просмотреть файл

@ -4,24 +4,28 @@
#include "node.h"
#include <v8.h>
#include <ev.h>
#include <oi_socket.h>
namespace node {
class Process : ObjectWrap {
public:
static void Initialize (v8::Handle<v8::Object> target);
virtual size_t size (void) { return sizeof(Process); }
protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template;
static v8::Handle<v8::Value> New (const v8::Arguments& args);
static v8::Handle<v8::Value> Write (const v8::Arguments& args);
static v8::Handle<v8::Value> Close (const v8::Arguments& args);
Process(v8::Handle<v8::Object> handle);
~Process();
void Shutdown ();
int Spawn (const char *command);
int Write (oi_buf *buf);
int Close ();
private:
static void OnOutput (EV_P_ ev_io *watcher, int revents);
@ -39,6 +43,10 @@ class Process : ObjectWrap {
int stdin_pipe_[2];
pid_t pid_;
bool got_close_;
oi_queue out_stream_;
};
} // namespace node