Timers related function, set/clear Immediate/Timeout/Interval (#207)

* Set/Clear Immedidate/Timeout/Interval.

* add test case for napa set/clear Immediate/Timeout/Interval.

* Fix build error on windows for Timer related.

* Fix timer queue loop wait too long on new urgent tasks.
More robust timer-test.

* More tolerant on timer scheduler error.

* Fix windows build break using node 6.

* Tolerant more on timer schedule error special in auto build environment.

* Using strick js function tradition in Binding export functions on timer.

* change module name from timer to timers.

* make timers a buildin module.

* Remove non-used code line caused by copy/paste

* Do not need varible arguments here when constructing Timeout.

* Free persistent handles saved for timers at correct time point.

* Add test program under misc/ to check the memory usage on timers.

* Fix build break under node 6 for v8 SetWeak api.
Free timer early when it is not used.

* Add condition on immediate where no timer is attached to Timeout.

* Remove release timer in advance.

* Reset to release persistent handle.

* Group timers test under napa/timers

* typo fix

* Add SchedulePhase to generalize immediate/normal task types and keep expansion for future.

* Remove internal memeroy test scripts

* Some name style changes according to review suggestions.

* modify unittest on worker schedule interface changed.

* Force node 8.9.4 to avoid auto build test tools fail on new node 8.10.0 temperately.

* Force windows build use node 8.9.4 to avoid 8.10.0 crash issue temperately.

* Force OSX build use node 8.9.4 to avoid node 8.10.0 crash issue temperately.

* Run timer related test in another standalone mocha.
So that timer scheduler error not affected by other tests.

* revert back node version related changes.

* fix commandline issue on windows

* Force OSX build use node 8.9.4 to avoid node 8.10.0 crash issue temperately.

* revert osx node version on 8.9.4

* working 1

* working binding build 2

* remove static function getzoneScheduler

* Hide CallbackTask inside cpp. Add more comments. And some other modifications according to review.

* Refactor the napa-bingding build related changes. Do not add any
timers logic into napa-binding.node, so original build works without
add the timer-warpper.cpp into its source.

* fix typo in comments.
This commit is contained in:
Zhang Lei 2018-04-04 16:47:03 -07:00 коммит произвёл GitHub
Родитель 77ecdf7f22
Коммит a8d23e0d37
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
20 изменённых файлов: 672 добавлений и 58 удалений

Просмотреть файл

@ -18,5 +18,9 @@
{
"name": "util",
"type": "core"
},
{
"name": "timers",
"type": "builtin"
}
]

2
lib/core/timers.ts Normal file
Просмотреть файл

@ -0,0 +1,2 @@
export * from './timers/timer';
export * from './timers/timer-api';

Просмотреть файл

@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
import { Timeout, Immediate } from './timer';
let binding = require('../../binding');
export function setImmediate(func: (...args: any[]) => void, ...args: any[]): Immediate {
let timeout = new Timeout(func, 0, 0, args);
binding.setImmediate(timeout);
return timeout;
}
export function clearImmediate(immediate: Immediate): void {
immediate._active = false;
}
export function setTimeout(func: (...args: any[]) => void, after: number, ...args: any[]): Timeout {
let timeout = new Timeout(func, after, 0, args);
binding.setTimers(timeout);
return timeout;
}
export function clearTimeout(timeout: Timeout): void {
timeout._active = false;
}
export function setInterval(func: (...args: any[]) => void, after: number, ...args: any[]): Timeout {
let timeout = new Timeout(func, after, after, args);
binding.setTimers(timeout);
return timeout;
}
export function clearInterval(timeout: Timeout): void {
timeout._active = false;
}

36
lib/core/timers/timer.ts Normal file
Просмотреть файл

@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
/// <summary> Timer is a facility to run timed callbacks in caller's isolates. </summary>
export interface Timer {
}
const TIMEOUT_MAX = 2 ** 31 -1;
export class Timeout {
_callback: (...args: any[]) => void;
private _after: number;
private _repeat: number;
private _args: any[];
_active: boolean;
private _timer: Timer;
constructor(callback: (...args: any[]) => void,
after: number, repeat: number, args: any[]) {
if (after < 1) after = 0; //0 used for immediate
if (after > TIMEOUT_MAX) after = 1;
if (repeat < 1 || after == 0) repeat = 0; // do not repeat
if (repeat > TIMEOUT_MAX) repeat = 1;
this._callback = callback;
this._after = after;
this._repeat = repeat;
this._args = args;
this._timer = null;
this._active = true;
}
}
export type Immediate = Timeout;

Просмотреть файл

@ -18,4 +18,4 @@ import { call } from './zone/function-call';
(<any>(global))["__napa_zone_call__"] = call;
// Export 'napa' in global for all isolates that require napajs.
(<any>(global))["napa"] = exports;
(<any>(global))["napa"] = exports;

Просмотреть файл

@ -1,4 +1,6 @@
# Files to compile
# Note: Do not add napa core-modules cpp files that not needed in node isolation,
# like timer-wrap.cpp.
file(GLOB SOURCE_FILES
"addon.cpp"
"node-zone-delegates.cpp"
@ -8,7 +10,17 @@ file(GLOB SOURCE_FILES
"${PROJECT_SOURCE_DIR}/src/zone/call-task.cpp"
"${PROJECT_SOURCE_DIR}/src/zone/eval-task.cpp"
"${PROJECT_SOURCE_DIR}/src/zone/terminable-task.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/*.cpp")
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/allocator-debugger-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/allocator-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/call-context-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/lock-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/metric-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/napa-binding.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/shared-ptr-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/store-wrap.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/transport-context-wrap-impl.cpp"
"${PROJECT_SOURCE_DIR}/src/module/core-modules/napa/zone-wrap.cpp"
)
# The addon name
set(TARGET_NAME "${PROJECT_NAME}-binding")

Просмотреть файл

@ -33,7 +33,7 @@
"benchmark": "node benchmark/bench.js",
"install": "node scripts/install.js",
"prepare": "tsc -p lib && tsc -p test && tsc -p benchmark",
"test": "mocha test --recursive",
"test": "mocha test -g \"^((?!napajs/timers).)*$\" --recursive && mocha test -g \"^napajs/timers\"",
"rebuild": "cmake-js rebuild && tsc -p lib",
"rebuildd": "cmake-js rebuild --debug && tsc -p lib",
"retest": "cmake-js rebuild -d test/module/addon && tsc -p test && mocha test --recursive",

Просмотреть файл

@ -10,6 +10,7 @@
#include "metric-wrap.h"
#include "shared-ptr-wrap.h"
#include "store-wrap.h"
#include "timer-wrap.h"
#include "transport-context-wrap-impl.h"
#include "zone-wrap.h"
@ -271,6 +272,31 @@ void DeserializeValue(const v8::FunctionCallbackInfo<v8::Value>& args) {
#endif
}
/////////////////////////////////////////////////////////////////////
/// Timers APIs, these APIs only valid in non-node isolation, i.e.,
/// they are not needed when building the napa_binding.node
#ifdef BUILDING_NAPA_EXTENSION
static void SetImmediate(const v8::FunctionCallbackInfo<v8::Value>& args) {
TimerWrap::SetImmediateCallback(args);
}
// Set Timeout or Set Interval
static void SetTimers(const v8::FunctionCallbackInfo<v8::Value>& args) {
TimerWrap::SetTimersCallback(args);
}
#endif
static void InitNapaOnlyBindings(v8::Local<v8::Object> exports) {
#ifdef BUILDING_NAPA_EXTENSION
TimerWrap::Init();
NAPA_SET_METHOD(exports, "setImmediate", SetImmediate);
NAPA_SET_METHOD(exports, "setTimers", SetTimers);
#endif
}
void binding::Init(v8::Local<v8::Object> exports, v8::Local<v8::Object> module) {
// Register napa binding in worker context.
RegisterBinding(module);
@ -311,4 +337,6 @@ void binding::Init(v8::Local<v8::Object> exports, v8::Local<v8::Object> module)
NAPA_SET_METHOD(exports, "serializeValue", SerializeValue);
NAPA_SET_METHOD(exports, "deserializeValue", DeserializeValue);
InitNapaOnlyBindings(exports);
}

Просмотреть файл

@ -1,13 +0,0 @@
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="$(MSBuildThisFileDirectory)\allocator-debugger-wrap.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\allocator-wrap.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\call-context-wrap.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\metric-wrap.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\napa-binding.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\shared-ptr-wrap.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\store-wrap.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\transport-context-wrap-impl.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\zone-wrap.cpp" />
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,227 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
// See: https://groups.google.com/forum/#!topic/nodejs/onA0S01INtw
#ifdef BUILDING_NODE_EXTENSION
#include <node.h>
#endif
#include <vector>
#include <memory>
#include <functional>
#include "timer-wrap.h"
#include <napa/zone.h>
#include <zone/worker.h>
#include <zone/scheduler.h>
#include <zone/worker-context.h>
#include <zone/async-context.h>
#include <zone/task.h>
using napa::zone::WorkerId;
using napa::zone::WorkerContext;
using napa::zone::WorkerContextItem;
using napa::zone::NapaZone;
using napa::zone::SchedulePhase;
using v8::Array;
using v8::Boolean;
using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Global;
using v8::HandleScope;
using v8::Isolate;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::Persistent;
using v8::String;
using v8::Value;
namespace napa {
namespace zone {
/// <summary> A task to run a C++ callback task without cross isolation. </summary>
class CallbackTask : public Task {
public:
typedef std::function<void(void)> Callback;
/// <summary> Constructor. </summary>
/// <param name="context"> Structure containing asynchronous work's context. </param>
CallbackTask(Callback callback)
: _callback(std::move(callback))
{
}
/// <summary> Overrides Task.Execute to define running execution logic. </summary>
virtual void Execute()
{
_callback();
}
private:
Callback _callback;
};
}
}
using namespace napa::module;
NAPA_DEFINE_PERSISTENT_CONSTRUCTOR(TimerWrap);
void TimerWrap::Init() {
auto isolate = Isolate::GetCurrent();
auto constructorTemplate = FunctionTemplate::New(isolate, DefaultConstructorCallback<TimerWrap>);
constructorTemplate->SetClassName(v8_helpers::MakeV8String(isolate, exportName));
constructorTemplate->InstanceTemplate()->SetInternalFieldCount(1);
NAPA_SET_PERSISTENT_CONSTRUCTOR(exportName, constructorTemplate->GetFunction());
}
Local<Object> TimerWrap::NewInstance(std::shared_ptr<napa::zone::Timer> timer) {
auto object = napa::module::NewInstance<TimerWrap>().ToLocalChecked();
auto wrap = NAPA_OBJECTWRAP::Unwrap<TimerWrap>(object);
wrap->_timer = std::move(timer);
return object;
}
void TimerWrap::Reset() {
_timer.reset();
}
napa::zone::Timer& TimerWrap::Get() {
return *_timer;
}
// This is created as SetWeak(void) is not exists in v8 used in NodeJS 6.
static void EmptyWeakCallback(const v8::WeakCallbackInfo<int>& data) {
}
std::shared_ptr<napa::zone::CallbackTask> buildTimeoutTask(
std::shared_ptr<Persistent<Object>> sharedTimeout,
std::shared_ptr<Persistent<Context>> sharedContext)
{
return std::make_shared<napa::zone::CallbackTask>(
[sharedTimeout, sharedContext]() {
auto isolate = Isolate::GetCurrent();
HandleScope handleScope(isolate);
auto context = Local<Context>::New(isolate, *sharedContext);
Context::Scope contextScope(context);
auto timeout = Local<Object>::New(isolate, *sharedTimeout);
bool needDestroy = true;
Local<Boolean> active = Local<Boolean>::Cast(timeout->Get(String::NewFromUtf8(isolate, "_active")));
if (active->Value()) {
Local<Function> cb = Local<Function>::Cast(timeout->Get(String::NewFromUtf8(isolate, "_callback")));
Local<Array> args = Local<Array>::Cast(timeout->Get(String::NewFromUtf8(isolate, "_args")));
std::vector<Local<Value>> parameters;
parameters.reserve(args->Length());
for (int i = 0; i < static_cast<int>(args->Length()); ++i) {
Local<Value> v = args->Get(context, i).ToLocalChecked();
parameters.emplace_back(v);
}
cb->Call(context, context->Global(), static_cast<int>(parameters.size()), parameters.data());
Local<Number> interval = Local<Number>::Cast(timeout->Get(String::NewFromUtf8(isolate, "_repeat")));
bool isInterval = (interval->Value() >= 1);
if (isInterval) {
auto jsTimer = NAPA_OBJECTWRAP::Unwrap<TimerWrap>(
Local<Object>::Cast(timeout->Get(String::NewFromUtf8(isolate, "_timer"))));
//Re-arm the interval timer in napa's timer schedule thread.
jsTimer->Get().Start();
// If not interval timer, global v8 handle for Timeout and Context will be SetWeak.
// Otherwise keep holding the hanle as they will be used some time later.
needDestroy = false;
}
}
if (needDestroy) {
if (!sharedTimeout->IsEmpty()) {
sharedTimeout->SetWeak((int*)nullptr, EmptyWeakCallback, v8::WeakCallbackType::kParameter);
sharedTimeout->Reset();
}
if (!sharedContext->IsEmpty()) {
sharedContext->SetWeak((int*)nullptr, EmptyWeakCallback, v8::WeakCallbackType::kParameter);
sharedContext->Reset();
}
}
}
);
}
void TimerWrap::SetImmediateCallback(const FunctionCallbackInfo<Value>& args) {
auto isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
CHECK_ARG(isolate, args.Length() == 1, "1 argument is required for calling 'SetImmediateCallback'.");
CHECK_ARG(isolate, args[0]->IsObject(), "Argument \"timeout\" shall be 'Timeout' type.");
auto zone = reinterpret_cast<NapaZone*>(WorkerContext::Get(WorkerContextItem::ZONE));
if (zone == nullptr) {
throw new std::runtime_error("Null zone encountered!");
}
auto scheduler = zone->GetScheduler().get();
if (scheduler == nullptr) {
throw new std::runtime_error("Null scheduler encountered!");
}
Local<Object> timeout = Local<Object>::Cast(args[0]);
auto sharedTimeout = std::make_shared<Persistent<Object>>(isolate, timeout);
auto context = isolate->GetCurrentContext();
auto sharedContext = std::make_shared<Persistent<Context>>(isolate, context);
auto immediateCallbackTask = buildTimeoutTask(sharedTimeout, sharedContext);
auto workerId = static_cast<WorkerId>(
reinterpret_cast<uintptr_t>(WorkerContext::Get(WorkerContextItem::WORKER_ID)));
scheduler->ScheduleOnWorker(workerId, immediateCallbackTask, SchedulePhase::ImmediatePhase);
}
void TimerWrap::SetTimersCallback(const FunctionCallbackInfo<Value>& args) {
auto isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
CHECK_ARG(isolate, args.Length() == 1, "1 argument is required for calling 'SetTimersCallback'.");
CHECK_ARG(isolate, args[0]->IsObject(), "Argument \"timeout\" shall be 'Timeout' type.");
auto zone = reinterpret_cast<NapaZone*>(WorkerContext::Get(WorkerContextItem::ZONE));
if (zone == nullptr) {
throw new std::runtime_error("Null zone encountered!");
}
auto scheduler = zone->GetScheduler().get();
if (scheduler == nullptr) {
throw new std::runtime_error("Null scheduler encountered!");
}
auto workerId = static_cast<WorkerId>(
reinterpret_cast<uintptr_t>(WorkerContext::Get(WorkerContextItem::WORKER_ID)));
Local<Object> timeout = Local<Object>::Cast(args[0]);
auto sharedTimeout = std::make_shared<Persistent<Object>>(isolate, timeout);
auto context = isolate->GetCurrentContext();
auto sharedContext = std::make_shared<Persistent<Context>>(isolate, context);
Local<Number> after = Local<Number>::Cast(timeout->Get(String::NewFromUtf8(isolate, "_after")));
std::chrono::milliseconds msAfter{static_cast<int>(after->Value())};
auto sharedTimer = std::make_shared<napa::zone::Timer>(
[sharedTimeout, sharedContext, scheduler, workerId]() {
auto timerCallbackTask = buildTimeoutTask(sharedTimeout, sharedContext);
scheduler->ScheduleOnWorker(workerId, timerCallbackTask, SchedulePhase::DefaultPhase);
}, msAfter);
auto jsTimer = TimerWrap::NewInstance(sharedTimer);
timeout->Set(String::NewFromUtf8(isolate, "_timer"), jsTimer);
sharedTimer->Start();
}

Просмотреть файл

@ -0,0 +1,55 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#pragma once
#include <napa/module.h>
#include <zone/timer.h>
namespace napa {
namespace module {
/// <summary> It wraps napa::zone::Timer. </summary>
/// <remarks> Reference: napajs/lib/timer.ts#Timer </remarks>
class TimerWrap: public NAPA_OBJECTWRAP {
public:
/// <summary> Init this wrap. </summary>
static void Init();
/// <summary> It creates an instance of TimerWrap with a napa::zone::Timer pointer. </summary>
static v8::Local<v8::Object> NewInstance(std::shared_ptr<napa::zone::Timer> timer);
napa::zone::Timer& Get();
void Reset();
static void SetImmediateCallback(const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetTimersCallback(const v8::FunctionCallbackInfo<v8::Value>& args);
private:
/// <summary> Default constructor. </summary>
TimerWrap() = default;
/// <summary> No copy allowed. </summary>
TimerWrap(const TimerWrap&) = delete;
TimerWrap& operator=(const TimerWrap&) = delete;
/// <summary> Friend default constructor callback. </summary>
template <typename T>
friend void napa::module::DefaultConstructorCallback(const v8::FunctionCallbackInfo<v8::Value>&);
template <typename T>
friend v8::MaybeLocal<v8::Object> napa::module::NewInstance(int argc, v8::Local<v8::Value> argv[]);
/// <summary> Exported class name. </summary>
static constexpr const char* exportName = "TimerWrap";
/// <summary> Hid constructor from public access. </summary>
NAPA_DECLARE_PERSISTENT_CONSTRUCTOR();
std::shared_ptr<napa::zone::Timer> _timer;
};
}
}

Просмотреть файл

@ -1,11 +0,0 @@
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="$(MSBuildThisFileDirectory)\console.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\file-system.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\file-system-helpers.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\os.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\path.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\process.cpp" />
<ClCompile Include="$(MSBuildThisFileDirectory)\tty-wrap.cpp" />
</ItemGroup>
</Project>

19
src/zone/schedule-phase.h Normal file
Просмотреть файл

@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#pragma once
#include <stdint.h>
namespace napa {
namespace zone {
// Define the phase (like priority related type) of tasks.
// To be used mainly by schduler and worker to schedule its tasks.
enum class SchedulePhase : uint32_t {
DefaultPhase = 0,
ImmediatePhase = 1
};
};
}

3
src/zone/scheduler.cpp Normal file
Просмотреть файл

@ -0,0 +1,3 @@
#include "scheduler.h"
template class napa::zone::SchedulerImpl<napa::zone::Worker>;

Просмотреть файл

@ -3,6 +3,7 @@
#pragma once
#include "schedule-phase.h"
#include "simple-thread-pool.h"
#include "task.h"
#include "worker.h"
@ -41,11 +42,14 @@ namespace zone {
/// <summary> Schedules the task on a specific worker. </summary>
/// <param name="workerId"> The id of the worker. </param>
/// <param name="task"> Task to schedule. </param>
/// <param name="phase"> Which phase of the task, like Immediate or Normal. </param>
/// <remarks>
/// By design, it enqueues a task immediately,
/// so the task will have higher priority than ones called by Schedule().
/// </remarks>
void ScheduleOnWorker(WorkerId workerId, std::shared_ptr<Task> task);
void ScheduleOnWorker(WorkerId workerId,
std::shared_ptr<Task> task,
SchedulePhase phase = SchedulePhase::DefaultPhase);
/// <summary> Schedules the task on all workers. </summary>
/// <param name="task"> Task to schedule. </param>
@ -149,10 +153,11 @@ namespace zone {
}
template <typename WorkerType>
void SchedulerImpl<WorkerType>::ScheduleOnWorker(WorkerId workerId, std::shared_ptr<Task> task) {
void SchedulerImpl<WorkerType>::ScheduleOnWorker(
WorkerId workerId, std::shared_ptr<Task> task, SchedulePhase phase) {
NAPA_ASSERT(workerId < _workers.size(), "worker id out of range");
_synchronizer->Execute([workerId, this, task]() {
_synchronizer->Execute([workerId, this, task, phase]() {
// If the worker is idle, change it's status.
if (_idleWorkersFlags[workerId] != _idleWorkers.end()) {
_idleWorkers.erase(_idleWorkersFlags[workerId]);
@ -160,7 +165,7 @@ namespace zone {
}
// Schedule task on worker
_workers[workerId].Schedule(std::move(task));
_workers[workerId].Schedule(std::move(task), phase);
NAPA_DEBUG("Scheduler", "Explicitly scheduled task on worker %u.", workerId);
});

Просмотреть файл

@ -74,30 +74,30 @@ bool TimersScheduler::StartMainLoop() {
return;
}
const auto& topTimer = activeTimers.top();
if (topTimer.expirationTime <= std::chrono::high_resolution_clock::now()) {
if (timers[topTimer.index].active) {
auto nextExpirationTime = activeTimers.top().expirationTime;
if (nextExpirationTime <= std::chrono::high_resolution_clock::now()) {
// Pop before callback(), so that it could be re-armed for interval task logic.
auto expiredTimer = activeTimers.top();
activeTimers.pop();
if (timers[expiredTimer.index].active) {
timers[expiredTimer.index].active = false;
try {
// Fire the callback.
// The callback is assumed to be very fast as it is meant to dispatch to appropriate
// callback queues.
timers[topTimer.index].callback();
timers[expiredTimer.index].callback();
}
catch (const std::exception &ex) {
LOG_ERROR("Timers", "Timer callback threw an exception. %s", ex.what());
}
// We only support single trigger timers.
timers[topTimer.index].active = false;
}
// Timer expired, so it's no longer active.
activeTimers.pop();
}
else {
// Wait for timer expiration.
cv.wait_until(lock, topTimer.expirationTime, [&topTimer]() {
return topTimer.expirationTime <= std::chrono::high_resolution_clock::now();
// Wait for timer expiration. Stop waiting if new urgent active timer is arm-ed.
cv.wait_until(lock, nextExpirationTime, [this, nextExpirationTime]() {
return activeTimers.top().expirationTime < nextExpirationTime;
});
}
}

Просмотреть файл

@ -35,11 +35,14 @@ struct Worker::Impl {
/// <summary> Queue for tasks scheduled on this worker. </summary>
std::queue<std::shared_ptr<Task>> tasks;
/// <summary> Queue for tasks scheduled on this worker. </summary>
std::queue<std::shared_ptr<Task>> immediateTasks;
/// <summary> Condition variable to indicate if there are more tasks to consume. </summary>
std::condition_variable hasTaskEvent;
/// <summary> Lock for task queue. </summary>
/// <summary> Lock for task queue and immediate task queue. </summary>
std::mutex queueLock;
/// <summary> V8 isolate associated with this worker. </summary>
@ -69,7 +72,7 @@ Worker::Worker(WorkerId id,
Worker::~Worker() {
// Signal the thread loop that it should stop processing tasks.
Enqueue(nullptr);
Enqueue(nullptr, SchedulePhase::DefaultPhase);
NAPA_DEBUG("Worker", "(id=%u) Shutting down: Start draining task queue.", _impl->id);
_impl->workerThread.join();
@ -87,16 +90,21 @@ void Worker::Start() {
_impl->workerThread = std::thread(&Worker::WorkerThreadFunc, this, _impl->settings);
}
void Worker::Schedule(std::shared_ptr<Task> task) {
void Worker::Schedule(std::shared_ptr<Task> task, SchedulePhase phase) {
NAPA_ASSERT(task != nullptr, "Task should not be null");
Enqueue(task);
Enqueue(task, phase);
NAPA_DEBUG("Worker", "(id=%u) Task queued.", _impl->id);
}
void Worker::Enqueue(std::shared_ptr<Task> task) {
void Worker::Enqueue(std::shared_ptr<Task> task, SchedulePhase phase) {
{
std::unique_lock<std::mutex> lock(_impl->queueLock);
_impl->tasks.emplace(std::move(task));
if (phase == SchedulePhase::ImmediatePhase && task != nullptr) {
_impl->immediateTasks.emplace(std::move(task));
}
else {
_impl->tasks.emplace(std::move(task));
}
}
_impl->hasTaskEvent.notify_one();
}
@ -130,16 +138,28 @@ void Worker::WorkerThreadFunc(const settings::ZoneSettings& settings) {
std::shared_ptr<Task> task;
{
// Logically one merged task queue is the concatenation of immediate queue and
// the normal queue. The logically merged queue is treated as empty only when both queues
// are empty. And when not empty, immediate tasks will be handled prior to normal tasks.
// Inside each single queue (immediate or normal), tasks are first in first out.
std::unique_lock<std::mutex> lock(_impl->queueLock);
if (_impl->tasks.empty()) {
if (_impl->tasks.empty() && _impl->immediateTasks.empty()) {
_impl->idleNotificationCallback(_impl->id);
// Wait until new tasks come.
_impl->hasTaskEvent.wait(lock, [this]() { return !_impl->tasks.empty(); });
_impl->hasTaskEvent.wait(
lock,
[this]() { return !(_impl->tasks.empty() && _impl->immediateTasks.empty()); });
}
task = _impl->tasks.front();
_impl->tasks.pop();
if (_impl->immediateTasks.empty()) {
task = _impl->tasks.front();
_impl->tasks.pop();
}
else {
task = _impl->immediateTasks.front();
_impl->immediateTasks.pop();
}
}
// A null task means that the worker needs to shutdown.

Просмотреть файл

@ -4,6 +4,7 @@
#pragma once
#include "task.h"
#include "schedule-phase.h"
#include "settings/settings.h"
#include <functional>
@ -48,7 +49,7 @@ namespace zone {
/// <summary> Schedules a task on this worker. </summary>
/// <param name="task"> Task to schedule. </param>
/// <note> Same task instance may run on multiple workers, hence the use of shared_ptr. </node>
void Schedule(std::shared_ptr<Task> task);
void Schedule(std::shared_ptr<Task> task, SchedulePhase phase=SchedulePhase::DefaultPhase);
private:
@ -56,8 +57,8 @@ namespace zone {
void WorkerThreadFunc(const settings::ZoneSettings& settings);
/// <summary> Enqueue a task. </summary>
void Enqueue(std::shared_ptr<Task> task);
void Enqueue(std::shared_ptr<Task> task, SchedulePhase phase);
struct Impl;
std::unique_ptr<Impl> _impl;
};

190
test/timer-test.ts Normal file
Просмотреть файл

@ -0,0 +1,190 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
import * as napa from "../lib/index";
import {setImmediate, clearImmediate, setTimeout, clearTimeout, setInterval, clearInterval } from "timers";
// To be execute in napa workers
export function setImmediateTest(taskGroupId: number) : Promise<string> {
const kTaskGroupSize = 4;
const kAllowedScheduleDiffInMS = 200;
let correctResult = "";
let lastTaskId = 0;
for (let taskId = 0; taskId < kTaskGroupSize; taskId++) {
if (taskId != 1) {
correctResult = `${correctResult}:${taskId}_OnTime`;
lastTaskId = taskId;
}
}
let promise = new Promise<string>((resolve, reject) => {
let execResult = "";
for (let taskId = 0; taskId < kTaskGroupSize; taskId++) {
let startTime = Date.now();
let immedidate = setImmediate((lastTaskId: number) => {
let delayToRun = Date.now() - startTime;
execResult = `${execResult}:${taskId}_OnTime`;
if (delayToRun > kAllowedScheduleDiffInMS) {
execResult = `${execResult}(X)`;
}
if (taskId == lastTaskId) {
if (execResult == correctResult) {
resolve(`OK:${execResult}`)
}
else {
reject(`FAIL:${execResult} vs ${correctResult}`)
}
}
}, lastTaskId);
if (taskId == 1) {
clearImmediate(immedidate);
}
}
});
return promise;
}
export function setTimeoutTest(taskGroupId: number) : Promise<string> {
const kTaskGroupSize = 4;
const kAllowedScheduleDiffInMS = 200;
setTimeout(() => {}, 10); // Just a warm up.
let correctResult = "";
let lastTaskId = 0;
for (let taskId = 0; taskId < kTaskGroupSize; taskId++) {
if (taskId != 1) {
correctResult = `${correctResult}:${taskId}_OnTime`;
lastTaskId = taskId;
}
}
let promise = new Promise<string>((resolve, reject) => {
let execResult = "";
for (let taskId = 0; taskId < kTaskGroupSize; taskId++) {
let wait = 300 * (taskGroupId * kTaskGroupSize + taskId + 1);
let startTime = Date.now();
let timeout = setTimeout((lastTaskId: number) => {
let waitToRun = Date.now() - startTime;
execResult = `${execResult}:${taskId}_OnTime`;
if (Math.abs(waitToRun - wait) > kAllowedScheduleDiffInMS) {
execResult = `${execResult}(X)`;
}
if (taskId == lastTaskId) {
if (execResult == correctResult) {
resolve(`OK:${execResult}`)
}
else {
reject(`FAIL:${execResult} .vs. ${correctResult}`)
}
}
}, wait, lastTaskId);
if (taskId == 1) {
clearTimeout(timeout);
}
}
});
return promise;
}
export function setIntervalTest(taskGroupId: number, duration: number, count: number) : Promise<string> {
const kAllowedScheduleDiffInMS = 200;
let correctResult = "";
for (let i = 0; i < count; ++i) {
correctResult += `:${i}_OnTime`
}
let repeatCount = 0;
let execResult = "";
let startTime = Date.now();
let interval = setInterval(() => {
let wait = Date.now() - startTime;
execResult += `:${repeatCount}_OnTime`;
++repeatCount;
let avgScheduleDiff = Math.abs(wait - repeatCount * duration) / repeatCount;
if (avgScheduleDiff > kAllowedScheduleDiffInMS) {
execResult += `(X)`;
}
}, duration);
let promise = new Promise<string>((resolve, reject) => {
setTimeout(() => {
if (execResult == correctResult) {
resolve(`OK:${execResult}`)
}
else {
reject(`FAIL:${execResult} .vs. ${correctResult}`)
}
}, duration * (count + 2.6));
});
setTimeout(()=> {
clearInterval(interval);
}, Math.ceil(duration * (count + 0.8)));
return promise;
}
declare var __in_napa: boolean;
if (typeof __in_napa === 'undefined') {
let assert = require('assert');
const NUMBER_OF_WORKERS = 3;
const kTaskGroupCount = 3;
let zone = napa.zone.create('zone', { workers: NUMBER_OF_WORKERS });
describe("napajs/timers", () => {
describe("setImmediate/clearImmediate", function() {
let promises: Promise<napa.zone.Result>[] = [];
for (let groupId = 0; groupId < kTaskGroupCount; groupId++) {
let res = zone.execute('./timer-test', 'setImmediateTest', [groupId]);
promises.push(res);
}
for (let groupId = 0; groupId < kTaskGroupCount; groupId++) {
it(`Immediate test group:${groupId} should return string prefixed with OK`,
async function() {
let result = (await promises[groupId]).value;
assert(result.startsWith('OK'), `${result}`);
}
);
}
});
describe("setTimeout/clearTimeout", function() {
let promises: Promise<napa.zone.Result>[] = [];
for (let groupId = 0; groupId < kTaskGroupCount; groupId++) {
let res = zone.execute('./timer-test', 'setTimeoutTest', [groupId]);
promises.push(res);
}
for (let groupId = 0; groupId < kTaskGroupCount; groupId++) {
it(`Timeout test group:${groupId} should return string prefixed with OK`,
async function() {
let result = (await promises[groupId]).value;
assert(result.startsWith('OK'), `${result}`);
}
).timeout(3000);;
}
});
describe("setInterval/clearInterval", function() {
it(`Interval test should return string prefixed with OK`,
async function() {
let promise = zone.execute('./timer-test', 'setIntervalTest', ["0", 500, 4]);
let result = (await promise).value;
assert(result.startsWith('OK'), `${result}`);
}
).timeout(6000);
});
});
}

Просмотреть файл

@ -61,7 +61,7 @@ public:
_idleNotificationCallback(_id);
}
void Schedule(std::shared_ptr<Task> task) {
void Schedule(std::shared_ptr<Task> task, SchedulePhase phase=SchedulePhase::DefaultPhase) {
auto testTask = std::dynamic_pointer_cast<TestTask>(task);
testTask->SetCurrentWorkerId(_id);