inspector: split main thread interface from transport

Workers debugging will require interfacing between the "main" inspector
and per-worker isolate inspectors. This is consistent with what WS
interface does. This change is a refactoring change and does not change
the functionality.

PR-URL: https://github.com/nodejs/node/pull/21182
Fixes: https://github.com/nodejs/node/issues/21725
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Eugene Ostroukhov 2018-05-21 16:59:04 -07:00 коммит произвёл Michaël Zasso
Родитель 42d75392c5
Коммит b2291296ef
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 770F7A9A5AE15600
14 изменённых файлов: 888 добавлений и 632 удалений

Просмотреть файл

@ -490,12 +490,14 @@
'src/inspector_js_api.cc',
'src/inspector_socket.cc',
'src/inspector_socket_server.cc',
'src/inspector/tracing_agent.cc',
'src/inspector/main_thread_interface.cc',
'src/inspector/node_string.cc',
'src/inspector/tracing_agent.cc',
'src/inspector_agent.h',
'src/inspector_io.h',
'src/inspector_socket.h',
'src/inspector_socket_server.h',
'src/inspector/main_thread_interface.h',
'src/inspector/node_string.h',
'src/inspector/tracing_agent.h',
'<@(node_inspector_generated_sources)'

Просмотреть файл

@ -0,0 +1,317 @@
#include "main_thread_interface.h"
#include "node_mutex.h"
#include "v8-inspector.h"
#include <unicode/unistr.h>
namespace node {
namespace inspector {
namespace {
using v8_inspector::StringView;
using v8_inspector::StringBuffer;
template <typename T>
class DeleteRequest : public Request {
public:
explicit DeleteRequest(T* object) : object_(object) {}
void Call() override {
delete object_;
}
private:
T* object_;
};
template <typename Target, typename Arg>
class SingleArgumentFunctionCall : public Request {
public:
using Fn = void (Target::*)(Arg);
SingleArgumentFunctionCall(Target* target, Fn fn, Arg argument)
: target_(target),
fn_(fn),
arg_(std::move(argument)) {}
void Call() override {
Apply(target_, fn_, std::move(arg_));
}
private:
template <typename Element>
void Apply(Element* target, Fn fn, Arg arg) {
(target->*fn)(std::move(arg));
}
Target* target_;
Fn fn_;
Arg arg_;
};
class PostMessageRequest : public Request {
public:
PostMessageRequest(InspectorSessionDelegate* delegate,
StringView message)
: delegate_(delegate),
message_(StringBuffer::create(message)) {}
void Call() override {
delegate_->SendMessageToFrontend(message_->string());
}
private:
InspectorSessionDelegate* delegate_;
std::unique_ptr<StringBuffer> message_;
};
class DispatchMessagesTask : public v8::Task {
public:
explicit DispatchMessagesTask(MainThreadInterface* thread)
: thread_(thread) {}
void Run() override {
thread_->DispatchMessages();
}
private:
MainThreadInterface* thread_;
};
void DisposePairCallback(uv_handle_t* ref) {
using AsyncAndInterface = std::pair<uv_async_t, MainThreadInterface*>;
AsyncAndInterface* pair = node::ContainerOf(
&AsyncAndInterface::first, reinterpret_cast<uv_async_t*>(ref));
delete pair;
}
template <typename T>
class AnotherThreadObjectReference {
public:
// We create it on whatever thread, just make sure it gets disposed on the
// proper thread.
AnotherThreadObjectReference(std::shared_ptr<MainThreadHandle> thread,
T* object)
: thread_(thread), object_(object) {
}
AnotherThreadObjectReference(AnotherThreadObjectReference&) = delete;
~AnotherThreadObjectReference() {
// Disappearing thread may cause a memory leak
CHECK(thread_->Post(
std::unique_ptr<DeleteRequest<T>>(new DeleteRequest<T>(object_))));
object_ = nullptr;
}
template <typename Fn, typename Arg>
void Post(Fn fn, Arg argument) const {
using R = SingleArgumentFunctionCall<T, Arg>;
thread_->Post(std::unique_ptr<R>(new R(object_, fn, std::move(argument))));
}
T* get() const {
return object_;
}
private:
std::shared_ptr<MainThreadHandle> thread_;
T* object_;
};
class MainThreadSessionState {
public:
MainThreadSessionState(
std::shared_ptr<MainThreadHandle> thread,
bool prevent_shutdown) : thread_(thread),
prevent_shutdown_(prevent_shutdown) {}
void Connect(std::unique_ptr<InspectorSessionDelegate> delegate) {
Agent* agent = thread_->GetInspectorAgent();
if (agent != nullptr)
session_ = agent->Connect(std::move(delegate), prevent_shutdown_);
}
void Dispatch(std::unique_ptr<StringBuffer> message) {
session_->Dispatch(message->string());
}
private:
std::shared_ptr<MainThreadHandle> thread_;
bool prevent_shutdown_;
std::unique_ptr<InspectorSession> session_;
};
class CrossThreadInspectorSession : public InspectorSession {
public:
CrossThreadInspectorSession(
int id,
std::shared_ptr<MainThreadHandle> thread,
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown)
: state_(thread, new MainThreadSessionState(thread, prevent_shutdown)) {
state_.Post(&MainThreadSessionState::Connect, std::move(delegate));
}
void Dispatch(const StringView& message) override {
state_.Post(&MainThreadSessionState::Dispatch,
StringBuffer::create(message));
}
private:
AnotherThreadObjectReference<MainThreadSessionState> state_;
};
class ThreadSafeDelegate : public InspectorSessionDelegate {
public:
ThreadSafeDelegate(std::shared_ptr<MainThreadHandle> thread,
std::unique_ptr<InspectorSessionDelegate> delegate)
: thread_(thread), delegate_(thread, delegate.release()) {}
void SendMessageToFrontend(const v8_inspector::StringView& message) override {
thread_->Post(std::unique_ptr<Request>(
new PostMessageRequest(delegate_.get(), message)));
}
private:
std::shared_ptr<MainThreadHandle> thread_;
AnotherThreadObjectReference<InspectorSessionDelegate> delegate_;
};
} // namespace
MainThreadInterface::MainThreadInterface(Agent* agent, uv_loop_t* loop,
v8::Isolate* isolate,
v8::Platform* platform)
: agent_(agent), isolate_(isolate),
platform_(platform) {
main_thread_request_.reset(new AsyncAndInterface(uv_async_t(), this));
CHECK_EQ(0, uv_async_init(loop, &main_thread_request_->first,
DispatchMessagesAsyncCallback));
// Inspector uv_async_t should not prevent main loop shutdown.
uv_unref(reinterpret_cast<uv_handle_t*>(&main_thread_request_->first));
}
MainThreadInterface::~MainThreadInterface() {
if (handle_)
handle_->Reset();
}
// static
void MainThreadInterface::DispatchMessagesAsyncCallback(uv_async_t* async) {
AsyncAndInterface* asyncAndInterface =
node::ContainerOf(&AsyncAndInterface::first, async);
asyncAndInterface->second->DispatchMessages();
}
// static
void MainThreadInterface::CloseAsync(AsyncAndInterface* pair) {
uv_close(reinterpret_cast<uv_handle_t*>(&pair->first), DisposePairCallback);
}
void MainThreadInterface::Post(std::unique_ptr<Request> request) {
Mutex::ScopedLock scoped_lock(requests_lock_);
bool needs_notify = requests_.empty();
requests_.push_back(std::move(request));
if (needs_notify) {
CHECK_EQ(0, uv_async_send(&main_thread_request_->first));
if (isolate_ != nullptr && platform_ != nullptr) {
platform_->CallOnForegroundThread(isolate_,
new DispatchMessagesTask(this));
isolate_->RequestInterrupt([](v8::Isolate* isolate, void* thread) {
static_cast<MainThreadInterface*>(thread)->DispatchMessages();
}, this);
}
}
incoming_message_cond_.Broadcast(scoped_lock);
}
bool MainThreadInterface::WaitForFrontendEvent() {
// We allow DispatchMessages reentry as we enter the pause. This is important
// to support debugging the code invoked by an inspector call, such
// as Runtime.evaluate
dispatching_messages_ = false;
if (dispatching_message_queue_.empty()) {
Mutex::ScopedLock scoped_lock(requests_lock_);
while (requests_.empty()) incoming_message_cond_.Wait(scoped_lock);
}
return true;
}
void MainThreadInterface::DispatchMessages() {
if (dispatching_messages_)
return;
dispatching_messages_ = true;
bool had_messages = false;
do {
if (dispatching_message_queue_.empty()) {
Mutex::ScopedLock scoped_lock(requests_lock_);
requests_.swap(dispatching_message_queue_);
}
had_messages = !dispatching_message_queue_.empty();
while (!dispatching_message_queue_.empty()) {
MessageQueue::value_type task;
std::swap(dispatching_message_queue_.front(), task);
dispatching_message_queue_.pop_front();
task->Call();
}
} while (had_messages);
dispatching_messages_ = false;
}
std::shared_ptr<MainThreadHandle> MainThreadInterface::GetHandle() {
if (handle_ == nullptr)
handle_ = std::make_shared<MainThreadHandle>(this);
return handle_;
}
std::unique_ptr<StringBuffer> Utf8ToStringView(const std::string& message) {
icu::UnicodeString utf16 = icu::UnicodeString::fromUTF8(
icu::StringPiece(message.data(), message.length()));
StringView view(reinterpret_cast<const uint16_t*>(utf16.getBuffer()),
utf16.length());
return StringBuffer::create(view);
}
std::unique_ptr<InspectorSession> MainThreadHandle::Connect(
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown) {
return std::unique_ptr<InspectorSession>(
new CrossThreadInspectorSession(++next_session_id_,
shared_from_this(),
std::move(delegate),
prevent_shutdown));
}
bool MainThreadHandle::Post(std::unique_ptr<Request> request) {
Mutex::ScopedLock scoped_lock(block_lock_);
if (!main_thread_)
return false;
main_thread_->Post(std::move(request));
return true;
}
void MainThreadHandle::Reset() {
Mutex::ScopedLock scoped_lock(block_lock_);
main_thread_ = nullptr;
}
Agent* MainThreadHandle::GetInspectorAgent() {
Mutex::ScopedLock scoped_lock(block_lock_);
if (main_thread_ == nullptr)
return nullptr;
return main_thread_->inspector_agent();
}
std::unique_ptr<InspectorSessionDelegate>
MainThreadHandle::MakeThreadSafeDelegate(
std::unique_ptr<InspectorSessionDelegate> delegate) {
return std::unique_ptr<InspectorSessionDelegate>(
new ThreadSafeDelegate(shared_from_this(), std::move(delegate)));
}
bool MainThreadHandle::Expired() {
Mutex::ScopedLock scoped_lock(block_lock_);
return main_thread_ == nullptr;
}
} // namespace inspector
} // namespace node

Просмотреть файл

@ -0,0 +1,99 @@
#ifndef SRC_INSPECTOR_MAIN_THREAD_INTERFACE_H_
#define SRC_INSPECTOR_MAIN_THREAD_INTERFACE_H_
#if !HAVE_INSPECTOR
#error("This header can only be used when inspector is enabled")
#endif
#include "env.h"
#include "inspector_agent.h"
#include "node_mutex.h"
#include <deque>
#include <memory>
#include <unordered_map>
#include <unordered_set>
namespace v8_inspector {
class StringBuffer;
class StringView;
} // namespace v8_inspector
namespace node {
namespace inspector {
class Request {
public:
virtual void Call() = 0;
virtual ~Request() {}
};
std::unique_ptr<v8_inspector::StringBuffer> Utf8ToStringView(
const std::string& message);
using MessageQueue = std::deque<std::unique_ptr<Request>>;
class MainThreadInterface;
class MainThreadHandle : public std::enable_shared_from_this<MainThreadHandle> {
public:
explicit MainThreadHandle(MainThreadInterface* main_thread)
: main_thread_(main_thread) {}
~MainThreadHandle() {
CHECK_NULL(main_thread_); // main_thread_ should have called Reset
}
std::unique_ptr<InspectorSession> Connect(
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown);
bool Post(std::unique_ptr<Request> request);
Agent* GetInspectorAgent();
std::unique_ptr<InspectorSessionDelegate> MakeThreadSafeDelegate(
std::unique_ptr<InspectorSessionDelegate> delegate);
bool Expired();
private:
void Reset();
MainThreadInterface* main_thread_;
Mutex block_lock_;
int next_session_id_ = 0;
friend class MainThreadInterface;
};
class MainThreadInterface {
public:
MainThreadInterface(Agent* agent, uv_loop_t*, v8::Isolate* isolate,
v8::Platform* platform);
~MainThreadInterface();
void DispatchMessages();
void Post(std::unique_ptr<Request> request);
bool WaitForFrontendEvent();
std::shared_ptr<MainThreadHandle> GetHandle();
Agent* inspector_agent() {
return agent_;
}
private:
using AsyncAndInterface = std::pair<uv_async_t, MainThreadInterface*>;
static void DispatchMessagesAsyncCallback(uv_async_t* async);
static void CloseAsync(AsyncAndInterface*);
MessageQueue requests_;
Mutex requests_lock_; // requests_ live across threads
// This queue is to maintain the order of the messages for the cases
// when we reenter the DispatchMessages function.
MessageQueue dispatching_message_queue_;
bool dispatching_messages_ = false;
ConditionVariable incoming_message_cond_;
// Used from any thread
Agent* const agent_;
v8::Isolate* const isolate_;
v8::Platform* const platform_;
DeleteFnPtr<AsyncAndInterface, CloseAsync> main_thread_request_;
std::shared_ptr<MainThreadHandle> handle_;
};
} // namespace inspector
} // namespace node
#endif // SRC_INSPECTOR_MAIN_THREAD_INTERFACE_H_

Просмотреть файл

@ -1,6 +1,7 @@
#include "inspector_agent.h"
#include "inspector_io.h"
#include "inspector/main_thread_interface.h"
#include "inspector/node_string.h"
#include "inspector/tracing_agent.h"
#include "node/inspector/protocol/Protocol.h"
@ -49,7 +50,7 @@ class StartIoTask : public v8::Task {
explicit StartIoTask(Agent* agent) : agent(agent) {}
void Run() override {
agent->StartIoThread(false);
agent->StartIoThread();
}
private:
@ -64,11 +65,11 @@ std::unique_ptr<StringBuffer> ToProtocolString(Isolate* isolate,
// Called on the main thread.
void StartIoThreadAsyncCallback(uv_async_t* handle) {
static_cast<Agent*>(handle->data)->StartIoThread(false);
static_cast<Agent*>(handle->data)->StartIoThread();
}
void StartIoInterrupt(Isolate* isolate, void* agent) {
static_cast<Agent*>(agent)->StartIoThread(false);
static_cast<Agent*>(agent)->StartIoThread();
}
@ -195,8 +196,10 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel,
public:
explicit ChannelImpl(Environment* env,
const std::unique_ptr<V8Inspector>& inspector,
std::unique_ptr<InspectorSessionDelegate> delegate)
: delegate_(std::move(delegate)) {
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown)
: delegate_(std::move(delegate)),
prevent_shutdown_(prevent_shutdown) {
session_ = inspector->connect(1, this, StringView());
node_dispatcher_.reset(new protocol::UberDispatcher(this));
tracing_agent_.reset(new protocol::TracingAgent(env));
@ -208,7 +211,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel,
tracing_agent_.reset(); // Dispose before the dispatchers
}
void dispatchProtocolMessage(const StringView& message) {
std::string dispatchProtocolMessage(const StringView& message) {
std::unique_ptr<protocol::DictionaryValue> parsed;
std::string method;
node_dispatcher_->getCommandName(
@ -219,6 +222,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel,
} else {
node_dispatcher_->dispatch(std::move(parsed));
}
return method;
}
void schedulePauseOnNextStatement(const std::string& reason) {
@ -226,6 +230,10 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel,
session_->schedulePauseOnNextStatement(buffer->string(), buffer->string());
}
bool preventShutdown() {
return prevent_shutdown_;
}
private:
void sendResponse(
int callId,
@ -263,6 +271,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel,
std::unique_ptr<InspectorSessionDelegate> delegate_;
std::unique_ptr<v8_inspector::V8InspectorSession> session_;
std::unique_ptr<protocol::UberDispatcher> node_dispatcher_;
bool prevent_shutdown_;
};
class InspectorTimer {
@ -324,6 +333,44 @@ class InspectorTimerHandle {
private:
InspectorTimer* timer_;
};
class SameThreadInspectorSession : public InspectorSession {
public:
SameThreadInspectorSession(
int session_id, std::shared_ptr<NodeInspectorClient> client)
: session_id_(session_id), client_(client) {}
~SameThreadInspectorSession() override;
void Dispatch(const v8_inspector::StringView& message) override;
private:
int session_id_;
std::weak_ptr<NodeInspectorClient> client_;
};
void NotifyClusterWorkersDebugEnabled(Environment* env) {
v8::Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
auto context = env->context();
// Send message to enable debug in cluster workers
Local<Object> process_object = env->process_object();
Local<Value> emit_fn =
process_object->Get(context, FIXED_ONE_BYTE_STRING(isolate, "emit"))
.ToLocalChecked();
// In case the thread started early during the startup
if (!emit_fn->IsFunction())
return;
Local<Object> message = Object::New(isolate);
message->Set(context, FIXED_ONE_BYTE_STRING(isolate, "cmd"),
FIXED_ONE_BYTE_STRING(isolate, "NODE_DEBUG_ENABLED")).FromJust();
Local<Value> argv[] = {
FIXED_ONE_BYTE_STRING(isolate, "internalMessage"),
message
};
MakeCallback(env->isolate(), process_object, emit_fn.As<Function>(),
arraysize(argv), argv, {0, 0});
}
} // namespace
class NodeInspectorClient : public V8InspectorClient {
@ -337,31 +384,18 @@ class NodeInspectorClient : public V8InspectorClient {
}
void runMessageLoopOnPause(int context_group_id) override {
runMessageLoop(false);
waiting_for_resume_ = true;
runMessageLoop();
}
void runMessageLoop(bool ignore_terminated) {
if (running_nested_loop_)
return;
terminated_ = false;
running_nested_loop_ = true;
MultiIsolatePlatform* platform = env_->isolate_data()->platform();
while ((ignore_terminated || !terminated_) && waitForFrontendEvent()) {
while (platform->FlushForegroundTasks(env_->isolate())) {}
}
terminated_ = false;
running_nested_loop_ = false;
void waitForIoShutdown() {
waiting_for_io_shutdown_ = true;
runMessageLoop();
}
bool waitForFrontendEvent() {
InspectorIo* io = env_->inspector_agent()->io();
if (io == nullptr)
return false;
return io->WaitForFrontendEvent();
}
double currentTimeMS() override {
return uv_hrtime() * 1.0 / NANOS_PER_MSEC;
void waitForFrontend() {
waiting_for_frontend_ = true;
runMessageLoop();
}
void maxAsyncCallStackDepthChanged(int depth) override {
@ -398,16 +432,17 @@ class NodeInspectorClient : public V8InspectorClient {
}
void quitMessageLoopOnPause() override {
terminated_ = true;
waiting_for_resume_ = false;
}
int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate) {
int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown) {
events_dispatched_ = true;
int session_id = next_session_id_++;
// TODO(addaleax): Revert back to using make_unique once we get issues
// with CI resolved (i.e. revert the patch that added this comment).
channels_[session_id].reset(
new ChannelImpl(env_, client_, std::move(delegate)));
new ChannelImpl(env_, client_, std::move(delegate), prevent_shutdown));
return session_id;
}
@ -418,7 +453,10 @@ class NodeInspectorClient : public V8InspectorClient {
void dispatchMessageFromFrontend(int session_id, const StringView& message) {
events_dispatched_ = true;
channels_[session_id]->dispatchProtocolMessage(message);
std::string method =
channels_[session_id]->dispatchProtocolMessage(message);
if (waiting_for_frontend_)
waiting_for_frontend_ = method != "Runtime.runIfWaitingForDebugger";
}
Local<Context> ensureDefaultContextInGroup(int contextGroupId) override {
@ -509,116 +547,150 @@ class NodeInspectorClient : public V8InspectorClient {
}
bool hasConnectedSessions() {
for (const auto& id_channel : channels_) {
// Other sessions are "invisible" more most purposes
if (id_channel.second->preventShutdown())
return true;
}
return false;
}
std::shared_ptr<MainThreadHandle> getThreadHandle() {
if (interface_ == nullptr) {
interface_.reset(new MainThreadInterface(
env_->inspector_agent(), env_->event_loop(), env_->isolate(),
env_->isolate_data()->platform()));
}
return interface_->GetHandle();
}
bool IsActive() {
return !channels_.empty();
}
private:
bool shouldRunMessageLoop() {
if (waiting_for_frontend_)
return true;
if (waiting_for_io_shutdown_ || waiting_for_resume_)
return hasConnectedSessions();
return false;
}
void runMessageLoop() {
if (running_nested_loop_)
return;
running_nested_loop_ = true;
MultiIsolatePlatform* platform = env_->isolate_data()->platform();
while (shouldRunMessageLoop()) {
if (interface_ && hasConnectedSessions())
interface_->WaitForFrontendEvent();
while (platform->FlushForegroundTasks(env_->isolate())) {}
}
running_nested_loop_ = false;
}
double currentTimeMS() override {
return uv_hrtime() * 1.0 / NANOS_PER_MSEC;
}
node::Environment* env_;
bool terminated_ = false;
bool running_nested_loop_ = false;
std::unique_ptr<V8Inspector> client_;
std::unordered_map<int, std::unique_ptr<ChannelImpl>> channels_;
std::unordered_map<void*, InspectorTimerHandle> timers_;
int next_session_id_ = 1;
bool events_dispatched_ = false;
bool waiting_for_resume_ = false;
bool waiting_for_frontend_ = false;
bool waiting_for_io_shutdown_ = false;
// Allows accessing Inspector from non-main threads
std::unique_ptr<MainThreadInterface> interface_;
};
Agent::Agent(Environment* env) : parent_env_(env) {}
// Destructor needs to be defined here in implementation file as the header
// does not have full definition of some classes.
Agent::~Agent() {
}
Agent::~Agent() = default;
bool Agent::Start(const char* path, const DebugOptions& options) {
path_ = path == nullptr ? "" : path;
bool Agent::Start(const std::string& path, const DebugOptions& options) {
path_ = path;
debug_options_ = options;
client_ = std::make_shared<NodeInspectorClient>(parent_env_);
CHECK_EQ(0, uv_async_init(uv_default_loop(),
&start_io_thread_async,
StartIoThreadAsyncCallback));
start_io_thread_async.data = this;
uv_unref(reinterpret_cast<uv_handle_t*>(&start_io_thread_async));
if (parent_env_->is_main_thread()) {
CHECK_EQ(0, uv_async_init(parent_env_->event_loop(),
&start_io_thread_async,
StartIoThreadAsyncCallback));
uv_unref(reinterpret_cast<uv_handle_t*>(&start_io_thread_async));
start_io_thread_async.data = this;
// Ignore failure, SIGUSR1 won't work, but that should not block node start.
StartDebugSignalHandler();
}
// Ignore failure, SIGUSR1 won't work, but that should not block node start.
StartDebugSignalHandler();
if (options.inspector_enabled()) {
// This will return false if listen failed on the inspector port.
return StartIoThread(options.wait_for_connect());
bool wait_for_connect = options.wait_for_connect();
if (!options.inspector_enabled() || !StartIoThread()) {
return false;
}
if (wait_for_connect) {
HandleScope scope(parent_env_->isolate());
parent_env_->process_object()->DefineOwnProperty(
parent_env_->context(),
FIXED_ONE_BYTE_STRING(parent_env_->isolate(), "_breakFirstLine"),
True(parent_env_->isolate()),
static_cast<v8::PropertyAttribute>(v8::ReadOnly | v8::DontEnum))
.FromJust();
client_->waitForFrontend();
}
return true;
}
bool Agent::StartIoThread(bool wait_for_connect) {
bool Agent::StartIoThread() {
if (io_ != nullptr)
return true;
CHECK_NOT_NULL(client_);
io_ = std::unique_ptr<InspectorIo>(
new InspectorIo(parent_env_, path_, debug_options_, wait_for_connect));
if (!io_->Start()) {
client_.reset();
io_ = InspectorIo::Start(
client_->getThreadHandle(), path_, debug_options_);
if (io_ == nullptr) {
return false;
}
v8::Isolate* isolate = parent_env_->isolate();
HandleScope handle_scope(isolate);
auto context = parent_env_->context();
// Send message to enable debug in workers
Local<Object> process_object = parent_env_->process_object();
Local<Value> emit_fn =
process_object->Get(context, FIXED_ONE_BYTE_STRING(isolate, "emit"))
.ToLocalChecked();
// In case the thread started early during the startup
if (!emit_fn->IsFunction())
return true;
Local<Object> message = Object::New(isolate);
message->Set(context, FIXED_ONE_BYTE_STRING(isolate, "cmd"),
FIXED_ONE_BYTE_STRING(isolate, "NODE_DEBUG_ENABLED")).FromJust();
Local<Value> argv[] = {
FIXED_ONE_BYTE_STRING(isolate, "internalMessage"),
message
};
MakeCallback(parent_env_->isolate(), process_object, emit_fn.As<Function>(),
arraysize(argv), argv, {0, 0});
NotifyClusterWorkersDebugEnabled(parent_env_);
return true;
}
void Agent::Stop() {
if (io_ != nullptr) {
io_->Stop();
io_.reset();
}
io_.reset();
}
std::unique_ptr<InspectorSession> Agent::Connect(
std::unique_ptr<InspectorSessionDelegate> delegate) {
int session_id = client_->connectFrontend(std::move(delegate));
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown) {
CHECK_NOT_NULL(client_);
int session_id = client_->connectFrontend(std::move(delegate),
prevent_shutdown);
return std::unique_ptr<InspectorSession>(
new InspectorSession(session_id, client_));
new SameThreadInspectorSession(session_id, client_));
}
void Agent::WaitForDisconnect() {
CHECK_NOT_NULL(client_);
if (client_->hasConnectedSessions()) {
fprintf(stderr, "Waiting for the debugger to disconnect...\n");
fflush(stderr);
}
// TODO(addaleax): Maybe this should use an at-exit hook for the Environment
// or something similar?
client_->contextDestroyed(parent_env_->context());
if (io_ != nullptr) {
io_->WaitForDisconnect();
// There is a bug in V8 Inspector (https://crbug.com/834056) that
// calls V8InspectorClient::quitMessageLoopOnPause when a session
// disconnects. We are using this flag to ignore those calls so the message
// loop is spinning as long as there's a reason to expect inspector messages
client_->runMessageLoop(true);
io_->StopAcceptingNewConnections();
client_->waitForIoShutdown();
}
}
void Agent::FatalException(Local<Value> error, Local<v8::Message> message) {
if (!IsStarted())
if (!IsListening())
return;
client_->FatalException(error, message);
WaitForDisconnect();
@ -718,26 +790,35 @@ void Agent::ContextCreated(Local<Context> context, const ContextInfo& info) {
client_->contextCreated(context, info);
}
bool Agent::IsWaitingForConnect() {
bool Agent::WillWaitForConnect() {
return debug_options_.wait_for_connect();
}
bool Agent::HasConnectedSessions() {
bool Agent::IsActive() {
if (client_ == nullptr)
return false;
return client_->hasConnectedSessions();
return io_ != nullptr || client_->IsActive();
}
InspectorSession::InspectorSession(int session_id,
std::shared_ptr<NodeInspectorClient> client)
: session_id_(session_id), client_(client) {}
InspectorSession::~InspectorSession() {
client_->disconnectFrontend(session_id_);
void Agent::WaitForConnect() {
CHECK_NOT_NULL(client_);
client_->waitForFrontend();
}
void InspectorSession::Dispatch(const StringView& message) {
client_->dispatchMessageFromFrontend(session_id_, message);
SameThreadInspectorSession::~SameThreadInspectorSession() {
auto client = client_.lock();
if (client)
client->disconnectFrontend(session_id_);
}
void SameThreadInspectorSession::Dispatch(
const v8_inspector::StringView& message) {
auto client = client_.lock();
if (client)
client->dispatchMessageFromFrontend(session_id_, message);
}
} // namespace inspector
} // namespace node

Просмотреть файл

@ -20,7 +20,6 @@ class StringView;
namespace node {
// Forward declaration to break recursive dependency chain with src/env.h.
class Environment;
class NodePlatform;
struct ContextInfo;
namespace inspector {
@ -29,12 +28,8 @@ class NodeInspectorClient;
class InspectorSession {
public:
InspectorSession(int session_id, std::shared_ptr<NodeInspectorClient> client);
~InspectorSession();
void Dispatch(const v8_inspector::StringView& message);
private:
int session_id_;
std::shared_ptr<NodeInspectorClient> client_;
virtual ~InspectorSession() {}
virtual void Dispatch(const v8_inspector::StringView& message) = 0;
};
class InspectorSessionDelegate {
@ -50,15 +45,21 @@ class Agent {
~Agent();
// Create client_, may create io_ if option enabled
bool Start(const char* path, const DebugOptions& options);
bool Start(const std::string& path, const DebugOptions& options);
// Stop and destroy io_
void Stop();
bool IsStarted() { return !!client_; }
// IO thread started, and client connected
bool IsWaitingForConnect();
bool IsListening() { return io_ != nullptr; }
// Returns true if the Node inspector is actually in use. It will be true
// if either the user explicitely opted into inspector (e.g. with the
// --inspect command line flag) or if inspector JS API had been used.
bool IsActive();
// Option is set to wait for session connection
bool WillWaitForConnect();
// Blocks till frontend connects and sends "runIfWaitingForDebugger"
void WaitForConnect();
// Blocks till all the sessions with "WaitForDisconnectOnShutdown" disconnect
void WaitForDisconnect();
void FatalException(v8::Local<v8::Value> error,
v8::Local<v8::Message> message);
@ -77,22 +78,20 @@ class Agent {
void EnableAsyncHook();
void DisableAsyncHook();
// Called by the WS protocol and JS binding to create inspector sessions.
// Called to create inspector sessions that can be used from the main thread.
// The inspector responds by using the delegate to send messages back.
std::unique_ptr<InspectorSession> Connect(
std::unique_ptr<InspectorSessionDelegate> delegate);
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown);
void PauseOnNextJavascriptStatement(const std::string& reason);
// Returns true as long as there is at least one connected session.
bool HasConnectedSessions();
InspectorIo* io() {
return io_.get();
}
// Can only be called from the main thread.
bool StartIoThread(bool wait_for_connect);
bool StartIoThread();
// Calls StartIoThread() from off the main thread.
void RequestIoThreadStart();
@ -105,7 +104,9 @@ class Agent {
const node::Persistent<v8::Function>& fn);
node::Environment* parent_env_;
// Encapsulates majority of the Inspector functionality
std::shared_ptr<NodeInspectorClient> client_;
// Interface for transports, e.g. WebSocket server
std::unique_ptr<InspectorIo> io_;
std::string path_;
DebugOptions debug_options_;

Просмотреть файл

@ -1,6 +1,7 @@
#include "inspector_io.h"
#include "inspector_socket_server.h"
#include "inspector/main_thread_interface.h"
#include "inspector/node_string.h"
#include "env-inl.h"
#include "debug_utils.h"
@ -11,23 +12,16 @@
#include "util.h"
#include "zlib.h"
#include <sstream>
#include <unicode/unistr.h>
#include <deque>
#include <string.h>
#include <vector>
namespace node {
namespace inspector {
namespace {
using AsyncAndAgent = std::pair<uv_async_t, Agent*>;
using v8_inspector::StringBuffer;
using v8_inspector::StringView;
template <typename Transport>
using TransportAndIo = std::pair<Transport*, InspectorIo*>;
std::string ScriptPath(uv_loop_t* loop, const std::string& script_name) {
std::string script_path;
@ -64,45 +58,151 @@ std::string GenerateID() {
return uuid;
}
void HandleSyncCloseCb(uv_handle_t* handle) {
*static_cast<bool*>(handle->data) = true;
}
class RequestToServer {
public:
RequestToServer(TransportAction action,
int session_id,
std::unique_ptr<v8_inspector::StringBuffer> message)
: action_(action),
session_id_(session_id),
message_(std::move(message)) {}
void CloseAsyncAndLoop(uv_async_t* async) {
bool is_closed = false;
async->data = &is_closed;
uv_close(reinterpret_cast<uv_handle_t*>(async), HandleSyncCloseCb);
while (!is_closed)
uv_run(async->loop, UV_RUN_ONCE);
async->data = nullptr;
CheckedUvLoopClose(async->loop);
}
void Dispatch(InspectorSocketServer* server) const {
switch (action_) {
case TransportAction::kKill:
server->TerminateConnections();
// Fallthrough
case TransportAction::kStop:
server->Stop();
break;
case TransportAction::kSendMessage:
server->Send(
session_id_,
protocol::StringUtil::StringViewToUtf8(message_->string()));
break;
}
}
// Delete main_thread_req_ on async handle close
void ReleasePairOnAsyncClose(uv_handle_t* async) {
std::unique_ptr<AsyncAndAgent> pair(node::ContainerOf(&AsyncAndAgent::first,
reinterpret_cast<uv_async_t*>(async)));
// Unique_ptr goes out of scope here and pointer is deleted.
}
private:
TransportAction action_;
int session_id_;
std::unique_ptr<v8_inspector::StringBuffer> message_;
};
class RequestQueueData {
public:
using MessageQueue = std::deque<RequestToServer>;
explicit RequestQueueData(uv_loop_t* loop)
: handle_(std::make_shared<RequestQueue>(this)) {
int err = uv_async_init(loop, &async_, [](uv_async_t* async) {
RequestQueueData* wrapper =
node::ContainerOf(&RequestQueueData::async_, async);
wrapper->DoDispatch();
});
CHECK_EQ(0, err);
}
static void CloseAndFree(RequestQueueData* queue);
void Post(int session_id,
TransportAction action,
std::unique_ptr<StringBuffer> message) {
Mutex::ScopedLock scoped_lock(state_lock_);
bool notify = messages_.empty();
messages_.emplace_back(action, session_id, std::move(message));
if (notify) {
CHECK_EQ(0, uv_async_send(&async_));
incoming_message_cond_.Broadcast(scoped_lock);
}
}
void Wait() {
Mutex::ScopedLock scoped_lock(state_lock_);
if (messages_.empty()) {
incoming_message_cond_.Wait(scoped_lock);
}
}
void SetServer(InspectorSocketServer* server) {
server_ = server;
}
std::shared_ptr<RequestQueue> handle() {
return handle_;
}
private:
~RequestQueueData() = default;
MessageQueue GetMessages() {
Mutex::ScopedLock scoped_lock(state_lock_);
MessageQueue messages;
messages_.swap(messages);
return messages;
}
void DoDispatch() {
if (server_ == nullptr)
return;
for (const auto& request : GetMessages()) {
request.Dispatch(server_);
}
}
std::shared_ptr<RequestQueue> handle_;
uv_async_t async_;
InspectorSocketServer* server_ = nullptr;
MessageQueue messages_;
Mutex state_lock_; // Locked before mutating the queue.
ConditionVariable incoming_message_cond_;
};
} // namespace
std::unique_ptr<StringBuffer> Utf8ToStringView(const std::string& message) {
icu::UnicodeString utf16 =
icu::UnicodeString::fromUTF8(icu::StringPiece(message.data(),
message.length()));
StringView view(reinterpret_cast<const uint16_t*>(utf16.getBuffer()),
utf16.length());
return StringBuffer::create(view);
}
class RequestQueue {
public:
explicit RequestQueue(RequestQueueData* data) : data_(data) {}
void Reset() {
Mutex::ScopedLock scoped_lock(lock_);
data_ = nullptr;
}
void Post(int session_id,
TransportAction action,
std::unique_ptr<StringBuffer> message) {
Mutex::ScopedLock scoped_lock(lock_);
if (data_ != nullptr)
data_->Post(session_id, action, std::move(message));
}
void SetServer(InspectorSocketServer* server) {
Mutex::ScopedLock scoped_lock(lock_);
if (data_ != nullptr)
data_->SetServer(server);
}
bool Expired() {
Mutex::ScopedLock scoped_lock(lock_);
return data_ == nullptr;
}
private:
RequestQueueData* data_;
Mutex lock_;
};
class IoSessionDelegate : public InspectorSessionDelegate {
public:
explicit IoSessionDelegate(InspectorIo* io, int id) : io_(io), id_(id) { }
void SendMessageToFrontend(const v8_inspector::StringView& message) override;
explicit IoSessionDelegate(std::shared_ptr<RequestQueue> queue, int id)
: request_queue_(queue), id_(id) { }
void SendMessageToFrontend(const v8_inspector::StringView& message) override {
request_queue_->Post(id_, TransportAction::kSendMessage,
StringBuffer::create(message));
}
private:
InspectorIo* io_;
std::shared_ptr<RequestQueue> request_queue_;
int id_;
};
@ -110,361 +210,133 @@ class IoSessionDelegate : public InspectorSessionDelegate {
// mostly session start, message received, and session end.
class InspectorIoDelegate: public node::inspector::SocketServerDelegate {
public:
InspectorIoDelegate(InspectorIo* io, const std::string& target_id,
InspectorIoDelegate(std::shared_ptr<RequestQueueData> queue,
std::shared_ptr<MainThreadHandle> main_threade,
const std::string& target_id,
const std::string& script_path,
const std::string& script_name, bool wait);
const std::string& script_name);
~InspectorIoDelegate() {
io_->ServerDone();
}
// Calls PostIncomingMessage() with appropriate InspectorAction:
// kStartSession
void StartSession(int session_id, const std::string& target_id) override;
// kSendMessage
void MessageReceived(int session_id, const std::string& message) override;
// kEndSession
void EndSession(int session_id) override;
std::vector<std::string> GetTargetIds() override;
std::string GetTargetTitle(const std::string& id) override;
std::string GetTargetUrl(const std::string& id) override;
void AssignServer(InspectorSocketServer* server) override {
server_ = server;
request_queue_->SetServer(server);
}
private:
InspectorIo* io_;
int session_id_;
std::shared_ptr<RequestQueueData> request_queue_;
std::shared_ptr<MainThreadHandle> main_thread_;
std::unordered_map<int, std::unique_ptr<InspectorSession>> sessions_;
const std::string script_name_;
const std::string script_path_;
const std::string target_id_;
bool waiting_;
InspectorSocketServer* server_;
};
void InterruptCallback(v8::Isolate*, void* agent) {
InspectorIo* io = static_cast<Agent*>(agent)->io();
if (io != nullptr)
io->DispatchMessages();
// static
std::unique_ptr<InspectorIo> InspectorIo::Start(
std::shared_ptr<MainThreadHandle> main_thread,
const std::string& path,
const DebugOptions& options) {
auto io = std::unique_ptr<InspectorIo>(
new InspectorIo(main_thread, path, options));
if (io->request_queue_->Expired()) { // Thread is not running
return nullptr;
}
return io;
}
class DispatchMessagesTask : public v8::Task {
public:
explicit DispatchMessagesTask(Agent* agent) : agent_(agent) {}
void Run() override {
InspectorIo* io = agent_->io();
if (io != nullptr)
io->DispatchMessages();
}
private:
Agent* agent_;
};
InspectorIo::InspectorIo(Environment* env, const std::string& path,
const DebugOptions& options, bool wait_for_connect)
: options_(options), thread_(), state_(State::kNew),
parent_env_(env), thread_req_(),
platform_(parent_env_->isolate_data()->platform()),
dispatching_messages_(false), script_name_(path),
wait_for_connect_(wait_for_connect), port_(-1),
id_(GenerateID()) {
main_thread_req_ = new AsyncAndAgent({uv_async_t(), env->inspector_agent()});
CHECK_EQ(0, uv_async_init(env->event_loop(), &main_thread_req_->first,
InspectorIo::MainThreadReqAsyncCb));
uv_unref(reinterpret_cast<uv_handle_t*>(&main_thread_req_->first));
CHECK_EQ(0, uv_sem_init(&thread_start_sem_, 0));
InspectorIo::InspectorIo(std::shared_ptr<MainThreadHandle> main_thread,
const std::string& path,
const DebugOptions& options)
: main_thread_(main_thread), options_(options),
thread_(), script_name_(path), id_(GenerateID()) {
Mutex::ScopedLock scoped_lock(thread_start_lock_);
CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0);
thread_start_condition_.Wait(scoped_lock);
}
InspectorIo::~InspectorIo() {
uv_sem_destroy(&thread_start_sem_);
uv_close(reinterpret_cast<uv_handle_t*>(&main_thread_req_->first),
ReleasePairOnAsyncClose);
}
bool InspectorIo::Start() {
CHECK_EQ(state_, State::kNew);
CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0);
uv_sem_wait(&thread_start_sem_);
if (state_ == State::kError) {
return false;
}
state_ = State::kAccepting;
if (wait_for_connect_) {
DispatchMessages();
}
return true;
}
void InspectorIo::Stop() {
CHECK_IMPLIES(sessions_.empty(), state_ == State::kAccepting);
Write(TransportAction::kKill, 0, StringView());
request_queue_->Post(0, TransportAction::kKill, nullptr);
int err = uv_thread_join(&thread_);
CHECK_EQ(err, 0);
state_ = State::kShutDown;
DispatchMessages();
}
bool InspectorIo::IsStarted() {
return platform_ != nullptr;
}
void InspectorIo::WaitForDisconnect() {
if (state_ == State::kAccepting)
state_ = State::kDone;
if (!sessions_.empty()) {
state_ = State::kShutDown;
Write(TransportAction::kStop, 0, StringView());
fprintf(stderr, "Waiting for the debugger to disconnect...\n");
fflush(stderr);
}
void InspectorIo::StopAcceptingNewConnections() {
request_queue_->Post(0, TransportAction::kStop, nullptr);
}
// static
void InspectorIo::ThreadMain(void* io) {
static_cast<InspectorIo*>(io)->ThreadMain<InspectorSocketServer>();
static_cast<InspectorIo*>(io)->ThreadMain();
}
// static
template <typename Transport>
void InspectorIo::IoThreadAsyncCb(uv_async_t* async) {
TransportAndIo<Transport>* transport_and_io =
static_cast<TransportAndIo<Transport>*>(async->data);
if (transport_and_io == nullptr) {
return;
}
Transport* transport = transport_and_io->first;
InspectorIo* io = transport_and_io->second;
MessageQueue<TransportAction> outgoing_message_queue;
io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_message_queue);
for (const auto& outgoing : outgoing_message_queue) {
int session_id = std::get<1>(outgoing);
switch (std::get<0>(outgoing)) {
case TransportAction::kKill:
transport->TerminateConnections();
// Fallthrough
case TransportAction::kStop:
transport->Stop();
break;
case TransportAction::kSendMessage:
transport->Send(session_id,
protocol::StringUtil::StringViewToUtf8(
std::get<2>(outgoing)->string()));
break;
case TransportAction::kAcceptSession:
transport->AcceptSession(session_id);
break;
case TransportAction::kDeclineSession:
transport->DeclineSession(session_id);
break;
}
}
}
template <typename Transport>
void InspectorIo::ThreadMain() {
uv_loop_t loop;
loop.data = nullptr;
int err = uv_loop_init(&loop);
CHECK_EQ(err, 0);
thread_req_.data = nullptr;
err = uv_async_init(&loop, &thread_req_, IoThreadAsyncCb<Transport>);
CHECK_EQ(err, 0);
std::shared_ptr<RequestQueueData> queue(new RequestQueueData(&loop),
RequestQueueData::CloseAndFree);
std::string script_path = ScriptPath(&loop, script_name_);
auto delegate = std::unique_ptr<InspectorIoDelegate>(
new InspectorIoDelegate(this, id_, script_path, script_name_,
wait_for_connect_));
Transport server(std::move(delegate), &loop, options_.host_name(),
options_.port());
TransportAndIo<Transport> queue_transport(&server, this);
thread_req_.data = &queue_transport;
if (!server.Start()) {
state_ = State::kError; // Safe, main thread is waiting on semaphore
CloseAsyncAndLoop(&thread_req_);
uv_sem_post(&thread_start_sem_);
return;
}
port_ = server.Port(); // Safe, main thread is waiting on semaphore.
if (!wait_for_connect_) {
uv_sem_post(&thread_start_sem_);
std::unique_ptr<InspectorIoDelegate> delegate(
new InspectorIoDelegate(queue, main_thread_, id_,
script_path, script_name_));
InspectorSocketServer server(std::move(delegate), &loop,
options_.host_name(), options_.port());
request_queue_ = queue->handle();
// Its lifetime is now that of the server delegate
queue.reset();
{
Mutex::ScopedLock scoped_lock(thread_start_lock_);
if (server.Start()) {
port_ = server.Port();
}
thread_start_condition_.Broadcast(scoped_lock);
}
uv_run(&loop, UV_RUN_DEFAULT);
thread_req_.data = nullptr;
CheckedUvLoopClose(&loop);
}
template <typename ActionType>
bool InspectorIo::AppendMessage(MessageQueue<ActionType>* queue,
ActionType action, int session_id,
std::unique_ptr<StringBuffer> buffer) {
Mutex::ScopedLock scoped_lock(state_lock_);
bool trigger_pumping = queue->empty();
queue->push_back(std::make_tuple(action, session_id, std::move(buffer)));
return trigger_pumping;
}
template <typename ActionType>
void InspectorIo::SwapBehindLock(MessageQueue<ActionType>* vector1,
MessageQueue<ActionType>* vector2) {
Mutex::ScopedLock scoped_lock(state_lock_);
vector1->swap(*vector2);
}
void InspectorIo::PostIncomingMessage(InspectorAction action, int session_id,
const std::string& message) {
Debug(parent_env_, DebugCategory::INSPECTOR_SERVER,
">>> %s\n", message.c_str());
if (AppendMessage(&incoming_message_queue_, action, session_id,
Utf8ToStringView(message))) {
Agent* agent = main_thread_req_->second;
v8::Isolate* isolate = parent_env_->isolate();
platform_->CallOnForegroundThread(isolate,
new DispatchMessagesTask(agent));
isolate->RequestInterrupt(InterruptCallback, agent);
CHECK_EQ(0, uv_async_send(&main_thread_req_->first));
}
Mutex::ScopedLock scoped_lock(state_lock_);
incoming_message_cond_.Broadcast(scoped_lock);
}
std::vector<std::string> InspectorIo::GetTargetIds() const {
return { id_ };
}
TransportAction InspectorIo::Attach(int session_id) {
Agent* agent = parent_env_->inspector_agent();
fprintf(stderr, "Debugger attached.\n");
sessions_[session_id] = agent->Connect(std::unique_ptr<IoSessionDelegate>(
new IoSessionDelegate(this, session_id)));
return TransportAction::kAcceptSession;
}
void InspectorIo::DispatchMessages() {
if (dispatching_messages_)
return;
dispatching_messages_ = true;
bool had_messages = false;
do {
if (dispatching_message_queue_.empty())
SwapBehindLock(&incoming_message_queue_, &dispatching_message_queue_);
had_messages = !dispatching_message_queue_.empty();
while (!dispatching_message_queue_.empty()) {
MessageQueue<InspectorAction>::value_type task;
std::swap(dispatching_message_queue_.front(), task);
dispatching_message_queue_.pop_front();
int id = std::get<1>(task);
StringView message = std::get<2>(task)->string();
switch (std::get<0>(task)) {
case InspectorAction::kStartSession:
Write(Attach(id), id, StringView());
break;
case InspectorAction::kStartSessionUnconditionally:
Attach(id);
break;
case InspectorAction::kEndSession:
sessions_.erase(id);
if (!sessions_.empty())
continue;
if (state_ == State::kShutDown) {
state_ = State::kDone;
} else {
state_ = State::kAccepting;
}
break;
case InspectorAction::kSendMessage:
auto session = sessions_.find(id);
if (session != sessions_.end() && session->second) {
session->second->Dispatch(message);
}
break;
}
}
} while (had_messages);
dispatching_messages_ = false;
}
// static
void InspectorIo::MainThreadReqAsyncCb(uv_async_t* req) {
AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first, req);
// Note that this may be called after io was closed or even after a new
// one was created and ran.
InspectorIo* io = pair->second->io();
if (io != nullptr)
io->DispatchMessages();
}
void InspectorIo::Write(TransportAction action, int session_id,
const StringView& inspector_message) {
std::string message_str =
protocol::StringUtil::StringViewToUtf8(inspector_message);
Debug(parent_env_, DebugCategory::INSPECTOR_SERVER,
"<<< %s\n", message_str.c_str());
AppendMessage(&outgoing_message_queue_, action, session_id,
StringBuffer::create(inspector_message));
int err = uv_async_send(&thread_req_);
CHECK_EQ(0, err);
}
bool InspectorIo::WaitForFrontendEvent() {
// We allow DispatchMessages reentry as we enter the pause. This is important
// to support debugging the code invoked by an inspector call, such
// as Runtime.evaluate
dispatching_messages_ = false;
Mutex::ScopedLock scoped_lock(state_lock_);
if (sessions_.empty())
return false;
if (dispatching_message_queue_.empty() && incoming_message_queue_.empty()) {
incoming_message_cond_.Wait(scoped_lock);
}
return true;
}
InspectorIoDelegate::InspectorIoDelegate(InspectorIo* io,
const std::string& target_id,
const std::string& script_path,
const std::string& script_name,
bool wait)
: io_(io),
session_id_(0),
script_name_(script_name),
script_path_(script_path),
target_id_(target_id),
waiting_(wait),
server_(nullptr) { }
InspectorIoDelegate::InspectorIoDelegate(
std::shared_ptr<RequestQueueData> queue,
std::shared_ptr<MainThreadHandle> main_thread,
const std::string& target_id,
const std::string& script_path,
const std::string& script_name)
: request_queue_(queue), main_thread_(main_thread),
script_name_(script_name), script_path_(script_path),
target_id_(target_id) {}
void InspectorIoDelegate::StartSession(int session_id,
const std::string& target_id) {
session_id_ = session_id;
InspectorAction action = InspectorAction::kStartSession;
if (waiting_) {
action = InspectorAction::kStartSessionUnconditionally;
server_->AcceptSession(session_id);
auto session = main_thread_->Connect(
std::unique_ptr<InspectorSessionDelegate>(
new IoSessionDelegate(request_queue_->handle(), session_id)), true);
if (session) {
sessions_[session_id] = std::move(session);
fprintf(stderr, "Debugger attached.\n");
}
io_->PostIncomingMessage(action, session_id, "");
}
void InspectorIoDelegate::MessageReceived(int session_id,
const std::string& message) {
// TODO(pfeldman): Instead of blocking execution while debugger
// engages, node should wait for the run callback from the remote client
// and initiate its startup. This is a change to node.cc that should be
// upstreamed separately.
if (waiting_) {
if (message.find("\"Runtime.runIfWaitingForDebugger\"") !=
std::string::npos) {
waiting_ = false;
io_->ResumeStartup();
}
}
io_->PostIncomingMessage(InspectorAction::kSendMessage, session_id,
message);
auto session = sessions_.find(session_id);
if (session != sessions_.end())
session->second->Dispatch(Utf8ToStringView(message)->string());
}
void InspectorIoDelegate::EndSession(int session_id) {
io_->PostIncomingMessage(InspectorAction::kEndSession, session_id, "");
sessions_.erase(session_id);
}
std::vector<std::string> InspectorIoDelegate::GetTargetIds() {
@ -479,10 +351,17 @@ std::string InspectorIoDelegate::GetTargetUrl(const std::string& id) {
return "file://" + script_path_;
}
void IoSessionDelegate::SendMessageToFrontend(
const v8_inspector::StringView& message) {
io_->Write(TransportAction::kSendMessage, id_, message);
// static
void RequestQueueData::CloseAndFree(RequestQueueData* queue) {
queue->handle_->Reset();
queue->handle_.reset();
uv_close(reinterpret_cast<uv_handle_t*>(&queue->async_),
[](uv_handle_t* handle) {
uv_async_t* async = reinterpret_cast<uv_async_t*>(handle);
RequestQueueData* wrapper =
node::ContainerOf(&RequestQueueData::async_, async);
delete wrapper;
});
}
} // namespace inspector
} // namespace node

Просмотреть файл

@ -6,8 +6,6 @@
#include "node_mutex.h"
#include "uv.h"
#include <deque>
#include <unordered_map>
#include <memory>
#include <stddef.h>
@ -16,17 +14,14 @@
#endif
// Forward declaration to break recursive dependency chain with src/env.h.
namespace node {
class Environment;
} // namespace node
namespace v8_inspector {
class StringBuffer;
class StringView;
} // namespace v8_inspector
namespace node {
// Forward declaration to break recursive dependency chain with src/env.h.
class Environment;
namespace inspector {
std::string FormatWsAddress(const std::string& host, int port,
@ -34,143 +29,64 @@ std::string FormatWsAddress(const std::string& host, int port,
bool include_protocol);
class InspectorIoDelegate;
enum class InspectorAction {
kStartSession,
kStartSessionUnconditionally, // First attach with --inspect-brk
kEndSession,
kSendMessage
};
class MainThreadHandle;
class RequestQueue;
// kKill closes connections and stops the server, kStop only stops the server
enum class TransportAction {
kKill,
kSendMessage,
kStop,
kAcceptSession,
kDeclineSession
kStop
};
class InspectorIo {
public:
InspectorIo(node::Environment* env, const std::string& path,
const DebugOptions& options, bool wait_for_connect);
// Start the inspector agent thread, waiting for it to initialize
// bool Start();
// Returns empty pointer if thread was not started
static std::unique_ptr<InspectorIo> Start(
std::shared_ptr<MainThreadHandle> main_thread, const std::string& path,
const DebugOptions& options);
// Will block till the transport thread shuts down
~InspectorIo();
// Start the inspector agent thread, waiting for it to initialize,
// and waiting as well for a connection if wait_for_connect.
bool Start();
// Stop the inspector agent thread.
void Stop();
bool IsStarted();
void WaitForDisconnect();
// Called from thread to queue an incoming message and trigger
// DispatchMessages() on the main thread.
void PostIncomingMessage(InspectorAction action, int session_id,
const std::string& message);
void ResumeStartup() {
uv_sem_post(&thread_start_sem_);
}
void ServerDone() {
uv_close(reinterpret_cast<uv_handle_t*>(&thread_req_), nullptr);
}
bool WaitForFrontendEvent();
int port() const { return port_; }
void StopAcceptingNewConnections();
std::string host() const { return options_.host_name(); }
int port() const { return port_; }
std::vector<std::string> GetTargetIds() const;
private:
template <typename Action>
using MessageQueue =
std::deque<std::tuple<Action, int,
std::unique_ptr<v8_inspector::StringBuffer>>>;
enum class State {
kNew,
kAccepting,
kDone,
kError,
kShutDown
};
// Callback for main_thread_req_'s uv_async_t
static void MainThreadReqAsyncCb(uv_async_t* req);
InspectorIo(std::shared_ptr<MainThreadHandle> handle,
const std::string& path, const DebugOptions& options);
// Wrapper for agent->ThreadMain()
static void ThreadMain(void* agent);
// Runs a uv_loop_t
template <typename Transport> void ThreadMain();
// Called by ThreadMain's loop when triggered by thread_req_, writes
// messages from outgoing_message_queue to the InspectorSockerServer
template <typename Transport> static void IoThreadAsyncCb(uv_async_t* async);
void DispatchMessages();
// Write action to outgoing_message_queue, and wake the thread
void Write(TransportAction action, int session_id,
const v8_inspector::StringView& message);
// Thread-safe append of message to a queue. Return true if the queue
// used to be empty.
template <typename ActionType>
bool AppendMessage(MessageQueue<ActionType>* vector, ActionType action,
int session_id,
std::unique_ptr<v8_inspector::StringBuffer> buffer);
// Used as equivalent of a thread-safe "pop" of an entire queue's content.
template <typename ActionType>
void SwapBehindLock(MessageQueue<ActionType>* vector1,
MessageQueue<ActionType>* vector2);
// Attach session to an inspector. Either kAcceptSession or kDeclineSession
TransportAction Attach(int session_id);
void ThreadMain();
// This is a thread-safe object that will post async tasks. It lives as long
// as an Inspector object lives (almost as long as an Isolate).
std::shared_ptr<MainThreadHandle> main_thread_;
// Used to post on a frontend interface thread, lives while the server is
// running
std::shared_ptr<RequestQueue> request_queue_;
const DebugOptions options_;
// The IO thread runs its own uv_loop to implement the TCP server off
// the main thread.
uv_thread_t thread_;
// Used by Start() to wait for thread to initialize, or for it to initialize
// and receive a connection if wait_for_connect was requested.
uv_sem_t thread_start_sem_;
State state_;
node::Environment* parent_env_;
// Attached to the uv_loop in ThreadMain()
uv_async_t thread_req_;
// Note that this will live while the async is being closed - likely, past
// the parent object lifespan
std::pair<uv_async_t, Agent*>* main_thread_req_;
// Will be used to post tasks from another thread
v8::Platform* const platform_;
// Message queues
ConditionVariable incoming_message_cond_;
Mutex state_lock_; // Locked before mutating either queue.
MessageQueue<InspectorAction> incoming_message_queue_;
MessageQueue<TransportAction> outgoing_message_queue_;
// This queue is to maintain the order of the messages for the cases
// when we reenter the DispatchMessages function.
MessageQueue<InspectorAction> dispatching_message_queue_;
bool dispatching_messages_;
// For setting up interthread communications
Mutex thread_start_lock_;
ConditionVariable thread_start_condition_;
std::string script_name_;
std::string script_path_;
const bool wait_for_connect_;
int port_;
std::unordered_map<int, std::unique_ptr<InspectorSession>> sessions_;
int port_ = -1;
// May be accessed from any thread
const std::string id_;
friend class DispatchMessagesTask;
friend class IoSessionDelegate;
friend void InterruptCallback(v8::Isolate*, void* agent);
};
std::unique_ptr<v8_inspector::StringBuffer> Utf8ToStringView(
const std::string& message);
} // namespace inspector
} // namespace node

Просмотреть файл

@ -66,7 +66,7 @@ class JSBindingsConnection : public AsyncWrap {
callback_(env->isolate(), callback) {
Agent* inspector = env->inspector_agent();
session_ = inspector->Connect(std::unique_ptr<JSBindingsSessionDelegate>(
new JSBindingsSessionDelegate(env, this)));
new JSBindingsSessionDelegate(env, this)), false);
}
void OnMessage(Local<Value> value) {
@ -116,7 +116,7 @@ class JSBindingsConnection : public AsyncWrap {
static bool InspectorEnabled(Environment* env) {
Agent* agent = env->inspector_agent();
return agent->io() != nullptr || agent->HasConnectedSessions();
return agent->IsActive();
}
void AddCommandLineAPI(const FunctionCallbackInfo<Value>& info) {
@ -251,8 +251,9 @@ void Open(const FunctionCallbackInfo<Value>& args) {
if (args.Length() > 2 && args[2]->IsBoolean()) {
wait_for_connect = args[2]->BooleanValue(env->context()).FromJust();
}
agent->StartIoThread(wait_for_connect);
agent->StartIoThread();
if (wait_for_connect)
agent->WaitForConnect();
}
void Url(const FunctionCallbackInfo<Value>& args) {
@ -283,7 +284,7 @@ void Initialize(Local<Object> target, Local<Value> unused,
Agent* agent = env->inspector_agent();
env->SetMethod(target, "consoleCall", InspectorConsoleCall);
env->SetMethod(target, "addCommandLineAPI", AddCommandLineAPI);
if (agent->IsWaitingForConnect())
if (agent->WillWaitForConnect())
env->SetMethod(target, "callAndPauseOnStart", CallAndPauseOnStart);
env->SetMethod(target, "open", Open);
env->SetMethod(target, "url", Url);

Просмотреть файл

@ -173,11 +173,8 @@ class SocketSession {
InspectorSocket* ws_socket() {
return ws_socket_.get();
}
void set_ws_key(const std::string& ws_key) {
ws_key_ = ws_key;
}
void Accept() {
ws_socket_->AcceptUpgrade(ws_key_);
void Accept(const std::string& ws_key) {
ws_socket_->AcceptUpgrade(ws_key);
}
void Decline() {
ws_socket_->CancelHandshake();
@ -208,7 +205,6 @@ class SocketSession {
const int id_;
InspectorSocket::Pointer ws_socket_;
const int server_port_;
std::string ws_key_;
};
class ServerSocket {
@ -260,11 +256,11 @@ void InspectorSocketServer::SessionStarted(int session_id,
const std::string& ws_key) {
SocketSession* session = Session(session_id);
if (!TargetExists(id)) {
Session(session_id)->Decline();
session->Decline();
return;
}
connected_sessions_[session_id].first = id;
session->set_ws_key(ws_key);
session->Accept(ws_key);
delegate_->StartSession(session_id, id);
}
@ -404,6 +400,8 @@ bool InspectorSocketServer::Start() {
}
void InspectorSocketServer::Stop() {
if (state_ == ServerState::kStopped)
return;
CHECK_EQ(state_, ServerState::kRunning);
state_ = ServerState::kStopped;
server_sockets_.clear();
@ -446,23 +444,6 @@ void InspectorSocketServer::Accept(int server_port,
}
}
void InspectorSocketServer::AcceptSession(int session_id) {
SocketSession* session = Session(session_id);
if (session == nullptr) {
delegate_->EndSession(session_id);
} else {
session->Accept();
}
}
void InspectorSocketServer::DeclineSession(int session_id) {
auto it = connected_sessions_.find(session_id);
if (it != connected_sessions_.end()) {
it->second.first.clear();
it->second.second->Decline();
}
}
void InspectorSocketServer::Send(int session_id, const std::string& message) {
SocketSession* session = Session(session_id);
if (session != nullptr) {

Просмотреть файл

@ -54,10 +54,6 @@ class InspectorSocketServer {
void Send(int session_id, const std::string& message);
// kKill
void TerminateConnections();
// kAcceptSession
void AcceptSession(int session_id);
// kDeclineSession
void DeclineSession(int session_id);
int Port() const;
// Session connection lifecycle

Просмотреть файл

@ -323,11 +323,12 @@ static struct {
// Inspector agent can't fail to start, but if it was configured to listen
// right away on the websocket port and fails to bind/etc, this will return
// false.
return env->inspector_agent()->Start(script_path, options);
return env->inspector_agent()->Start(
script_path == nullptr ? "" : script_path, options);
}
bool InspectorStarted(Environment* env) {
return env->inspector_agent()->IsStarted();
return env->inspector_agent()->IsListening();
}
#endif // HAVE_INSPECTOR
@ -1510,7 +1511,7 @@ void InitGroups(const FunctionCallbackInfo<Value>& args) {
static void WaitForInspectorDisconnect(Environment* env) {
#if HAVE_INSPECTOR
if (env->inspector_agent()->HasConnectedSessions()) {
if (env->inspector_agent()->IsActive()) {
// Restore signal dispositions, the app is done and is no longer
// capable of handling signals.
#if defined(__POSIX__) && !defined(NODE_SHARED_MODE)
@ -3517,7 +3518,7 @@ static void ParseArgs(int* argc,
static void StartInspector(Environment* env, const char* path,
DebugOptions debug_options) {
#if HAVE_INSPECTOR
CHECK(!env->inspector_agent()->IsStarted());
CHECK(!env->inspector_agent()->IsListening());
v8_platform.StartInspector(env, path, debug_options);
#endif // HAVE_INSPECTOR
}
@ -3660,7 +3661,7 @@ static void DebugProcess(const FunctionCallbackInfo<Value>& args) {
static void DebugEnd(const FunctionCallbackInfo<Value>& args) {
#if HAVE_INSPECTOR
Environment* env = Environment::GetCurrent(args);
if (env->inspector_agent()->IsStarted()) {
if (env->inspector_agent()->IsListening()) {
env->inspector_agent()->Stop();
}
#endif

Просмотреть файл

@ -92,7 +92,7 @@ class SignalWrap : public HandleWrap {
#if defined(__POSIX__) && HAVE_INSPECTOR
if (signum == SIGPROF) {
Environment* env = Environment::GetCurrent(args);
if (env->inspector_agent()->IsStarted()) {
if (env->inspector_agent()->IsListening()) {
ProcessEmitWarning(env,
"process.on(SIGPROF) is reserved while debugging");
return;

Просмотреть файл

@ -14,7 +14,6 @@ static const char CLIENT_CLOSE_FRAME[] = "\x88\x80\x2D\x0E\x1E\xFA";
static const char SERVER_CLOSE_FRAME[] = "\x88\x00";
static const char MAIN_TARGET_ID[] = "main-target";
static const char UNCONNECTABLE_TARGET_ID[] = "unconnectable-target";
static const char WS_HANDSHAKE_RESPONSE[] =
"HTTP/1.1 101 Switching Protocols\r\n"
@ -258,10 +257,6 @@ class ServerHolder {
return server_->done();
}
void Connected() {
connected++;
}
void Disconnected() {
disconnected++;
}
@ -270,9 +265,10 @@ class ServerHolder {
delegate_done = true;
}
void PrepareSession(int id) {
void Connected(int id) {
buffer_.clear();
session_id_ = id;
connected++;
}
void Received(const std::string& message) {
@ -319,15 +315,9 @@ class TestSocketServerDelegate : public SocketServerDelegate {
void StartSession(int session_id, const std::string& target_id) override {
session_id_ = session_id;
harness_->PrepareSession(session_id_);
CHECK_NE(targets_.end(),
std::find(targets_.begin(), targets_.end(), target_id));
if (target_id == UNCONNECTABLE_TARGET_ID) {
server_->DeclineSession(session_id);
return;
}
harness_->Connected();
server_->AcceptSession(session_id);
harness_->Connected(session_id_);
}
void MessageReceived(int session_id, const std::string& message) override {
@ -363,7 +353,7 @@ ServerHolder::ServerHolder(bool has_targets, uv_loop_t* loop,
const std::string host, int port, FILE* out) {
std::vector<std::string> targets;
if (has_targets)
targets = { MAIN_TARGET_ID, UNCONNECTABLE_TARGET_ID };
targets = { MAIN_TARGET_ID };
std::unique_ptr<TestSocketServerDelegate> delegate(
new TestSocketServerDelegate(this, targets));
server_.reset(
@ -414,15 +404,6 @@ TEST_F(InspectorSocketServerTest, InspectorSessions) {
well_behaved_socket.Close();
// Declined connection
SocketWrapper declined_target_socket(&loop);
declined_target_socket.Connect(HOST, server.port());
declined_target_socket.Write(WsHandshakeRequest(UNCONNECTABLE_TARGET_ID));
declined_target_socket.Expect("HTTP/1.0 400 Bad Request");
declined_target_socket.ExpectEOF();
EXPECT_EQ(1, server.connected);
EXPECT_EQ(1, server.disconnected);
// Bogus target - start session callback should not even be invoked
SocketWrapper bogus_target_socket(&loop);
bogus_target_socket.Connect(HOST, server.port());
@ -491,7 +472,7 @@ TEST_F(InspectorSocketServerTest, ServerWithoutTargets) {
// Declined connection
SocketWrapper socket(&loop);
socket.Connect(HOST, server.port());
socket.Write(WsHandshakeRequest(UNCONNECTABLE_TARGET_ID));
socket.Write(WsHandshakeRequest("any target id"));
socket.Expect("HTTP/1.0 400 Bad Request");
socket.ExpectEOF();
server->Stop();

Просмотреть файл

@ -1,3 +1,4 @@
// Flags: --inspect=0
'use strict';
const common = require('../common');