Add support for host process stdio. (#5056) (#5072)

This commit is contained in:
Paul Liétar 2023-03-01 10:49:20 +00:00 коммит произвёл GitHub
Родитель e082da0314
Коммит 8e6a575786
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 317 добавлений и 58 удалений

Просмотреть файл

@ -1,4 +1,4 @@
___ ___
(- *) (o o) | Y & +
( V ) z v z O +---'---'
/--x-m- /--m-m---xXx--/--yy---
(~ ~) (+ +) | Y & +
( V ) z O z O +---'---'
/--x-m- /--m-m---xXx--/--yy------

Просмотреть файл

@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [3.0.9]
### Added
- When starting a host subprocess, applications may now pass data to its standard input. Additionally, the process' output is captured and logged by CCF (#5056).
## [3.0.8]
[3.0.8]: https://github.com/microsoft/CCF/releases/tag/ccf-3.0.8

Просмотреть файл

@ -730,16 +730,14 @@ if(BUILD_TESTS)
${CMAKE_SOURCE_DIR}/tests
)
if(LONG_TESTS)
add_e2e_test(
NAME launch_host_process_test
PYTHON_SCRIPT
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process/host_process.py
CONSENSUS cft
ADDITIONAL_ARGS --js-app-bundle
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process
)
endif()
add_e2e_test(
NAME launch_host_process_test
PYTHON_SCRIPT
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process/host_process.py
CONSENSUS cft
ADDITIONAL_ARGS --js-app-bundle
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process
)
add_e2e_test(
NAME governance_test

Просмотреть файл

@ -20,6 +20,7 @@ namespace ccf
}
virtual void trigger_host_process_launch(
const std::vector<std::string>& args) = 0;
const std::vector<std::string>& args,
const std::vector<uint8_t>& input = {}) = 0;
};
}

Просмотреть файл

@ -53,12 +53,12 @@ enum AppMessage : ringbuffer::Message
};
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
AppMessage::launch_host_process, std::string);
AppMessage::launch_host_process, std::string, std::vector<uint8_t>);
struct LaunchHostProcessMessage
struct HostProcessArguments
{
std::vector<std::string> args;
};
DECLARE_JSON_TYPE(LaunchHostProcessMessage);
DECLARE_JSON_REQUIRED_FIELDS(LaunchHostProcessMessage, args);
DECLARE_JSON_TYPE(HostProcessArguments);
DECLARE_JSON_REQUIRED_FIELDS(HostProcessArguments, args);

Просмотреть файл

@ -12,6 +12,180 @@
namespace asynchost
{
struct ProcessPipe : public with_uv_handle<uv_pipe_t>
{
public:
ProcessPipe()
{
uv_handle.data = this;
uv_pipe_init(uv_default_loop(), &uv_handle, 0);
}
virtual ~ProcessPipe() = default;
uv_stream_t* stream()
{
return (uv_stream_t*)&uv_handle;
}
protected:
pid_t pid = 0;
};
/**
* Read the output of a process line by line and print each one to our logs.
*/
class ProcessReader : public ProcessPipe
{
static constexpr size_t max_read_size = 16384;
public:
ProcessReader(std::string name) : name(name) {}
void start(pid_t pid)
{
this->pid = pid;
int rc = uv_read_start((uv_stream_t*)&uv_handle, on_alloc_cb, on_read_cb);
if (rc < 0)
{
LOG_FAIL_FMT("uv_read_start failed: {}", uv_strerror(rc));
close();
}
}
private:
static void on_alloc_cb(
uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
static_cast<ProcessReader*>(handle->data)->on_alloc(suggested_size, buf);
}
static void on_read_cb(
uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
static_cast<ProcessReader*>(handle->data)->on_read(nread, buf);
}
void on_alloc(size_t suggested_size, uv_buf_t* buf)
{
auto alloc_size = std::min<size_t>(suggested_size, max_read_size);
LOG_TRACE_FMT(
"Allocating {} bytes for reading from host process pid={}",
alloc_size,
pid);
buf->base = new char[alloc_size];
buf->len = alloc_size;
}
void on_read(ssize_t nread, const uv_buf_t* buf)
{
if (nread < 0)
{
LOG_DEBUG_FMT(
"ProcessReader on_read: status={} pid={} file={}",
uv_strerror(nread),
pid,
name);
// Print any trailing text which didn't have a newline
if (!buffer.empty())
{
LOG_INFO_FMT("{} from process {}: {}", name, pid, buffer);
}
close();
}
else if (nread > 0)
{
buffer.insert(buffer.end(), buf->base, buf->base + nread);
LOG_DEBUG_FMT(
"Read {} bytes from host process, total={} file={}",
nread,
buffer.size(),
name);
print_lines();
}
on_free(buf);
}
void on_free(const uv_buf_t* buf)
{
delete[] buf->base;
}
/**
* Take each line out of the buffer and print it to the logs.
*/
void print_lines()
{
auto start = buffer.begin();
while (true)
{
auto newline = std::find(start, buffer.end(), '\n');
if (newline == buffer.end())
{
break;
}
size_t count = newline - start;
std::string_view line(&*start, count);
LOG_INFO_FMT("{} from process {}: {}", name, pid, line);
// Move past the newline character so we can look for the next one.
start = newline + 1;
}
buffer.erase(buffer.begin(), start);
}
std::string name;
std::string buffer;
};
/**
* Write a byte buffer to a process' standard input.
*/
class ProcessWriter : public ProcessPipe
{
public:
ProcessWriter(std::vector<uint8_t>&& data) : buffer(std::move(data))
{
request.data = this;
}
void start(pid_t pid)
{
this->pid = pid;
LOG_DEBUG_FMT(
"Writing {} bytes to host process pid={}", buffer.size(), pid);
uv_buf_t buf = {(char*)buffer.data(), buffer.size()};
int rc =
uv_write(&request, (uv_stream_t*)&uv_handle, &buf, 1, on_write_done_cb);
if (rc < 0)
{
LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
close();
}
}
private:
static void on_write_done_cb(uv_write_t* req, int status)
{
static_cast<ProcessWriter*>(req->data)->on_write_done(req, status);
}
void on_write_done(uv_write_t* req, int status)
{
LOG_DEBUG_FMT(
"Write to host process completed: status={} pid={}", status, pid);
close();
}
uv_write_t request;
std::vector<uint8_t> buffer;
};
class ProcessLauncher
{
static constexpr size_t max_processes = 8;
@ -20,7 +194,8 @@ namespace asynchost
struct QueueEntry
{
LaunchHostProcessMessage msg;
std::vector<std::string> args;
std::vector<uint8_t> input;
std::chrono::steady_clock::time_point queued_at;
};
@ -28,7 +203,7 @@ namespace asynchost
struct ProcessEntry
{
LaunchHostProcessMessage msg;
std::vector<std::string> args;
std::chrono::steady_clock::time_point started_at;
};
@ -53,8 +228,7 @@ namespace asynchost
now - entry.queued_at)
.count();
auto& msg = entry.msg;
auto& args = msg.args;
const auto& args = entry.args;
std::vector<const char*> argv;
for (size_t i = 0; i < args.size(); i++)
@ -63,30 +237,49 @@ namespace asynchost
}
argv.push_back(nullptr);
close_ptr<ProcessReader> stdout_reader("stdout");
close_ptr<ProcessReader> stderr_reader("stderr");
close_ptr<ProcessWriter> stdin_writer(std::move(entry.input));
auto handle = new uv_process_t;
handle->data = this;
uv_stdio_container_t stdio[3];
stdio[0].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_READABLE_PIPE);
stdio[0].data.stream = stdin_writer->stream();
stdio[1].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
stdio[1].data.stream = stdout_reader->stream();
stdio[2].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
stdio[2].data.stream = stderr_reader->stream();
uv_process_options_t options = {};
options.file = argv.at(0);
options.args = const_cast<char**>(argv.data());
options.exit_cb = ProcessLauncher::on_process_exit;
options.stdio = stdio;
options.stdio_count = 3;
auto rc = uv_spawn(uv_default_loop(), handle, &options);
if (rc != 0)
{
LOG_FAIL_FMT("Error starting host process: {}", uv_strerror(rc));
return;
}
LOG_DEBUG_FMT(
LOG_INFO_FMT(
"Launching host process: pid={} queuetime={}ms cmd={}",
handle->pid,
queue_time_ms,
fmt::join(args, " "));
stdin_writer.release()->start(handle->pid);
stdout_reader.release()->start(handle->pid);
stderr_reader.release()->start(handle->pid);
auto started_at = std::chrono::steady_clock::now();
ProcessEntry process_entry{std::move(entry.msg), started_at};
ProcessEntry process_entry{std::move(entry.args), started_at};
running.insert({handle->pid, std::move(process_entry)});
}
@ -106,12 +299,24 @@ namespace asynchost
t_end - process.started_at)
.count();
LOG_DEBUG_FMT(
"Host process exited: pid={} status={} runtime={}ms cmd={}",
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.msg.args, " "));
if (exit_status == 0)
{
LOG_INFO_FMT(
"Host process exited: pid={} status={} runtime={}ms cmd={}",
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.args, " "));
}
else
{
LOG_FAIL_FMT(
"Host process exited: pid={} status={} runtime={}ms cmd={}",
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.args, " "));
}
running.erase(handle->pid);
@ -133,15 +338,19 @@ namespace asynchost
disp,
AppMessage::launch_host_process,
[this](const uint8_t* data, size_t size) {
auto [json] =
auto [json, input] =
ringbuffer::read_message<AppMessage::launch_host_process>(
data, size);
auto obj = nlohmann::json::parse(json);
auto msg = obj.get<LaunchHostProcessMessage>();
auto msg = obj.get<HostProcessArguments>();
auto queued_at = std::chrono::steady_clock::now();
QueueEntry entry{msg, queued_at};
QueueEntry entry{
std::move(msg.args),
std::move(input),
queued_at,
};
LOG_DEBUG_FMT("Queueing host process launch: {}", json);

Просмотреть файл

@ -28,7 +28,20 @@ namespace asynchost
~close_ptr()
{
raw->close();
if (raw != nullptr)
{
raw->close();
}
}
T* operator->()
{
return raw;
}
T* release()
{
return std::exchange(raw, nullptr);
}
};
@ -74,7 +87,7 @@ namespace asynchost
virtual ~with_uv_handle() = default;
private:
protected:
template <typename T>
friend class close_ptr;
@ -86,6 +99,7 @@ namespace asynchost
}
}
private:
static void on_close(uv_handle_t* handle)
{
static_cast<with_uv_handle<handle_type>*>(handle->data)->on_close();

Просмотреть файл

@ -1299,23 +1299,36 @@ namespace ccf::js
{
js::Context& jsctx = *(js::Context*)JS_GetContextOpaque(ctx);
if (argc != 1)
if (argc != 1 && argc != 2)
{
return JS_ThrowTypeError(ctx, "Passed %d arguments but expected 1", argc);
return JS_ThrowTypeError(
ctx, "Passed %d arguments but expected 1 or 2", argc);
}
std::vector<std::string> process_args;
JSValue r = get_string_array(ctx, argv[0], process_args);
std::vector<uint8_t> process_input;
JSValue r = get_string_array(ctx, argv[0], process_args);
if (!JS_IsUndefined(r))
{
return r;
}
if (argc == 2)
{
size_t size;
uint8_t* buf = JS_GetArrayBuffer(ctx, &size, argv[1]);
if (!buf)
{
return JS_ThrowTypeError(ctx, "Argument must be an ArrayBuffer");
}
process_input.assign(buf, buf + size);
}
auto host_processes = static_cast<ccf::AbstractHostProcesses*>(
JS_GetOpaque(this_val, host_class_id));
host_processes->trigger_host_process_launch(process_args);
host_processes->trigger_host_process_launch(process_args, process_input);
return JS_UNDEFINED;
}
@ -2383,4 +2396,4 @@ extern "C"
}
return 0;
}
}
}

Просмотреть файл

@ -1368,13 +1368,16 @@ namespace ccf
}
void trigger_host_process_launch(
const std::vector<std::string>& args) override
const std::vector<std::string>& args,
const std::vector<uint8_t>& input) override
{
LaunchHostProcessMessage msg{args};
HostProcessArguments msg{args};
nlohmann::json j = msg;
auto json = j.dump();
LOG_DEBUG_FMT("Triggering host process launch: {}", json);
RINGBUFFER_WRITE_MESSAGE(AppMessage::launch_host_process, to_host, json);
LOG_DEBUG_FMT(
"Triggering host process launch: {} size={}", json, input.size());
RINGBUFFER_WRITE_MESSAGE(
AppMessage::launch_host_process, to_host, json, input);
}
void transition_service_to_open(
@ -2604,4 +2607,4 @@ namespace ccf
client->send_request(std::move(req));
}
};
}
}

Просмотреть файл

@ -15,9 +15,10 @@ namespace ccf
HostProcesses(AbstractNodeState& impl_) : impl(impl_) {}
void trigger_host_process_launch(
const std::vector<std::string>& args) override
const std::vector<std::string>& args,
const std::vector<uint8_t>& input) override
{
impl.trigger_host_process_launch(args);
impl.trigger_host_process_launch(args, input);
}
};
}

Просмотреть файл

@ -29,7 +29,8 @@ namespace ccf
virtual void trigger_ledger_chunk(kv::Tx& tx) = 0;
virtual void trigger_snapshot(kv::Tx& tx) = 0;
virtual void trigger_host_process_launch(
const std::vector<std::string>& args) = 0;
const std::vector<std::string>& args,
const std::vector<uint8_t>& input) = 0;
virtual void trigger_acme_refresh(
kv::Tx& tx,
const std::optional<std::vector<std::string>>& interfaces =
@ -67,4 +68,4 @@ namespace ccf
callback,
const std::vector<std::string>& ca_certs = {}) = 0;
};
}
}

Просмотреть файл

@ -133,7 +133,8 @@ namespace ccf
{
public:
void trigger_host_process_launch(
const std::vector<std::string>& args) override
const std::vector<std::string>& args,
const std::vector<uint8_t>& input) override
{
return;
}
@ -270,4 +271,4 @@ namespace ccf
return kv::test::PrimaryNodeId;
}
};
}
}

Просмотреть файл

@ -19,11 +19,18 @@ def test_host_process_launch(network, args):
with tempfile.TemporaryDirectory() as tmp_dir:
script_path = os.path.join(os.path.dirname(__file__), "host_process.sh")
out_path = os.path.join(tmp_dir, "test.out")
expected_content = "Hello world!"
args = [script_path, expected_content, out_path]
first = "Hello world!\n"
second = "Goodbye"
expected_content = first + second
body = {
"args": [script_path, first, out_path],
"input": second,
}
with primary.client("user0") as c:
r = c.post("/app/launch", body={"args": args})
r = c.post("/app/launch", body=body)
assert r.status_code == http.HTTPStatus.OK, r.status_code
timeout = 1

Просмотреть файл

@ -2,4 +2,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
echo -n "$1" > "$2"
echo "Writing to stdout"
echo >&2 "Writing to stderr"
STDIN=$(cat)
echo -n "$1$STDIN" > "$2"

Просмотреть файл

@ -1,6 +1,7 @@
export function launch(request) {
const args = request.body.json()["args"];
ccf.host.triggerSubprocess(args);
const body = request.body.json();
ccf.host.triggerSubprocess(body.args, ccf.strToBuf(body.input));
return {};
}