diff --git a/node.gyp b/node.gyp index ae25427819..e0bd57dde5 100644 --- a/node.gyp +++ b/node.gyp @@ -490,12 +490,14 @@ 'src/inspector_js_api.cc', 'src/inspector_socket.cc', 'src/inspector_socket_server.cc', - 'src/inspector/tracing_agent.cc', + 'src/inspector/main_thread_interface.cc', 'src/inspector/node_string.cc', + 'src/inspector/tracing_agent.cc', 'src/inspector_agent.h', 'src/inspector_io.h', 'src/inspector_socket.h', 'src/inspector_socket_server.h', + 'src/inspector/main_thread_interface.h', 'src/inspector/node_string.h', 'src/inspector/tracing_agent.h', '<@(node_inspector_generated_sources)' diff --git a/src/inspector/main_thread_interface.cc b/src/inspector/main_thread_interface.cc new file mode 100644 index 0000000000..da43c95bea --- /dev/null +++ b/src/inspector/main_thread_interface.cc @@ -0,0 +1,317 @@ +#include "main_thread_interface.h" + +#include "node_mutex.h" +#include "v8-inspector.h" + +#include + +namespace node { +namespace inspector { +namespace { + +using v8_inspector::StringView; +using v8_inspector::StringBuffer; + +template +class DeleteRequest : public Request { + public: + explicit DeleteRequest(T* object) : object_(object) {} + void Call() override { + delete object_; + } + + private: + T* object_; +}; + +template +class SingleArgumentFunctionCall : public Request { + public: + using Fn = void (Target::*)(Arg); + + SingleArgumentFunctionCall(Target* target, Fn fn, Arg argument) + : target_(target), + fn_(fn), + arg_(std::move(argument)) {} + + void Call() override { + Apply(target_, fn_, std::move(arg_)); + } + + private: + template + void Apply(Element* target, Fn fn, Arg arg) { + (target->*fn)(std::move(arg)); + } + + Target* target_; + Fn fn_; + Arg arg_; +}; + +class PostMessageRequest : public Request { + public: + PostMessageRequest(InspectorSessionDelegate* delegate, + StringView message) + : delegate_(delegate), + message_(StringBuffer::create(message)) {} + + void Call() override { + delegate_->SendMessageToFrontend(message_->string()); + } + + private: + InspectorSessionDelegate* delegate_; + std::unique_ptr message_; +}; + +class DispatchMessagesTask : public v8::Task { + public: + explicit DispatchMessagesTask(MainThreadInterface* thread) + : thread_(thread) {} + + void Run() override { + thread_->DispatchMessages(); + } + + private: + MainThreadInterface* thread_; +}; + +void DisposePairCallback(uv_handle_t* ref) { + using AsyncAndInterface = std::pair; + AsyncAndInterface* pair = node::ContainerOf( + &AsyncAndInterface::first, reinterpret_cast(ref)); + delete pair; +} + +template +class AnotherThreadObjectReference { + public: + // We create it on whatever thread, just make sure it gets disposed on the + // proper thread. + AnotherThreadObjectReference(std::shared_ptr thread, + T* object) + : thread_(thread), object_(object) { + } + AnotherThreadObjectReference(AnotherThreadObjectReference&) = delete; + + ~AnotherThreadObjectReference() { + // Disappearing thread may cause a memory leak + CHECK(thread_->Post( + std::unique_ptr>(new DeleteRequest(object_)))); + object_ = nullptr; + } + + template + void Post(Fn fn, Arg argument) const { + using R = SingleArgumentFunctionCall; + thread_->Post(std::unique_ptr(new R(object_, fn, std::move(argument)))); + } + + T* get() const { + return object_; + } + + private: + std::shared_ptr thread_; + T* object_; +}; + +class MainThreadSessionState { + public: + MainThreadSessionState( + std::shared_ptr thread, + bool prevent_shutdown) : thread_(thread), + prevent_shutdown_(prevent_shutdown) {} + + void Connect(std::unique_ptr delegate) { + Agent* agent = thread_->GetInspectorAgent(); + if (agent != nullptr) + session_ = agent->Connect(std::move(delegate), prevent_shutdown_); + } + + void Dispatch(std::unique_ptr message) { + session_->Dispatch(message->string()); + } + + private: + std::shared_ptr thread_; + bool prevent_shutdown_; + std::unique_ptr session_; +}; + +class CrossThreadInspectorSession : public InspectorSession { + public: + CrossThreadInspectorSession( + int id, + std::shared_ptr thread, + std::unique_ptr delegate, + bool prevent_shutdown) + : state_(thread, new MainThreadSessionState(thread, prevent_shutdown)) { + state_.Post(&MainThreadSessionState::Connect, std::move(delegate)); + } + + void Dispatch(const StringView& message) override { + state_.Post(&MainThreadSessionState::Dispatch, + StringBuffer::create(message)); + } + + private: + AnotherThreadObjectReference state_; +}; + +class ThreadSafeDelegate : public InspectorSessionDelegate { + public: + ThreadSafeDelegate(std::shared_ptr thread, + std::unique_ptr delegate) + : thread_(thread), delegate_(thread, delegate.release()) {} + + void SendMessageToFrontend(const v8_inspector::StringView& message) override { + thread_->Post(std::unique_ptr( + new PostMessageRequest(delegate_.get(), message))); + } + + private: + std::shared_ptr thread_; + AnotherThreadObjectReference delegate_; +}; +} // namespace + + +MainThreadInterface::MainThreadInterface(Agent* agent, uv_loop_t* loop, + v8::Isolate* isolate, + v8::Platform* platform) + : agent_(agent), isolate_(isolate), + platform_(platform) { + main_thread_request_.reset(new AsyncAndInterface(uv_async_t(), this)); + CHECK_EQ(0, uv_async_init(loop, &main_thread_request_->first, + DispatchMessagesAsyncCallback)); + // Inspector uv_async_t should not prevent main loop shutdown. + uv_unref(reinterpret_cast(&main_thread_request_->first)); +} + +MainThreadInterface::~MainThreadInterface() { + if (handle_) + handle_->Reset(); +} + +// static +void MainThreadInterface::DispatchMessagesAsyncCallback(uv_async_t* async) { + AsyncAndInterface* asyncAndInterface = + node::ContainerOf(&AsyncAndInterface::first, async); + asyncAndInterface->second->DispatchMessages(); +} + +// static +void MainThreadInterface::CloseAsync(AsyncAndInterface* pair) { + uv_close(reinterpret_cast(&pair->first), DisposePairCallback); +} + +void MainThreadInterface::Post(std::unique_ptr request) { + Mutex::ScopedLock scoped_lock(requests_lock_); + bool needs_notify = requests_.empty(); + requests_.push_back(std::move(request)); + if (needs_notify) { + CHECK_EQ(0, uv_async_send(&main_thread_request_->first)); + if (isolate_ != nullptr && platform_ != nullptr) { + platform_->CallOnForegroundThread(isolate_, + new DispatchMessagesTask(this)); + isolate_->RequestInterrupt([](v8::Isolate* isolate, void* thread) { + static_cast(thread)->DispatchMessages(); + }, this); + } + } + incoming_message_cond_.Broadcast(scoped_lock); +} + +bool MainThreadInterface::WaitForFrontendEvent() { + // We allow DispatchMessages reentry as we enter the pause. This is important + // to support debugging the code invoked by an inspector call, such + // as Runtime.evaluate + dispatching_messages_ = false; + if (dispatching_message_queue_.empty()) { + Mutex::ScopedLock scoped_lock(requests_lock_); + while (requests_.empty()) incoming_message_cond_.Wait(scoped_lock); + } + return true; +} + +void MainThreadInterface::DispatchMessages() { + if (dispatching_messages_) + return; + dispatching_messages_ = true; + bool had_messages = false; + do { + if (dispatching_message_queue_.empty()) { + Mutex::ScopedLock scoped_lock(requests_lock_); + requests_.swap(dispatching_message_queue_); + } + had_messages = !dispatching_message_queue_.empty(); + while (!dispatching_message_queue_.empty()) { + MessageQueue::value_type task; + std::swap(dispatching_message_queue_.front(), task); + dispatching_message_queue_.pop_front(); + task->Call(); + } + } while (had_messages); + dispatching_messages_ = false; +} + +std::shared_ptr MainThreadInterface::GetHandle() { + if (handle_ == nullptr) + handle_ = std::make_shared(this); + return handle_; +} + +std::unique_ptr Utf8ToStringView(const std::string& message) { + icu::UnicodeString utf16 = icu::UnicodeString::fromUTF8( + icu::StringPiece(message.data(), message.length())); + StringView view(reinterpret_cast(utf16.getBuffer()), + utf16.length()); + return StringBuffer::create(view); +} + +std::unique_ptr MainThreadHandle::Connect( + std::unique_ptr delegate, + bool prevent_shutdown) { + return std::unique_ptr( + new CrossThreadInspectorSession(++next_session_id_, + shared_from_this(), + std::move(delegate), + prevent_shutdown)); +} + +bool MainThreadHandle::Post(std::unique_ptr request) { + Mutex::ScopedLock scoped_lock(block_lock_); + if (!main_thread_) + return false; + main_thread_->Post(std::move(request)); + return true; +} + +void MainThreadHandle::Reset() { + Mutex::ScopedLock scoped_lock(block_lock_); + main_thread_ = nullptr; +} + +Agent* MainThreadHandle::GetInspectorAgent() { + Mutex::ScopedLock scoped_lock(block_lock_); + if (main_thread_ == nullptr) + return nullptr; + return main_thread_->inspector_agent(); +} + +std::unique_ptr +MainThreadHandle::MakeThreadSafeDelegate( + std::unique_ptr delegate) { + return std::unique_ptr( + new ThreadSafeDelegate(shared_from_this(), std::move(delegate))); +} + +bool MainThreadHandle::Expired() { + Mutex::ScopedLock scoped_lock(block_lock_); + return main_thread_ == nullptr; +} +} // namespace inspector +} // namespace node diff --git a/src/inspector/main_thread_interface.h b/src/inspector/main_thread_interface.h new file mode 100644 index 0000000000..75df5ffe80 --- /dev/null +++ b/src/inspector/main_thread_interface.h @@ -0,0 +1,99 @@ +#ifndef SRC_INSPECTOR_MAIN_THREAD_INTERFACE_H_ +#define SRC_INSPECTOR_MAIN_THREAD_INTERFACE_H_ + +#if !HAVE_INSPECTOR +#error("This header can only be used when inspector is enabled") +#endif + +#include "env.h" +#include "inspector_agent.h" +#include "node_mutex.h" + +#include +#include +#include +#include + +namespace v8_inspector { +class StringBuffer; +class StringView; +} // namespace v8_inspector + +namespace node { +namespace inspector { +class Request { + public: + virtual void Call() = 0; + virtual ~Request() {} +}; + +std::unique_ptr Utf8ToStringView( + const std::string& message); + +using MessageQueue = std::deque>; +class MainThreadInterface; + +class MainThreadHandle : public std::enable_shared_from_this { + public: + explicit MainThreadHandle(MainThreadInterface* main_thread) + : main_thread_(main_thread) {} + ~MainThreadHandle() { + CHECK_NULL(main_thread_); // main_thread_ should have called Reset + } + std::unique_ptr Connect( + std::unique_ptr delegate, + bool prevent_shutdown); + bool Post(std::unique_ptr request); + Agent* GetInspectorAgent(); + std::unique_ptr MakeThreadSafeDelegate( + std::unique_ptr delegate); + bool Expired(); + + private: + void Reset(); + + MainThreadInterface* main_thread_; + Mutex block_lock_; + int next_session_id_ = 0; + + friend class MainThreadInterface; +}; + +class MainThreadInterface { + public: + MainThreadInterface(Agent* agent, uv_loop_t*, v8::Isolate* isolate, + v8::Platform* platform); + ~MainThreadInterface(); + + void DispatchMessages(); + void Post(std::unique_ptr request); + bool WaitForFrontendEvent(); + std::shared_ptr GetHandle(); + Agent* inspector_agent() { + return agent_; + } + + private: + using AsyncAndInterface = std::pair; + + static void DispatchMessagesAsyncCallback(uv_async_t* async); + static void CloseAsync(AsyncAndInterface*); + + MessageQueue requests_; + Mutex requests_lock_; // requests_ live across threads + // This queue is to maintain the order of the messages for the cases + // when we reenter the DispatchMessages function. + MessageQueue dispatching_message_queue_; + bool dispatching_messages_ = false; + ConditionVariable incoming_message_cond_; + // Used from any thread + Agent* const agent_; + v8::Isolate* const isolate_; + v8::Platform* const platform_; + DeleteFnPtr main_thread_request_; + std::shared_ptr handle_; +}; + +} // namespace inspector +} // namespace node +#endif // SRC_INSPECTOR_MAIN_THREAD_INTERFACE_H_ diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 2b03fd05b1..ebb8d8a161 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -1,6 +1,7 @@ #include "inspector_agent.h" #include "inspector_io.h" +#include "inspector/main_thread_interface.h" #include "inspector/node_string.h" #include "inspector/tracing_agent.h" #include "node/inspector/protocol/Protocol.h" @@ -49,7 +50,7 @@ class StartIoTask : public v8::Task { explicit StartIoTask(Agent* agent) : agent(agent) {} void Run() override { - agent->StartIoThread(false); + agent->StartIoThread(); } private: @@ -64,11 +65,11 @@ std::unique_ptr ToProtocolString(Isolate* isolate, // Called on the main thread. void StartIoThreadAsyncCallback(uv_async_t* handle) { - static_cast(handle->data)->StartIoThread(false); + static_cast(handle->data)->StartIoThread(); } void StartIoInterrupt(Isolate* isolate, void* agent) { - static_cast(agent)->StartIoThread(false); + static_cast(agent)->StartIoThread(); } @@ -195,8 +196,10 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, public: explicit ChannelImpl(Environment* env, const std::unique_ptr& inspector, - std::unique_ptr delegate) - : delegate_(std::move(delegate)) { + std::unique_ptr delegate, + bool prevent_shutdown) + : delegate_(std::move(delegate)), + prevent_shutdown_(prevent_shutdown) { session_ = inspector->connect(1, this, StringView()); node_dispatcher_.reset(new protocol::UberDispatcher(this)); tracing_agent_.reset(new protocol::TracingAgent(env)); @@ -208,7 +211,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, tracing_agent_.reset(); // Dispose before the dispatchers } - void dispatchProtocolMessage(const StringView& message) { + std::string dispatchProtocolMessage(const StringView& message) { std::unique_ptr parsed; std::string method; node_dispatcher_->getCommandName( @@ -219,6 +222,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, } else { node_dispatcher_->dispatch(std::move(parsed)); } + return method; } void schedulePauseOnNextStatement(const std::string& reason) { @@ -226,6 +230,10 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, session_->schedulePauseOnNextStatement(buffer->string(), buffer->string()); } + bool preventShutdown() { + return prevent_shutdown_; + } + private: void sendResponse( int callId, @@ -263,6 +271,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, std::unique_ptr delegate_; std::unique_ptr session_; std::unique_ptr node_dispatcher_; + bool prevent_shutdown_; }; class InspectorTimer { @@ -324,6 +333,44 @@ class InspectorTimerHandle { private: InspectorTimer* timer_; }; + +class SameThreadInspectorSession : public InspectorSession { + public: + SameThreadInspectorSession( + int session_id, std::shared_ptr client) + : session_id_(session_id), client_(client) {} + ~SameThreadInspectorSession() override; + void Dispatch(const v8_inspector::StringView& message) override; + + private: + int session_id_; + std::weak_ptr client_; +}; + +void NotifyClusterWorkersDebugEnabled(Environment* env) { + v8::Isolate* isolate = env->isolate(); + HandleScope handle_scope(isolate); + auto context = env->context(); + + // Send message to enable debug in cluster workers + Local process_object = env->process_object(); + Local emit_fn = + process_object->Get(context, FIXED_ONE_BYTE_STRING(isolate, "emit")) + .ToLocalChecked(); + // In case the thread started early during the startup + if (!emit_fn->IsFunction()) + return; + + Local message = Object::New(isolate); + message->Set(context, FIXED_ONE_BYTE_STRING(isolate, "cmd"), + FIXED_ONE_BYTE_STRING(isolate, "NODE_DEBUG_ENABLED")).FromJust(); + Local argv[] = { + FIXED_ONE_BYTE_STRING(isolate, "internalMessage"), + message + }; + MakeCallback(env->isolate(), process_object, emit_fn.As(), + arraysize(argv), argv, {0, 0}); +} } // namespace class NodeInspectorClient : public V8InspectorClient { @@ -337,31 +384,18 @@ class NodeInspectorClient : public V8InspectorClient { } void runMessageLoopOnPause(int context_group_id) override { - runMessageLoop(false); + waiting_for_resume_ = true; + runMessageLoop(); } - void runMessageLoop(bool ignore_terminated) { - if (running_nested_loop_) - return; - terminated_ = false; - running_nested_loop_ = true; - MultiIsolatePlatform* platform = env_->isolate_data()->platform(); - while ((ignore_terminated || !terminated_) && waitForFrontendEvent()) { - while (platform->FlushForegroundTasks(env_->isolate())) {} - } - terminated_ = false; - running_nested_loop_ = false; + void waitForIoShutdown() { + waiting_for_io_shutdown_ = true; + runMessageLoop(); } - bool waitForFrontendEvent() { - InspectorIo* io = env_->inspector_agent()->io(); - if (io == nullptr) - return false; - return io->WaitForFrontendEvent(); - } - - double currentTimeMS() override { - return uv_hrtime() * 1.0 / NANOS_PER_MSEC; + void waitForFrontend() { + waiting_for_frontend_ = true; + runMessageLoop(); } void maxAsyncCallStackDepthChanged(int depth) override { @@ -398,16 +432,17 @@ class NodeInspectorClient : public V8InspectorClient { } void quitMessageLoopOnPause() override { - terminated_ = true; + waiting_for_resume_ = false; } - int connectFrontend(std::unique_ptr delegate) { + int connectFrontend(std::unique_ptr delegate, + bool prevent_shutdown) { events_dispatched_ = true; int session_id = next_session_id_++; // TODO(addaleax): Revert back to using make_unique once we get issues // with CI resolved (i.e. revert the patch that added this comment). channels_[session_id].reset( - new ChannelImpl(env_, client_, std::move(delegate))); + new ChannelImpl(env_, client_, std::move(delegate), prevent_shutdown)); return session_id; } @@ -418,7 +453,10 @@ class NodeInspectorClient : public V8InspectorClient { void dispatchMessageFromFrontend(int session_id, const StringView& message) { events_dispatched_ = true; - channels_[session_id]->dispatchProtocolMessage(message); + std::string method = + channels_[session_id]->dispatchProtocolMessage(message); + if (waiting_for_frontend_) + waiting_for_frontend_ = method != "Runtime.runIfWaitingForDebugger"; } Local ensureDefaultContextInGroup(int contextGroupId) override { @@ -509,116 +547,150 @@ class NodeInspectorClient : public V8InspectorClient { } bool hasConnectedSessions() { + for (const auto& id_channel : channels_) { + // Other sessions are "invisible" more most purposes + if (id_channel.second->preventShutdown()) + return true; + } + return false; + } + + std::shared_ptr getThreadHandle() { + if (interface_ == nullptr) { + interface_.reset(new MainThreadInterface( + env_->inspector_agent(), env_->event_loop(), env_->isolate(), + env_->isolate_data()->platform())); + } + return interface_->GetHandle(); + } + + bool IsActive() { return !channels_.empty(); } private: + bool shouldRunMessageLoop() { + if (waiting_for_frontend_) + return true; + if (waiting_for_io_shutdown_ || waiting_for_resume_) + return hasConnectedSessions(); + return false; + } + + void runMessageLoop() { + if (running_nested_loop_) + return; + + running_nested_loop_ = true; + + MultiIsolatePlatform* platform = env_->isolate_data()->platform(); + while (shouldRunMessageLoop()) { + if (interface_ && hasConnectedSessions()) + interface_->WaitForFrontendEvent(); + while (platform->FlushForegroundTasks(env_->isolate())) {} + } + running_nested_loop_ = false; + } + + double currentTimeMS() override { + return uv_hrtime() * 1.0 / NANOS_PER_MSEC; + } + node::Environment* env_; - bool terminated_ = false; bool running_nested_loop_ = false; std::unique_ptr client_; std::unordered_map> channels_; std::unordered_map timers_; int next_session_id_ = 1; bool events_dispatched_ = false; + bool waiting_for_resume_ = false; + bool waiting_for_frontend_ = false; + bool waiting_for_io_shutdown_ = false; + // Allows accessing Inspector from non-main threads + std::unique_ptr interface_; }; Agent::Agent(Environment* env) : parent_env_(env) {} -// Destructor needs to be defined here in implementation file as the header -// does not have full definition of some classes. -Agent::~Agent() { -} +Agent::~Agent() = default; -bool Agent::Start(const char* path, const DebugOptions& options) { - path_ = path == nullptr ? "" : path; +bool Agent::Start(const std::string& path, const DebugOptions& options) { + path_ = path; debug_options_ = options; client_ = std::make_shared(parent_env_); - CHECK_EQ(0, uv_async_init(uv_default_loop(), - &start_io_thread_async, - StartIoThreadAsyncCallback)); - start_io_thread_async.data = this; - uv_unref(reinterpret_cast(&start_io_thread_async)); + if (parent_env_->is_main_thread()) { + CHECK_EQ(0, uv_async_init(parent_env_->event_loop(), + &start_io_thread_async, + StartIoThreadAsyncCallback)); + uv_unref(reinterpret_cast(&start_io_thread_async)); + start_io_thread_async.data = this; + // Ignore failure, SIGUSR1 won't work, but that should not block node start. + StartDebugSignalHandler(); + } - // Ignore failure, SIGUSR1 won't work, but that should not block node start. - StartDebugSignalHandler(); - if (options.inspector_enabled()) { - // This will return false if listen failed on the inspector port. - return StartIoThread(options.wait_for_connect()); + bool wait_for_connect = options.wait_for_connect(); + if (!options.inspector_enabled() || !StartIoThread()) { + return false; + } + if (wait_for_connect) { + HandleScope scope(parent_env_->isolate()); + parent_env_->process_object()->DefineOwnProperty( + parent_env_->context(), + FIXED_ONE_BYTE_STRING(parent_env_->isolate(), "_breakFirstLine"), + True(parent_env_->isolate()), + static_cast(v8::ReadOnly | v8::DontEnum)) + .FromJust(); + client_->waitForFrontend(); } return true; } -bool Agent::StartIoThread(bool wait_for_connect) { +bool Agent::StartIoThread() { if (io_ != nullptr) return true; CHECK_NOT_NULL(client_); - io_ = std::unique_ptr( - new InspectorIo(parent_env_, path_, debug_options_, wait_for_connect)); - if (!io_->Start()) { - client_.reset(); + io_ = InspectorIo::Start( + client_->getThreadHandle(), path_, debug_options_); + if (io_ == nullptr) { return false; } - - v8::Isolate* isolate = parent_env_->isolate(); - HandleScope handle_scope(isolate); - auto context = parent_env_->context(); - - // Send message to enable debug in workers - Local process_object = parent_env_->process_object(); - Local emit_fn = - process_object->Get(context, FIXED_ONE_BYTE_STRING(isolate, "emit")) - .ToLocalChecked(); - // In case the thread started early during the startup - if (!emit_fn->IsFunction()) - return true; - - Local message = Object::New(isolate); - message->Set(context, FIXED_ONE_BYTE_STRING(isolate, "cmd"), - FIXED_ONE_BYTE_STRING(isolate, "NODE_DEBUG_ENABLED")).FromJust(); - Local argv[] = { - FIXED_ONE_BYTE_STRING(isolate, "internalMessage"), - message - }; - MakeCallback(parent_env_->isolate(), process_object, emit_fn.As(), - arraysize(argv), argv, {0, 0}); - + NotifyClusterWorkersDebugEnabled(parent_env_); return true; } void Agent::Stop() { - if (io_ != nullptr) { - io_->Stop(); - io_.reset(); - } + io_.reset(); } std::unique_ptr Agent::Connect( - std::unique_ptr delegate) { - int session_id = client_->connectFrontend(std::move(delegate)); + std::unique_ptr delegate, + bool prevent_shutdown) { + CHECK_NOT_NULL(client_); + int session_id = client_->connectFrontend(std::move(delegate), + prevent_shutdown); return std::unique_ptr( - new InspectorSession(session_id, client_)); + new SameThreadInspectorSession(session_id, client_)); } void Agent::WaitForDisconnect() { CHECK_NOT_NULL(client_); + if (client_->hasConnectedSessions()) { + fprintf(stderr, "Waiting for the debugger to disconnect...\n"); + fflush(stderr); + } // TODO(addaleax): Maybe this should use an at-exit hook for the Environment // or something similar? client_->contextDestroyed(parent_env_->context()); if (io_ != nullptr) { - io_->WaitForDisconnect(); - // There is a bug in V8 Inspector (https://crbug.com/834056) that - // calls V8InspectorClient::quitMessageLoopOnPause when a session - // disconnects. We are using this flag to ignore those calls so the message - // loop is spinning as long as there's a reason to expect inspector messages - client_->runMessageLoop(true); + io_->StopAcceptingNewConnections(); + client_->waitForIoShutdown(); } } void Agent::FatalException(Local error, Local message) { - if (!IsStarted()) + if (!IsListening()) return; client_->FatalException(error, message); WaitForDisconnect(); @@ -718,26 +790,35 @@ void Agent::ContextCreated(Local context, const ContextInfo& info) { client_->contextCreated(context, info); } -bool Agent::IsWaitingForConnect() { +bool Agent::WillWaitForConnect() { return debug_options_.wait_for_connect(); } -bool Agent::HasConnectedSessions() { +bool Agent::IsActive() { if (client_ == nullptr) return false; - return client_->hasConnectedSessions(); + return io_ != nullptr || client_->IsActive(); } -InspectorSession::InspectorSession(int session_id, - std::shared_ptr client) - : session_id_(session_id), client_(client) {} - -InspectorSession::~InspectorSession() { - client_->disconnectFrontend(session_id_); +void Agent::WaitForConnect() { + CHECK_NOT_NULL(client_); + client_->waitForFrontend(); } -void InspectorSession::Dispatch(const StringView& message) { - client_->dispatchMessageFromFrontend(session_id_, message); +SameThreadInspectorSession::~SameThreadInspectorSession() { + auto client = client_.lock(); + if (client) + client->disconnectFrontend(session_id_); } + +void SameThreadInspectorSession::Dispatch( + const v8_inspector::StringView& message) { + auto client = client_.lock(); + if (client) + client->dispatchMessageFromFrontend(session_id_, message); +} + + + } // namespace inspector } // namespace node diff --git a/src/inspector_agent.h b/src/inspector_agent.h index 7295d048b6..dcd6e13aba 100644 --- a/src/inspector_agent.h +++ b/src/inspector_agent.h @@ -20,7 +20,6 @@ class StringView; namespace node { // Forward declaration to break recursive dependency chain with src/env.h. class Environment; -class NodePlatform; struct ContextInfo; namespace inspector { @@ -29,12 +28,8 @@ class NodeInspectorClient; class InspectorSession { public: - InspectorSession(int session_id, std::shared_ptr client); - ~InspectorSession(); - void Dispatch(const v8_inspector::StringView& message); - private: - int session_id_; - std::shared_ptr client_; + virtual ~InspectorSession() {} + virtual void Dispatch(const v8_inspector::StringView& message) = 0; }; class InspectorSessionDelegate { @@ -50,15 +45,21 @@ class Agent { ~Agent(); // Create client_, may create io_ if option enabled - bool Start(const char* path, const DebugOptions& options); + bool Start(const std::string& path, const DebugOptions& options); // Stop and destroy io_ void Stop(); - bool IsStarted() { return !!client_; } - - // IO thread started, and client connected - bool IsWaitingForConnect(); + bool IsListening() { return io_ != nullptr; } + // Returns true if the Node inspector is actually in use. It will be true + // if either the user explicitely opted into inspector (e.g. with the + // --inspect command line flag) or if inspector JS API had been used. + bool IsActive(); + // Option is set to wait for session connection + bool WillWaitForConnect(); + // Blocks till frontend connects and sends "runIfWaitingForDebugger" + void WaitForConnect(); + // Blocks till all the sessions with "WaitForDisconnectOnShutdown" disconnect void WaitForDisconnect(); void FatalException(v8::Local error, v8::Local message); @@ -77,22 +78,20 @@ class Agent { void EnableAsyncHook(); void DisableAsyncHook(); - // Called by the WS protocol and JS binding to create inspector sessions. + // Called to create inspector sessions that can be used from the main thread. // The inspector responds by using the delegate to send messages back. std::unique_ptr Connect( - std::unique_ptr delegate); + std::unique_ptr delegate, + bool prevent_shutdown); void PauseOnNextJavascriptStatement(const std::string& reason); - // Returns true as long as there is at least one connected session. - bool HasConnectedSessions(); - InspectorIo* io() { return io_.get(); } // Can only be called from the main thread. - bool StartIoThread(bool wait_for_connect); + bool StartIoThread(); // Calls StartIoThread() from off the main thread. void RequestIoThreadStart(); @@ -105,7 +104,9 @@ class Agent { const node::Persistent& fn); node::Environment* parent_env_; + // Encapsulates majority of the Inspector functionality std::shared_ptr client_; + // Interface for transports, e.g. WebSocket server std::unique_ptr io_; std::string path_; DebugOptions debug_options_; diff --git a/src/inspector_io.cc b/src/inspector_io.cc index 78ecce7398..41fea546a8 100644 --- a/src/inspector_io.cc +++ b/src/inspector_io.cc @@ -1,6 +1,7 @@ #include "inspector_io.h" #include "inspector_socket_server.h" +#include "inspector/main_thread_interface.h" #include "inspector/node_string.h" #include "env-inl.h" #include "debug_utils.h" @@ -11,23 +12,16 @@ #include "util.h" #include "zlib.h" -#include -#include - +#include #include #include - namespace node { namespace inspector { namespace { -using AsyncAndAgent = std::pair; using v8_inspector::StringBuffer; using v8_inspector::StringView; -template -using TransportAndIo = std::pair; - std::string ScriptPath(uv_loop_t* loop, const std::string& script_name) { std::string script_path; @@ -64,45 +58,151 @@ std::string GenerateID() { return uuid; } -void HandleSyncCloseCb(uv_handle_t* handle) { - *static_cast(handle->data) = true; -} +class RequestToServer { + public: + RequestToServer(TransportAction action, + int session_id, + std::unique_ptr message) + : action_(action), + session_id_(session_id), + message_(std::move(message)) {} -void CloseAsyncAndLoop(uv_async_t* async) { - bool is_closed = false; - async->data = &is_closed; - uv_close(reinterpret_cast(async), HandleSyncCloseCb); - while (!is_closed) - uv_run(async->loop, UV_RUN_ONCE); - async->data = nullptr; - CheckedUvLoopClose(async->loop); -} + void Dispatch(InspectorSocketServer* server) const { + switch (action_) { + case TransportAction::kKill: + server->TerminateConnections(); + // Fallthrough + case TransportAction::kStop: + server->Stop(); + break; + case TransportAction::kSendMessage: + server->Send( + session_id_, + protocol::StringUtil::StringViewToUtf8(message_->string())); + break; + } + } -// Delete main_thread_req_ on async handle close -void ReleasePairOnAsyncClose(uv_handle_t* async) { - std::unique_ptr pair(node::ContainerOf(&AsyncAndAgent::first, - reinterpret_cast(async))); - // Unique_ptr goes out of scope here and pointer is deleted. -} + private: + TransportAction action_; + int session_id_; + std::unique_ptr message_; +}; +class RequestQueueData { + public: + using MessageQueue = std::deque; + + explicit RequestQueueData(uv_loop_t* loop) + : handle_(std::make_shared(this)) { + int err = uv_async_init(loop, &async_, [](uv_async_t* async) { + RequestQueueData* wrapper = + node::ContainerOf(&RequestQueueData::async_, async); + wrapper->DoDispatch(); + }); + CHECK_EQ(0, err); + } + + static void CloseAndFree(RequestQueueData* queue); + + void Post(int session_id, + TransportAction action, + std::unique_ptr message) { + Mutex::ScopedLock scoped_lock(state_lock_); + bool notify = messages_.empty(); + messages_.emplace_back(action, session_id, std::move(message)); + if (notify) { + CHECK_EQ(0, uv_async_send(&async_)); + incoming_message_cond_.Broadcast(scoped_lock); + } + } + + void Wait() { + Mutex::ScopedLock scoped_lock(state_lock_); + if (messages_.empty()) { + incoming_message_cond_.Wait(scoped_lock); + } + } + + void SetServer(InspectorSocketServer* server) { + server_ = server; + } + + std::shared_ptr handle() { + return handle_; + } + + private: + ~RequestQueueData() = default; + + MessageQueue GetMessages() { + Mutex::ScopedLock scoped_lock(state_lock_); + MessageQueue messages; + messages_.swap(messages); + return messages; + } + + void DoDispatch() { + if (server_ == nullptr) + return; + for (const auto& request : GetMessages()) { + request.Dispatch(server_); + } + } + + std::shared_ptr handle_; + uv_async_t async_; + InspectorSocketServer* server_ = nullptr; + MessageQueue messages_; + Mutex state_lock_; // Locked before mutating the queue. + ConditionVariable incoming_message_cond_; +}; } // namespace -std::unique_ptr Utf8ToStringView(const std::string& message) { - icu::UnicodeString utf16 = - icu::UnicodeString::fromUTF8(icu::StringPiece(message.data(), - message.length())); - StringView view(reinterpret_cast(utf16.getBuffer()), - utf16.length()); - return StringBuffer::create(view); -} +class RequestQueue { + public: + explicit RequestQueue(RequestQueueData* data) : data_(data) {} + void Reset() { + Mutex::ScopedLock scoped_lock(lock_); + data_ = nullptr; + } + + void Post(int session_id, + TransportAction action, + std::unique_ptr message) { + Mutex::ScopedLock scoped_lock(lock_); + if (data_ != nullptr) + data_->Post(session_id, action, std::move(message)); + } + + void SetServer(InspectorSocketServer* server) { + Mutex::ScopedLock scoped_lock(lock_); + if (data_ != nullptr) + data_->SetServer(server); + } + + bool Expired() { + Mutex::ScopedLock scoped_lock(lock_); + return data_ == nullptr; + } + + private: + RequestQueueData* data_; + Mutex lock_; +}; class IoSessionDelegate : public InspectorSessionDelegate { public: - explicit IoSessionDelegate(InspectorIo* io, int id) : io_(io), id_(id) { } - void SendMessageToFrontend(const v8_inspector::StringView& message) override; + explicit IoSessionDelegate(std::shared_ptr queue, int id) + : request_queue_(queue), id_(id) { } + void SendMessageToFrontend(const v8_inspector::StringView& message) override { + request_queue_->Post(id_, TransportAction::kSendMessage, + StringBuffer::create(message)); + } + private: - InspectorIo* io_; + std::shared_ptr request_queue_; int id_; }; @@ -110,361 +210,133 @@ class IoSessionDelegate : public InspectorSessionDelegate { // mostly session start, message received, and session end. class InspectorIoDelegate: public node::inspector::SocketServerDelegate { public: - InspectorIoDelegate(InspectorIo* io, const std::string& target_id, + InspectorIoDelegate(std::shared_ptr queue, + std::shared_ptr main_threade, + const std::string& target_id, const std::string& script_path, - const std::string& script_name, bool wait); + const std::string& script_name); ~InspectorIoDelegate() { - io_->ServerDone(); } - // Calls PostIncomingMessage() with appropriate InspectorAction: - // kStartSession + void StartSession(int session_id, const std::string& target_id) override; - // kSendMessage void MessageReceived(int session_id, const std::string& message) override; - // kEndSession void EndSession(int session_id) override; std::vector GetTargetIds() override; std::string GetTargetTitle(const std::string& id) override; std::string GetTargetUrl(const std::string& id) override; - void AssignServer(InspectorSocketServer* server) override { - server_ = server; + request_queue_->SetServer(server); } private: - InspectorIo* io_; - int session_id_; + std::shared_ptr request_queue_; + std::shared_ptr main_thread_; + std::unordered_map> sessions_; const std::string script_name_; const std::string script_path_; const std::string target_id_; - bool waiting_; - InspectorSocketServer* server_; }; -void InterruptCallback(v8::Isolate*, void* agent) { - InspectorIo* io = static_cast(agent)->io(); - if (io != nullptr) - io->DispatchMessages(); +// static +std::unique_ptr InspectorIo::Start( + std::shared_ptr main_thread, + const std::string& path, + const DebugOptions& options) { + auto io = std::unique_ptr( + new InspectorIo(main_thread, path, options)); + if (io->request_queue_->Expired()) { // Thread is not running + return nullptr; + } + return io; } -class DispatchMessagesTask : public v8::Task { - public: - explicit DispatchMessagesTask(Agent* agent) : agent_(agent) {} - - void Run() override { - InspectorIo* io = agent_->io(); - if (io != nullptr) - io->DispatchMessages(); - } - - private: - Agent* agent_; -}; - -InspectorIo::InspectorIo(Environment* env, const std::string& path, - const DebugOptions& options, bool wait_for_connect) - : options_(options), thread_(), state_(State::kNew), - parent_env_(env), thread_req_(), - platform_(parent_env_->isolate_data()->platform()), - dispatching_messages_(false), script_name_(path), - wait_for_connect_(wait_for_connect), port_(-1), - id_(GenerateID()) { - main_thread_req_ = new AsyncAndAgent({uv_async_t(), env->inspector_agent()}); - CHECK_EQ(0, uv_async_init(env->event_loop(), &main_thread_req_->first, - InspectorIo::MainThreadReqAsyncCb)); - uv_unref(reinterpret_cast(&main_thread_req_->first)); - CHECK_EQ(0, uv_sem_init(&thread_start_sem_, 0)); +InspectorIo::InspectorIo(std::shared_ptr main_thread, + const std::string& path, + const DebugOptions& options) + : main_thread_(main_thread), options_(options), + thread_(), script_name_(path), id_(GenerateID()) { + Mutex::ScopedLock scoped_lock(thread_start_lock_); + CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0); + thread_start_condition_.Wait(scoped_lock); } InspectorIo::~InspectorIo() { - uv_sem_destroy(&thread_start_sem_); - uv_close(reinterpret_cast(&main_thread_req_->first), - ReleasePairOnAsyncClose); -} - -bool InspectorIo::Start() { - CHECK_EQ(state_, State::kNew); - CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0); - uv_sem_wait(&thread_start_sem_); - - if (state_ == State::kError) { - return false; - } - state_ = State::kAccepting; - if (wait_for_connect_) { - DispatchMessages(); - } - return true; -} - -void InspectorIo::Stop() { - CHECK_IMPLIES(sessions_.empty(), state_ == State::kAccepting); - Write(TransportAction::kKill, 0, StringView()); + request_queue_->Post(0, TransportAction::kKill, nullptr); int err = uv_thread_join(&thread_); CHECK_EQ(err, 0); - state_ = State::kShutDown; - DispatchMessages(); } -bool InspectorIo::IsStarted() { - return platform_ != nullptr; -} - -void InspectorIo::WaitForDisconnect() { - if (state_ == State::kAccepting) - state_ = State::kDone; - if (!sessions_.empty()) { - state_ = State::kShutDown; - Write(TransportAction::kStop, 0, StringView()); - fprintf(stderr, "Waiting for the debugger to disconnect...\n"); - fflush(stderr); - } +void InspectorIo::StopAcceptingNewConnections() { + request_queue_->Post(0, TransportAction::kStop, nullptr); } // static void InspectorIo::ThreadMain(void* io) { - static_cast(io)->ThreadMain(); + static_cast(io)->ThreadMain(); } -// static -template -void InspectorIo::IoThreadAsyncCb(uv_async_t* async) { - TransportAndIo* transport_and_io = - static_cast*>(async->data); - if (transport_and_io == nullptr) { - return; - } - Transport* transport = transport_and_io->first; - InspectorIo* io = transport_and_io->second; - MessageQueue outgoing_message_queue; - io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_message_queue); - for (const auto& outgoing : outgoing_message_queue) { - int session_id = std::get<1>(outgoing); - switch (std::get<0>(outgoing)) { - case TransportAction::kKill: - transport->TerminateConnections(); - // Fallthrough - case TransportAction::kStop: - transport->Stop(); - break; - case TransportAction::kSendMessage: - transport->Send(session_id, - protocol::StringUtil::StringViewToUtf8( - std::get<2>(outgoing)->string())); - break; - case TransportAction::kAcceptSession: - transport->AcceptSession(session_id); - break; - case TransportAction::kDeclineSession: - transport->DeclineSession(session_id); - break; - } - } -} - -template void InspectorIo::ThreadMain() { uv_loop_t loop; loop.data = nullptr; int err = uv_loop_init(&loop); CHECK_EQ(err, 0); - thread_req_.data = nullptr; - err = uv_async_init(&loop, &thread_req_, IoThreadAsyncCb); - CHECK_EQ(err, 0); + std::shared_ptr queue(new RequestQueueData(&loop), + RequestQueueData::CloseAndFree); std::string script_path = ScriptPath(&loop, script_name_); - auto delegate = std::unique_ptr( - new InspectorIoDelegate(this, id_, script_path, script_name_, - wait_for_connect_)); - Transport server(std::move(delegate), &loop, options_.host_name(), - options_.port()); - TransportAndIo queue_transport(&server, this); - thread_req_.data = &queue_transport; - if (!server.Start()) { - state_ = State::kError; // Safe, main thread is waiting on semaphore - CloseAsyncAndLoop(&thread_req_); - uv_sem_post(&thread_start_sem_); - return; - } - port_ = server.Port(); // Safe, main thread is waiting on semaphore. - if (!wait_for_connect_) { - uv_sem_post(&thread_start_sem_); + std::unique_ptr delegate( + new InspectorIoDelegate(queue, main_thread_, id_, + script_path, script_name_)); + InspectorSocketServer server(std::move(delegate), &loop, + options_.host_name(), options_.port()); + request_queue_ = queue->handle(); + // Its lifetime is now that of the server delegate + queue.reset(); + { + Mutex::ScopedLock scoped_lock(thread_start_lock_); + if (server.Start()) { + port_ = server.Port(); + } + thread_start_condition_.Broadcast(scoped_lock); } uv_run(&loop, UV_RUN_DEFAULT); - thread_req_.data = nullptr; CheckedUvLoopClose(&loop); } -template -bool InspectorIo::AppendMessage(MessageQueue* queue, - ActionType action, int session_id, - std::unique_ptr buffer) { - Mutex::ScopedLock scoped_lock(state_lock_); - bool trigger_pumping = queue->empty(); - queue->push_back(std::make_tuple(action, session_id, std::move(buffer))); - return trigger_pumping; -} - -template -void InspectorIo::SwapBehindLock(MessageQueue* vector1, - MessageQueue* vector2) { - Mutex::ScopedLock scoped_lock(state_lock_); - vector1->swap(*vector2); -} - -void InspectorIo::PostIncomingMessage(InspectorAction action, int session_id, - const std::string& message) { - Debug(parent_env_, DebugCategory::INSPECTOR_SERVER, - ">>> %s\n", message.c_str()); - if (AppendMessage(&incoming_message_queue_, action, session_id, - Utf8ToStringView(message))) { - Agent* agent = main_thread_req_->second; - v8::Isolate* isolate = parent_env_->isolate(); - platform_->CallOnForegroundThread(isolate, - new DispatchMessagesTask(agent)); - isolate->RequestInterrupt(InterruptCallback, agent); - CHECK_EQ(0, uv_async_send(&main_thread_req_->first)); - } - Mutex::ScopedLock scoped_lock(state_lock_); - incoming_message_cond_.Broadcast(scoped_lock); -} - std::vector InspectorIo::GetTargetIds() const { return { id_ }; } -TransportAction InspectorIo::Attach(int session_id) { - Agent* agent = parent_env_->inspector_agent(); - fprintf(stderr, "Debugger attached.\n"); - sessions_[session_id] = agent->Connect(std::unique_ptr( - new IoSessionDelegate(this, session_id))); - return TransportAction::kAcceptSession; -} - -void InspectorIo::DispatchMessages() { - if (dispatching_messages_) - return; - dispatching_messages_ = true; - bool had_messages = false; - do { - if (dispatching_message_queue_.empty()) - SwapBehindLock(&incoming_message_queue_, &dispatching_message_queue_); - had_messages = !dispatching_message_queue_.empty(); - while (!dispatching_message_queue_.empty()) { - MessageQueue::value_type task; - std::swap(dispatching_message_queue_.front(), task); - dispatching_message_queue_.pop_front(); - int id = std::get<1>(task); - StringView message = std::get<2>(task)->string(); - switch (std::get<0>(task)) { - case InspectorAction::kStartSession: - Write(Attach(id), id, StringView()); - break; - case InspectorAction::kStartSessionUnconditionally: - Attach(id); - break; - case InspectorAction::kEndSession: - sessions_.erase(id); - if (!sessions_.empty()) - continue; - if (state_ == State::kShutDown) { - state_ = State::kDone; - } else { - state_ = State::kAccepting; - } - break; - case InspectorAction::kSendMessage: - auto session = sessions_.find(id); - if (session != sessions_.end() && session->second) { - session->second->Dispatch(message); - } - break; - } - } - } while (had_messages); - dispatching_messages_ = false; -} - -// static -void InspectorIo::MainThreadReqAsyncCb(uv_async_t* req) { - AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first, req); - // Note that this may be called after io was closed or even after a new - // one was created and ran. - InspectorIo* io = pair->second->io(); - if (io != nullptr) - io->DispatchMessages(); -} - -void InspectorIo::Write(TransportAction action, int session_id, - const StringView& inspector_message) { - std::string message_str = - protocol::StringUtil::StringViewToUtf8(inspector_message); - Debug(parent_env_, DebugCategory::INSPECTOR_SERVER, - "<<< %s\n", message_str.c_str()); - AppendMessage(&outgoing_message_queue_, action, session_id, - StringBuffer::create(inspector_message)); - int err = uv_async_send(&thread_req_); - CHECK_EQ(0, err); -} - -bool InspectorIo::WaitForFrontendEvent() { - // We allow DispatchMessages reentry as we enter the pause. This is important - // to support debugging the code invoked by an inspector call, such - // as Runtime.evaluate - dispatching_messages_ = false; - Mutex::ScopedLock scoped_lock(state_lock_); - if (sessions_.empty()) - return false; - if (dispatching_message_queue_.empty() && incoming_message_queue_.empty()) { - incoming_message_cond_.Wait(scoped_lock); - } - return true; -} - -InspectorIoDelegate::InspectorIoDelegate(InspectorIo* io, - const std::string& target_id, - const std::string& script_path, - const std::string& script_name, - bool wait) - : io_(io), - session_id_(0), - script_name_(script_name), - script_path_(script_path), - target_id_(target_id), - waiting_(wait), - server_(nullptr) { } - +InspectorIoDelegate::InspectorIoDelegate( + std::shared_ptr queue, + std::shared_ptr main_thread, + const std::string& target_id, + const std::string& script_path, + const std::string& script_name) + : request_queue_(queue), main_thread_(main_thread), + script_name_(script_name), script_path_(script_path), + target_id_(target_id) {} void InspectorIoDelegate::StartSession(int session_id, const std::string& target_id) { - session_id_ = session_id; - InspectorAction action = InspectorAction::kStartSession; - if (waiting_) { - action = InspectorAction::kStartSessionUnconditionally; - server_->AcceptSession(session_id); + auto session = main_thread_->Connect( + std::unique_ptr( + new IoSessionDelegate(request_queue_->handle(), session_id)), true); + if (session) { + sessions_[session_id] = std::move(session); + fprintf(stderr, "Debugger attached.\n"); } - io_->PostIncomingMessage(action, session_id, ""); } void InspectorIoDelegate::MessageReceived(int session_id, const std::string& message) { - // TODO(pfeldman): Instead of blocking execution while debugger - // engages, node should wait for the run callback from the remote client - // and initiate its startup. This is a change to node.cc that should be - // upstreamed separately. - if (waiting_) { - if (message.find("\"Runtime.runIfWaitingForDebugger\"") != - std::string::npos) { - waiting_ = false; - io_->ResumeStartup(); - } - } - io_->PostIncomingMessage(InspectorAction::kSendMessage, session_id, - message); + auto session = sessions_.find(session_id); + if (session != sessions_.end()) + session->second->Dispatch(Utf8ToStringView(message)->string()); } void InspectorIoDelegate::EndSession(int session_id) { - io_->PostIncomingMessage(InspectorAction::kEndSession, session_id, ""); + sessions_.erase(session_id); } std::vector InspectorIoDelegate::GetTargetIds() { @@ -479,10 +351,17 @@ std::string InspectorIoDelegate::GetTargetUrl(const std::string& id) { return "file://" + script_path_; } -void IoSessionDelegate::SendMessageToFrontend( - const v8_inspector::StringView& message) { - io_->Write(TransportAction::kSendMessage, id_, message); +// static +void RequestQueueData::CloseAndFree(RequestQueueData* queue) { + queue->handle_->Reset(); + queue->handle_.reset(); + uv_close(reinterpret_cast(&queue->async_), + [](uv_handle_t* handle) { + uv_async_t* async = reinterpret_cast(handle); + RequestQueueData* wrapper = + node::ContainerOf(&RequestQueueData::async_, async); + delete wrapper; + }); } - } // namespace inspector } // namespace node diff --git a/src/inspector_io.h b/src/inspector_io.h index c897a44a52..7c43d212f0 100644 --- a/src/inspector_io.h +++ b/src/inspector_io.h @@ -6,8 +6,6 @@ #include "node_mutex.h" #include "uv.h" -#include -#include #include #include @@ -16,17 +14,14 @@ #endif -// Forward declaration to break recursive dependency chain with src/env.h. -namespace node { -class Environment; -} // namespace node - namespace v8_inspector { class StringBuffer; class StringView; } // namespace v8_inspector namespace node { +// Forward declaration to break recursive dependency chain with src/env.h. +class Environment; namespace inspector { std::string FormatWsAddress(const std::string& host, int port, @@ -34,143 +29,64 @@ std::string FormatWsAddress(const std::string& host, int port, bool include_protocol); class InspectorIoDelegate; - -enum class InspectorAction { - kStartSession, - kStartSessionUnconditionally, // First attach with --inspect-brk - kEndSession, - kSendMessage -}; +class MainThreadHandle; +class RequestQueue; // kKill closes connections and stops the server, kStop only stops the server enum class TransportAction { kKill, kSendMessage, - kStop, - kAcceptSession, - kDeclineSession + kStop }; class InspectorIo { public: - InspectorIo(node::Environment* env, const std::string& path, - const DebugOptions& options, bool wait_for_connect); + // Start the inspector agent thread, waiting for it to initialize + // bool Start(); + // Returns empty pointer if thread was not started + static std::unique_ptr Start( + std::shared_ptr main_thread, const std::string& path, + const DebugOptions& options); + // Will block till the transport thread shuts down ~InspectorIo(); - // Start the inspector agent thread, waiting for it to initialize, - // and waiting as well for a connection if wait_for_connect. - bool Start(); - // Stop the inspector agent thread. - void Stop(); - bool IsStarted(); - - void WaitForDisconnect(); - // Called from thread to queue an incoming message and trigger - // DispatchMessages() on the main thread. - void PostIncomingMessage(InspectorAction action, int session_id, - const std::string& message); - void ResumeStartup() { - uv_sem_post(&thread_start_sem_); - } - void ServerDone() { - uv_close(reinterpret_cast(&thread_req_), nullptr); - } - bool WaitForFrontendEvent(); - - int port() const { return port_; } + void StopAcceptingNewConnections(); std::string host() const { return options_.host_name(); } + int port() const { return port_; } std::vector GetTargetIds() const; private: - template - using MessageQueue = - std::deque>>; - enum class State { - kNew, - kAccepting, - kDone, - kError, - kShutDown - }; - - // Callback for main_thread_req_'s uv_async_t - static void MainThreadReqAsyncCb(uv_async_t* req); + InspectorIo(std::shared_ptr handle, + const std::string& path, const DebugOptions& options); // Wrapper for agent->ThreadMain() static void ThreadMain(void* agent); // Runs a uv_loop_t - template void ThreadMain(); - // Called by ThreadMain's loop when triggered by thread_req_, writes - // messages from outgoing_message_queue to the InspectorSockerServer - template static void IoThreadAsyncCb(uv_async_t* async); - - void DispatchMessages(); - // Write action to outgoing_message_queue, and wake the thread - void Write(TransportAction action, int session_id, - const v8_inspector::StringView& message); - // Thread-safe append of message to a queue. Return true if the queue - // used to be empty. - template - bool AppendMessage(MessageQueue* vector, ActionType action, - int session_id, - std::unique_ptr buffer); - // Used as equivalent of a thread-safe "pop" of an entire queue's content. - template - void SwapBehindLock(MessageQueue* vector1, - MessageQueue* vector2); - // Attach session to an inspector. Either kAcceptSession or kDeclineSession - TransportAction Attach(int session_id); + void ThreadMain(); + // This is a thread-safe object that will post async tasks. It lives as long + // as an Inspector object lives (almost as long as an Isolate). + std::shared_ptr main_thread_; + // Used to post on a frontend interface thread, lives while the server is + // running + std::shared_ptr request_queue_; const DebugOptions options_; // The IO thread runs its own uv_loop to implement the TCP server off // the main thread. uv_thread_t thread_; - // Used by Start() to wait for thread to initialize, or for it to initialize - // and receive a connection if wait_for_connect was requested. - uv_sem_t thread_start_sem_; - - State state_; - node::Environment* parent_env_; - - // Attached to the uv_loop in ThreadMain() - uv_async_t thread_req_; - // Note that this will live while the async is being closed - likely, past - // the parent object lifespan - std::pair* main_thread_req_; - // Will be used to post tasks from another thread - v8::Platform* const platform_; - - // Message queues - ConditionVariable incoming_message_cond_; - Mutex state_lock_; // Locked before mutating either queue. - MessageQueue incoming_message_queue_; - MessageQueue outgoing_message_queue_; - // This queue is to maintain the order of the messages for the cases - // when we reenter the DispatchMessages function. - MessageQueue dispatching_message_queue_; - - bool dispatching_messages_; + // For setting up interthread communications + Mutex thread_start_lock_; + ConditionVariable thread_start_condition_; std::string script_name_; - std::string script_path_; - const bool wait_for_connect_; - int port_; - std::unordered_map> sessions_; + int port_ = -1; // May be accessed from any thread const std::string id_; - - friend class DispatchMessagesTask; - friend class IoSessionDelegate; - friend void InterruptCallback(v8::Isolate*, void* agent); }; -std::unique_ptr Utf8ToStringView( - const std::string& message); - } // namespace inspector } // namespace node diff --git a/src/inspector_js_api.cc b/src/inspector_js_api.cc index 13e91f283c..268c25aeb4 100644 --- a/src/inspector_js_api.cc +++ b/src/inspector_js_api.cc @@ -66,7 +66,7 @@ class JSBindingsConnection : public AsyncWrap { callback_(env->isolate(), callback) { Agent* inspector = env->inspector_agent(); session_ = inspector->Connect(std::unique_ptr( - new JSBindingsSessionDelegate(env, this))); + new JSBindingsSessionDelegate(env, this)), false); } void OnMessage(Local value) { @@ -116,7 +116,7 @@ class JSBindingsConnection : public AsyncWrap { static bool InspectorEnabled(Environment* env) { Agent* agent = env->inspector_agent(); - return agent->io() != nullptr || agent->HasConnectedSessions(); + return agent->IsActive(); } void AddCommandLineAPI(const FunctionCallbackInfo& info) { @@ -251,8 +251,9 @@ void Open(const FunctionCallbackInfo& args) { if (args.Length() > 2 && args[2]->IsBoolean()) { wait_for_connect = args[2]->BooleanValue(env->context()).FromJust(); } - - agent->StartIoThread(wait_for_connect); + agent->StartIoThread(); + if (wait_for_connect) + agent->WaitForConnect(); } void Url(const FunctionCallbackInfo& args) { @@ -283,7 +284,7 @@ void Initialize(Local target, Local unused, Agent* agent = env->inspector_agent(); env->SetMethod(target, "consoleCall", InspectorConsoleCall); env->SetMethod(target, "addCommandLineAPI", AddCommandLineAPI); - if (agent->IsWaitingForConnect()) + if (agent->WillWaitForConnect()) env->SetMethod(target, "callAndPauseOnStart", CallAndPauseOnStart); env->SetMethod(target, "open", Open); env->SetMethod(target, "url", Url); diff --git a/src/inspector_socket_server.cc b/src/inspector_socket_server.cc index 174dc7c726..1621b408b4 100644 --- a/src/inspector_socket_server.cc +++ b/src/inspector_socket_server.cc @@ -173,11 +173,8 @@ class SocketSession { InspectorSocket* ws_socket() { return ws_socket_.get(); } - void set_ws_key(const std::string& ws_key) { - ws_key_ = ws_key; - } - void Accept() { - ws_socket_->AcceptUpgrade(ws_key_); + void Accept(const std::string& ws_key) { + ws_socket_->AcceptUpgrade(ws_key); } void Decline() { ws_socket_->CancelHandshake(); @@ -208,7 +205,6 @@ class SocketSession { const int id_; InspectorSocket::Pointer ws_socket_; const int server_port_; - std::string ws_key_; }; class ServerSocket { @@ -260,11 +256,11 @@ void InspectorSocketServer::SessionStarted(int session_id, const std::string& ws_key) { SocketSession* session = Session(session_id); if (!TargetExists(id)) { - Session(session_id)->Decline(); + session->Decline(); return; } connected_sessions_[session_id].first = id; - session->set_ws_key(ws_key); + session->Accept(ws_key); delegate_->StartSession(session_id, id); } @@ -404,6 +400,8 @@ bool InspectorSocketServer::Start() { } void InspectorSocketServer::Stop() { + if (state_ == ServerState::kStopped) + return; CHECK_EQ(state_, ServerState::kRunning); state_ = ServerState::kStopped; server_sockets_.clear(); @@ -446,23 +444,6 @@ void InspectorSocketServer::Accept(int server_port, } } -void InspectorSocketServer::AcceptSession(int session_id) { - SocketSession* session = Session(session_id); - if (session == nullptr) { - delegate_->EndSession(session_id); - } else { - session->Accept(); - } -} - -void InspectorSocketServer::DeclineSession(int session_id) { - auto it = connected_sessions_.find(session_id); - if (it != connected_sessions_.end()) { - it->second.first.clear(); - it->second.second->Decline(); - } -} - void InspectorSocketServer::Send(int session_id, const std::string& message) { SocketSession* session = Session(session_id); if (session != nullptr) { diff --git a/src/inspector_socket_server.h b/src/inspector_socket_server.h index bbc2195095..271be6ec55 100644 --- a/src/inspector_socket_server.h +++ b/src/inspector_socket_server.h @@ -54,10 +54,6 @@ class InspectorSocketServer { void Send(int session_id, const std::string& message); // kKill void TerminateConnections(); - // kAcceptSession - void AcceptSession(int session_id); - // kDeclineSession - void DeclineSession(int session_id); int Port() const; // Session connection lifecycle diff --git a/src/node.cc b/src/node.cc index bbf76853ed..a9318cb82b 100644 --- a/src/node.cc +++ b/src/node.cc @@ -323,11 +323,12 @@ static struct { // Inspector agent can't fail to start, but if it was configured to listen // right away on the websocket port and fails to bind/etc, this will return // false. - return env->inspector_agent()->Start(script_path, options); + return env->inspector_agent()->Start( + script_path == nullptr ? "" : script_path, options); } bool InspectorStarted(Environment* env) { - return env->inspector_agent()->IsStarted(); + return env->inspector_agent()->IsListening(); } #endif // HAVE_INSPECTOR @@ -1510,7 +1511,7 @@ void InitGroups(const FunctionCallbackInfo& args) { static void WaitForInspectorDisconnect(Environment* env) { #if HAVE_INSPECTOR - if (env->inspector_agent()->HasConnectedSessions()) { + if (env->inspector_agent()->IsActive()) { // Restore signal dispositions, the app is done and is no longer // capable of handling signals. #if defined(__POSIX__) && !defined(NODE_SHARED_MODE) @@ -3517,7 +3518,7 @@ static void ParseArgs(int* argc, static void StartInspector(Environment* env, const char* path, DebugOptions debug_options) { #if HAVE_INSPECTOR - CHECK(!env->inspector_agent()->IsStarted()); + CHECK(!env->inspector_agent()->IsListening()); v8_platform.StartInspector(env, path, debug_options); #endif // HAVE_INSPECTOR } @@ -3660,7 +3661,7 @@ static void DebugProcess(const FunctionCallbackInfo& args) { static void DebugEnd(const FunctionCallbackInfo& args) { #if HAVE_INSPECTOR Environment* env = Environment::GetCurrent(args); - if (env->inspector_agent()->IsStarted()) { + if (env->inspector_agent()->IsListening()) { env->inspector_agent()->Stop(); } #endif diff --git a/src/signal_wrap.cc b/src/signal_wrap.cc index 346f442f61..f44b232a1a 100644 --- a/src/signal_wrap.cc +++ b/src/signal_wrap.cc @@ -92,7 +92,7 @@ class SignalWrap : public HandleWrap { #if defined(__POSIX__) && HAVE_INSPECTOR if (signum == SIGPROF) { Environment* env = Environment::GetCurrent(args); - if (env->inspector_agent()->IsStarted()) { + if (env->inspector_agent()->IsListening()) { ProcessEmitWarning(env, "process.on(SIGPROF) is reserved while debugging"); return; diff --git a/test/cctest/test_inspector_socket_server.cc b/test/cctest/test_inspector_socket_server.cc index 60b7eefc5e..349356ef56 100644 --- a/test/cctest/test_inspector_socket_server.cc +++ b/test/cctest/test_inspector_socket_server.cc @@ -14,7 +14,6 @@ static const char CLIENT_CLOSE_FRAME[] = "\x88\x80\x2D\x0E\x1E\xFA"; static const char SERVER_CLOSE_FRAME[] = "\x88\x00"; static const char MAIN_TARGET_ID[] = "main-target"; -static const char UNCONNECTABLE_TARGET_ID[] = "unconnectable-target"; static const char WS_HANDSHAKE_RESPONSE[] = "HTTP/1.1 101 Switching Protocols\r\n" @@ -258,10 +257,6 @@ class ServerHolder { return server_->done(); } - void Connected() { - connected++; - } - void Disconnected() { disconnected++; } @@ -270,9 +265,10 @@ class ServerHolder { delegate_done = true; } - void PrepareSession(int id) { + void Connected(int id) { buffer_.clear(); session_id_ = id; + connected++; } void Received(const std::string& message) { @@ -319,15 +315,9 @@ class TestSocketServerDelegate : public SocketServerDelegate { void StartSession(int session_id, const std::string& target_id) override { session_id_ = session_id; - harness_->PrepareSession(session_id_); CHECK_NE(targets_.end(), std::find(targets_.begin(), targets_.end(), target_id)); - if (target_id == UNCONNECTABLE_TARGET_ID) { - server_->DeclineSession(session_id); - return; - } - harness_->Connected(); - server_->AcceptSession(session_id); + harness_->Connected(session_id_); } void MessageReceived(int session_id, const std::string& message) override { @@ -363,7 +353,7 @@ ServerHolder::ServerHolder(bool has_targets, uv_loop_t* loop, const std::string host, int port, FILE* out) { std::vector targets; if (has_targets) - targets = { MAIN_TARGET_ID, UNCONNECTABLE_TARGET_ID }; + targets = { MAIN_TARGET_ID }; std::unique_ptr delegate( new TestSocketServerDelegate(this, targets)); server_.reset( @@ -414,15 +404,6 @@ TEST_F(InspectorSocketServerTest, InspectorSessions) { well_behaved_socket.Close(); - // Declined connection - SocketWrapper declined_target_socket(&loop); - declined_target_socket.Connect(HOST, server.port()); - declined_target_socket.Write(WsHandshakeRequest(UNCONNECTABLE_TARGET_ID)); - declined_target_socket.Expect("HTTP/1.0 400 Bad Request"); - declined_target_socket.ExpectEOF(); - EXPECT_EQ(1, server.connected); - EXPECT_EQ(1, server.disconnected); - // Bogus target - start session callback should not even be invoked SocketWrapper bogus_target_socket(&loop); bogus_target_socket.Connect(HOST, server.port()); @@ -491,7 +472,7 @@ TEST_F(InspectorSocketServerTest, ServerWithoutTargets) { // Declined connection SocketWrapper socket(&loop); socket.Connect(HOST, server.port()); - socket.Write(WsHandshakeRequest(UNCONNECTABLE_TARGET_ID)); + socket.Write(WsHandshakeRequest("any target id")); socket.Expect("HTTP/1.0 400 Bad Request"); socket.ExpectEOF(); server->Stop(); diff --git a/test/parallel/test-warn-sigprof.js b/test/parallel/test-warn-sigprof.js index 71ac25443b..e5335215d7 100644 --- a/test/parallel/test-warn-sigprof.js +++ b/test/parallel/test-warn-sigprof.js @@ -1,3 +1,4 @@ +// Flags: --inspect=0 'use strict'; const common = require('../common');