ThreadPool improvements and additional cleanup
This commit is contained in:
Parent
f563ff252e
Commit
81cd7f167e
@@ -15,13 +15,11 @@ export * from "../sharing/structs/identifiableStruct";
export * from "../sharing/structs/shareable";
export * from "../sharing/structs/sharedStruct";
export * from "../sharing/structs/taggedStruct";
export * from "../sharing/structs/wrapper";
export * from "../sharing/collections/deque";
export * from "../sharing/collections/hashData";
export * from "../sharing/collections/sharedLinkedList";
export * from "../sharing/collections/concurrentMap";
export * from "../sharing/collections/sharedMap";
export * from "../sharing/collections/sharedResizableArray";
export * from "../sharing/collections/sharedSet";
export * from "../sharing/collections/xxhash32";
export * from "../sharing/sharedDiagnostics";

@@ -1551,6 +1551,57 @@ function createCreateProgramOptions(rootNames: readonly string[], options: Compi
    };
}

// - [ ] host.trace()
// - [ ] host.realpath()
// - [ ] host.getCanonicalFileName()
// - [ ] host.getSourceFile()
// - [ ] host.useCaseSensitiveFileNames()
// - [ ] host.hasInvalidatedLibResolutions()
// - [ ] host.resolveTypeReferenceDirectiveReferences()
// - [ ] host.resolveTypeReferenceDirectives()
// - [ ] host.resolveLibrary()
// - [ ] host.hasInvalidatedResolutions();
// - [ ] getCreateSourceFileOptions() - calls into module resolution with a full host, will be tricky...
// - [ ] tracing
// - [x] supportedExtensionsWithJsonIfResolveJsonModule
// - [x] supportedExtensions
// - [x] currentDirectory
// - [ ] performance?
// - [ ] useSourceOfProjectReferenceRedirect
// - [ ] mapFromToProjectReferenceRedirectSource
// - [ ] mapFromFileToProjectReferenceRedirects
// - [ ] resolvedProjectReferences
// - [ ] projectReferenceRedirects
// - [ ] options
// - [ ] filesByName
// - [ ] fileReasons
// - [ ] fileProcessingDiagnostics
// - [ ] sourceFilesFoundSearchingNodeModules
// - [ ] modulesWithElidedImports
// - [ ] packageIdToSourceFile
// - [ ] redirectTargetsMap
// - [ ] sourceFileToPackageName
// - [ ] processingOtherFiles
// - [ ] filesByNameIgnoreCase
// - [ ] skipDefaultLib
// - [ ] processingDefaultLibFiles
// - [ ] resolvedTypeReferenceDirectives
// - [ ] host.readFile
// - [ ] getSourceFile
// - [ ] resolvedLibReferences
// - [ ] resolvedLibProcessing
// - [ ] moduleResolutionCache
// - [ ] structureIsReused
// - [ ] oldProgram - what do we need from oldProgram? Can we generate a shared snapshot?
// - [ ] usesUriStyleNodeCoreModules
@Shared()
// @ts-ignore
class SharedProgramState extends SharedStructBase {
    @Shared() currentDirectory!: string;
    @Shared() supportedExtensions!: SharedArray<SharedArray<Extension>>;
    @Shared() supportedExtensionsWithJsonIfResolveJsonModule!: SharedArray<SharedArray<Extension>>;
}

/**
 * Create a new 'Program' instance. A Program is an immutable collection of 'SourceFile's and a 'CompilerOptions'
 * that represent a compilation unit.

@@ -62,7 +62,9 @@ type EXIST = typeof EXIST;
type NOT_EXIST = typeof NOT_EXIST;

/**
 * A concurrent Map-like object. Based on https://github.com/dotnet/runtime/blob/main/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs
 * A concurrent Map-like object. Based on [ConcurrentDictionary][] from .NET Core.
 *
 * [ConcurrentDictionary]: https://github.com/dotnet/runtime/blob/main/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs
 * @internal
 */
@Shared()

@@ -298,7 +300,7 @@ export class ConcurrentMap<K extends NonNullable<Shareable>, V extends NonNullab
            tables.countsPerLock[lockNo]--;
        }
        else {
            // performing unconditional set or update update
            // performing unconditional set or update
            if (isLockFree(replacementValue)) {
                node.value = replacementValue;
            }
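The `lockNo` bookkeeping above reflects ConcurrentDictionary's lock striping: buckets are spread across a small array of locks so that writers contend per stripe rather than on a single global mutex. A minimal sketch of the index math, with hypothetical `hash`, `bucketCount`, and `lockCount` parameters standing in for the struct's real fields:

```ts
// Illustrative only: mirrors ConcurrentDictionary's GetBucketAndLockNo.
function getBucketAndLockNo(hash: number, bucketCount: number, lockCount: number) {
    const bucketNo = (hash >>> 0) % bucketCount; // bucket that holds the key
    const lockNo = bucketNo % lockCount;         // stripe that guards the bucket
    return { bucketNo, lockNo };
}
```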
@@ -1,126 +1,257 @@
import { emptyArray } from "../../core";
import { Debug } from "../../debug";
import { AtomicValue } from "../../threading/atomicValue";
import { workerThreads } from "../../workerThreads";
import { isShareablePrimitive } from "../structs/shareable";
import { Shared, SharedStructBase } from "../structs/sharedStruct";

/**
 * A ring buffer that can be resized.
 */
@Shared()
class Ring<T extends Shareable> extends SharedStructBase {
    @Shared() readonly size: number;
    @Shared() readonly mask: number;
    @Shared() readonly segment: SharedArray<T | undefined>;
class Ring<T extends NonNullable<Shareable>> extends SharedStructBase {
    @Shared() private readonly mask: AtomicValue<number>;
    @Shared() private readonly segment: SharedArray<T | undefined>;

    constructor(size: number) {
        super();
        const mask = size - 1;
        if (size & mask) throw new RangeError("Must be a power of 2");

        this.size = size;
        this.mask = mask;
        this.mask = new AtomicValue<number>(mask);
        this.segment = new SharedArray(size);
    }

    static size<T extends Shareable>(self: Ring<T>) {
        return Atomics.load(self, "size");
    static size<T extends NonNullable<Shareable>>(self: Ring<T>) {
        return Ring.mask(self) + 1;
    }

    static get<T extends Shareable>(self: Ring<T>, i: number) {
        return Atomics.load(self.segment, i & Atomics.load(self, "mask"));
    static mask<T extends NonNullable<Shareable>>(self: Ring<T>) {
        return AtomicValue.load(self.mask);
    }

    static put<T extends Shareable>(self: Ring<T>, i: number, value: T) {
        Atomics.store(self.segment, i & Atomics.load(self, "mask"), value);
    static get<T extends NonNullable<Shareable>>(self: Ring<T>, i: number) {
        return Atomics.load(self.segment, i & AtomicValue.load(self.mask));
    }

    static set<T extends NonNullable<Shareable>>(self: Ring<T>, i: number, value: T | undefined) {
        Atomics.store(self.segment, i & AtomicValue.load(self.mask), value);
    }

    static exchange<T extends NonNullable<Shareable>>(self: Ring<T>, i: number, value: T | undefined) {
        return Atomics.exchange(self.segment, i & AtomicValue.load(self.mask), value);
    }

    static compareExchange<T extends NonNullable<Shareable>>(self: Ring<T>, i: number, expectedValue: T | undefined, replacementValue: T | undefined) {
        return Atomics.compareExchange(self.segment, i & AtomicValue.load(self.mask), expectedValue, replacementValue);
    }

    static grow<T extends Shareable>(self: Ring<T>, bottom: number, top: number) {
        const size = Atomics.load(self, "size");
        const newSize = (size << 1) >>> 0;
    static grow<T extends NonNullable<Shareable>>(self: Ring<T>, bottom: number, top: number, capacity: number) {
        const size = Ring.size(self);
        let newSize = (size << 1) >>> 0; // convert to u32
        if (newSize < size) throw new RangeError();
        const a: Ring<T> = new Ring(newSize);
        while (newSize < capacity) {
            newSize = (newSize << 1) >>> 0; // convert to u32
            if (newSize < size) throw new RangeError();
        }
        const a = new Ring<T>(newSize);
        for (let i = top; i < bottom; i++) {
            Ring.put(a, i, Ring.get(self, i));
            Ring.set(a, i, Ring.get(self, i)!);
        }
        return a;
    }

    static shrink<T extends Shareable>(self: Ring<T>, bottom: number, top: number) {
        const size = Atomics.load(self, "size");
    static shrink<T extends NonNullable<Shareable>>(self: Ring<T>, bottom: number, top: number) {
        const size = Ring.size(self);
        if (size <= 1) return self;
        const a: Ring<T> = new Ring(size >>> 1);
        const a = new Ring<T>(size >>> 1);
        for (let i = top; i < bottom; i++) {
            Ring.put(a, i, Ring.get(self, i));
            Ring.set(a, i, Ring.get(self, i)!);
        }
        return a;
    }
}

/**
 * Object that holds nonprimitive values in the Deque.
 *
 * Array elements are not reset when popped/stolen, which may prevent GC for large object graphs like a SourceFile and
 * its children. Instead, we box nonprimitives so that we can safely unset them later, after we've read from the array
 * and adjusted the top/bottom pointers of the deque. This also helps to avoid the ABA problem since we're not
 * attempting a `compareExchange` against a value that may have been removed and then re-added to the deque as two
 * independent operations in other threads.
 */
@Shared()
class Box<T extends ShareableNonPrimitive> extends SharedStructBase {
    @Shared() value: T | undefined;
    constructor(value: T) {
        super();
        this.value = value;
    }

    static box<T extends NonNullable<Shareable>>(value: T) {
        return isShareablePrimitive(value) ? value : new Box(value);
    }

    static take<T extends NonNullable<Shareable>>(value: Extract<T, ShareablePrimitive> | Box<Extract<T, ShareableNonPrimitive>> | undefined): T | undefined {
        return isShareablePrimitive(value) ?
            value as Extract<T, ShareablePrimitive> | undefined :
            Atomics.exchange(value as Box<Extract<T, ShareableNonPrimitive>>, "value", /*value*/ undefined) as Extract<T, ShareableNonPrimitive> | undefined;
    }

    static peek<T extends NonNullable<Shareable>>(value: Extract<T, ShareablePrimitive> | Box<Extract<T, ShareableNonPrimitive>> | undefined): T | undefined {
        return isShareablePrimitive(value) ?
            value as Extract<T, ShareablePrimitive> | undefined :
            Atomics.load(value as Box<Extract<T, ShareableNonPrimitive>>, "value") as Extract<T, ShareableNonPrimitive> | undefined;
    }
}

const kDequeInitialCapacity = 1 << 5;
const kDequeTrimFactor = 3;

/**
 * Chase-Lev Deque based on "Dynamic Circular Work-Stealing Deque"
 * @see {@link https://dl.acm.org/doi/10.1145/1073970.1073974}
 * @template {Shareable} T
 * @internal
 */
@Shared()
export class Deque<T extends Shareable> extends SharedStructBase {
    @Shared() readonly top: number;
    @Shared() readonly bottom: number;
    @Shared() readonly array: Ring<T>;
export class Deque<T extends NonNullable<Shareable>> extends SharedStructBase {
    @Shared() readonly threadId = workerThreads?.threadId ?? 0;
    @Shared() private readonly top = new AtomicValue<number>(0);
    @Shared() private readonly bottom = new AtomicValue<number>(0);
    @Shared() private readonly array = new AtomicValue(new Ring<Extract<T, ShareablePrimitive> | Box<Extract<T, ShareableNonPrimitive>>>(kDequeInitialCapacity));

    constructor() {
        super();
        this.top = 0;
        this.bottom = 0;
        this.array = new Ring(kDequeInitialCapacity);
    /**
     * Push a value onto the bottom of the deque. Pushes can only be performed by the thread that owns the deque.
     */
    static push<T extends NonNullable<Shareable>>(self: Deque<T>, value: T) {
        Deque.checkOwner(self);
        const bottom = AtomicValue.load(self.bottom);
        const top = AtomicValue.load(self.top);
        const array = Deque.reserve(self, bottom, top, (bottom - top) + 1);
        Ring.set(array, bottom, Box.box(value));
        AtomicValue.store(self.bottom, wrappingAdd(bottom, 1));
    }

    static pushBottom<T extends Shareable>(self: Deque<T>, value: T) {
        const bottom = Atomics.load(self, "bottom");
        const top = Atomics.load(self, "top");
        let array = Atomics.load(self, "array");
        if (bottom - top > Ring.size(array) - 1) {
            array = Ring.grow(array, bottom, top);
            Atomics.store(self, "array", array);
    /**
     * Pushes multiple values onto the bottom of the deque. Pushes can only be performed by the thread that owns the
     * deque.
     */
    static pushMany<T extends NonNullable<Shareable>>(self: Deque<T>, values: ArrayLike<T>) {
        Deque.checkOwner(self);
        if (values.length > 0) {
            const bottom = AtomicValue.load(self.bottom);
            const top = AtomicValue.load(self.top);
            const array = Deque.reserve(self, bottom, top, (bottom - top) + values.length);
            for (let i = 0; i < values.length; i++) {
                Ring.set(array, bottom + i, Box.box(values[i]));
            }
            AtomicValue.store(self.bottom, wrappingAdd(bottom, values.length));
        }
        Ring.put(array, bottom, value);
        Atomics.store(self, "bottom", bottom + 1);
    }

    static popBottom<T extends Shareable>(self: Deque<T>) {
        const bottom = Atomics.load(self, "bottom") - 1;
        const array = Atomics.load(self, "array");
        Atomics.store(self, "bottom", bottom);
        const top = Atomics.load(self, "top");
    /**
     * Tries to pop a value off the bottom of the deque. Pops can only be performed by the thread that owns the deque.
     * Other threads should {@link steal} instead.
     */
    static pop<T extends NonNullable<Shareable>>(self: Deque<T>) {
        Deque.checkOwner(self);
        const bottom = wrappingSub(AtomicValue.load(self.bottom), 1);
        AtomicValue.store(self.bottom, bottom); // take bottom

        const top = AtomicValue.load(self.top);
        const size = bottom - top;
        if (size < 0) {
            Atomics.store(self, "bottom", top);
        if (size < 0) { // deque was already empty, reset to canonical empty state
            AtomicValue.store(self.bottom, top);
            return undefined;
        }
        let value = Ring.get(array, bottom);

        const value = Ring.get(AtomicValue.load(self.array), bottom);
        if (bottom - top > 0) { // deque had more than one element, so we are safe from a concurrent steal.
            Deque.trim(self, bottom, top); // trim excess unused elements
            return Debug.checkDefined(Box.take(value));
        }

        // The deque had only one element and is becoming empty, so we may be in a race against any concurrent steal.
        const result = top === AtomicValue.compareExchange(self.top, top, wrappingAdd(top, 1));

        // If we lose the race, `top` will already be `top + 1`. Reset to the canonical empty state by also setting
        // bottom to `top + 1`
        AtomicValue.store(self.bottom, wrappingAdd(top, 1));
        return result ? Debug.checkDefined(Box.take(value)) : undefined;
    }

    /**
     * Tries to steal the top item in the deque. Steals should generally be performed by threads other than the thread
     * that owns the deque.
     */
    static steal<T extends NonNullable<Shareable>>(self: Deque<T>) {
        const top = AtomicValue.load(self.top);
        const bottom = AtomicValue.load(self.bottom);
        const size = bottom - top;
        if (size <= 0) { // queue is empty
            return undefined;
        }
        const value = Ring.get(AtomicValue.load(self.array), top); // take value before CAS
        if (top === AtomicValue.compareExchange(self.top, top, wrappingAdd(top, 1))) {
            return Box.take(value);
        }
        return undefined;
    }

    /**
     * Attempts to steal a chunk of items from the deque, up to a max of about half of the items in the deque.
     */
    static stealMany<T extends NonNullable<Shareable>>(self: Deque<T>, count: number): readonly T[] {
        let taken: T[] | undefined;
        const top = AtomicValue.load(self.top);
        const bottom = AtomicValue.load(self.bottom);
        const size = bottom - top;
        if (size > 0) {
        Deque.trim(self, bottom, top);
        return value;
            count = Math.min(count, ((size + 1) << 1) >>> 0);
            for (let i = 0; i < count; i++) {
                if (i > 0 && AtomicValue.load(self.bottom) - top <= 0) {
                    break;
                }
                const value = Ring.get(AtomicValue.load(self.array), top);
                const result = top === AtomicValue.compareExchange(self.top, top, wrappingAdd(top, 1));
                if (!result) {
                    break;
                }
                taken ??= [];
                taken.push(Debug.checkDefined(Box.take(value)));
            }
        }
        const result = top === Atomics.compareExchange(self, "top", top, top + 1);
        Atomics.store(self, "bottom", top + 1);
        return result ? value : undefined;
        return taken ?? emptyArray;
    }

    static steal<T extends Shareable>(self: Deque<T>) {
        const top = Atomics.load(self, "top");
        const bottom = Atomics.load(self, "bottom");
        const array = Atomics.load(self, "array");
        if (bottom - top <= 0) {
            return undefined;
        }
        const value = Ring.get(array, top);
        if (top !== Atomics.compareExchange(self, "top", top, top + 1)) {
            return undefined;
        }
        return value;
    private static checkOwner<T extends NonNullable<Shareable>>(self: Deque<T>) {
        const ownerThreadId = Atomics.load(self, "threadId");
        const threadId = workerThreads?.threadId ?? 0;
        if (ownerThreadId !== threadId) Debug.fail(`Wrong thread. Expected ${ownerThreadId} but got ${threadId}`);
    }

    private static trim<T extends Shareable>(self: Deque<T>, bottom: number, top: number) {
        const array = Atomics.load(self, "array");
    private static reserve<T extends NonNullable<Shareable>>(self: Deque<T>, bottom: number, top: number, capacity: number) {
        let array = AtomicValue.load(self.array);
        if (capacity > Ring.mask(array)) {
            array = Ring.grow(array, bottom, top, capacity);
            AtomicValue.store(self.array, array);
        }
        return array;
    }

    private static trim<T extends NonNullable<Shareable>>(self: Deque<T>, bottom: number, top: number) {
        const array = AtomicValue.load(self.array);
        if (bottom - top < Ring.size(array) / kDequeTrimFactor) {
            Atomics.store(self, "array", Ring.shrink(array, bottom, top));
            AtomicValue.store(self.array, Ring.shrink(array, bottom, top));
        }
    }
}

function wrappingAdd(a: number, b: number) {
    return (a + b) | 0; // wrap i32 overflow
}

function wrappingSub(a: number, b: number) {
    return (a - b) | 0; // wrap i32 overflow
}
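A hypothetical usage sketch of the new API above, assuming a NodeJS build with the origin-trial flags (`--shared-string-table --harmony-struct`) noted elsewhere in this commit:

```ts
// The owning thread pushes and pops LIFO at the bottom; other threads steal FIFO
// from the top. Both pop and steal may return undefined when losing a race.
const deque = new Deque<number>(); // owned by the creating thread

Deque.push(deque, 1);
Deque.push(deque, 2);
const newest = Deque.pop(deque);   // 2 (owner thread only)

// On another worker thread holding a reference to the same struct:
const oldest = Deque.steal(deque); // 1, or undefined if the deque emptied first
```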
@@ -1,47 +0,0 @@
import { Identifiable } from "../structs/identifiableStruct";
import { Shared, SharedStructBase } from "../structs/sharedStruct";
import { Tag, Tagged } from "../structs/taggedStruct";

// TODO: consider constructing ropes of shared arrays rather than reallocating and copying.

/**
 * The default `SharedArray` implementation depends on a fixed-length array. A ResizableSharedArray is an abstraction
 * over `SharedArray` that allows us to emulate resizing.
 * @internal
 */
@Shared()
export class SharedResizableArray<T extends Shareable> extends Identifiable(Tagged(SharedStructBase, Tag.ResizableArray)) {
    @Shared() items: SharedArray<T>;

    constructor(initialSize = 0) {
        super();
        this.items = new SharedArray(initialSize);
    }

    static size<T extends Shareable>(self: SharedResizableArray<T>) {
        return self.items.length;
    }

    static get<T extends Shareable>(self: SharedResizableArray<T>, index: number): T {
        return self.items[index];
    }

    static set<T extends Shareable>(self: SharedResizableArray<T>, index: number, value: T): SharedResizableArray<T> {
        if (index >= self.items.length) {
            this.resize(self, index + 1);
        }
        self.items[index] = value;
        return self;
    }

    static resize<T extends Shareable>(self: SharedResizableArray<T>, newLength: number) {
        if (self.items.length !== newLength) {
            const newArray = new SharedArray<T>(newLength);
            const minSize = Math.min(self.items.length, newLength);
            for (let i = 0; i < minSize; i++) {
                newArray[i] = self.items[i];
            }
        }
        return self;
    }
}

@@ -1,17 +1,10 @@
import { __String, SymbolFlags } from "../types";
import { SharedResizableArray } from "./collections/sharedResizableArray";
import { SharedMap } from "./collections/sharedMap";
import { Identifiable } from "./structs/identifiableStruct";
import { SharedDeclaration } from "./sharedNode";
import { Identifiable } from "./structs/identifiableStruct";
import { Shared, SharedStructBase } from "./structs/sharedStruct";
import { Tag, Tagged } from "./structs/taggedStruct";

declare global {
    interface OtherShareablePrimitive {
        __String: __String;
    }
}

/** @internal */
export interface SharedSymbolTable extends SharedMap<__String, SharedSymbol> {
}

@@ -21,7 +14,7 @@ export interface SharedSymbolTable extends SharedMap<__String, SharedSymbol> {
export class SharedSymbol extends Identifiable(Tagged(SharedStructBase, Tag.Symbol)) {
    @Shared() flags!: SymbolFlags;
    @Shared() escapedName!: __String;
    @Shared() declarations: SharedResizableArray<SharedDeclaration> | undefined;
    @Shared() declarations: SharedArray<SharedDeclaration> | undefined;
    @Shared() valueDeclaration: SharedDeclaration | undefined;
    @Shared() members: SharedSymbolTable | undefined;
    @Shared() exports: SharedSymbolTable | undefined;

@@ -1,5 +1,3 @@
// Brand used to identify a fake shared struct, so that we can emulate the restriction that shared struct fields

import { isNull } from "../../core";

// can only contain shareable primitives and other shared structs.

@@ -130,6 +130,10 @@ function getOrCreateMetadata(target: AbstractConstructor) {
/**
 * A decorator used to mark a `class` or a non-static public class field as "shared". This is intended to be used to
 * emulate syntax for shared structs and provide a mechanism to associate types with shared struct fields.
 *
 * NOTE: This only works as long as we use either `useDefineForClassFields: false` or `experimentalDecorators: true`,
 * as you cannot reconfigure a class field introduced by a shared struct. If we change to `useDefineForClassFields: true`
 * we must replace all decorated fields with `declare` fields, which do not work with ES native decorators at present.
 * @internal
 */
export function Shared(): SharedClassDecorator & SharedFieldDecorator;

@@ -219,7 +223,7 @@ export function Shared(options?: SharedClassOptions) {
        sharedFields.push(context.name);
    }

    function decorator(...args:
    function decorator(...args:
        | [target: AbstractConstructor]
        | [target: AbstractConstructor, context: ClassDecoratorContext]
        | [target: undefined, context: ClassFieldDecoratorContext]

@@ -1,8 +1,8 @@
export { };

// NOTE: These types relate to the origin trial implementation of the shared structs proposal and may not be indicative
// of the final proposal. To use these types you must pass `--shared-string-table --harmony-struct` to NodeJS.

export { };

// the following brands are used to distinguish Shared Structs-related objects from other objects since they are not
// interchangeable with `object`:
declare const kGlobalSharedStructBrand: unique symbol;

@@ -2,19 +2,38 @@ import { hasProperty } from "../../core";
import { isShareableNonPrimitive } from "./shareable";
import { Shared } from "./sharedStruct";

/**
 * Since shared structs do not support `instanceof` across threads, we often need a way to distinguish one shared struct
 * from another cheaply. To accomplish this, we can use the `Tagged` mixin to inject a `__tag__` field that we can use
 * along with `[Symbol.hasInstance]` to make `instanceof` work.
 * @internal
 */
export function Tagged<F extends abstract new (...args: any) => SharedStruct, TTag extends Tag>(base: F, tag: TTag): Tagged<F, TTag> {
    @Shared({ abstract: true })
    abstract class TaggedStruct extends base {
        static readonly __tag__ = tag;
        @Shared() readonly __tag__ = tag;

        static [Symbol.hasInstance](value: unknown): boolean {
            return isTaggedStruct(value, tag);
        }
    }
    return TaggedStruct as Tagged<F, TTag>;
}

/** @internal */
export type Tagged<F extends abstract new (...args: any) => SharedStruct, TTag extends Tag> =
    (F extends abstract new (...args: infer A) => (infer R extends TaggedStruct<infer _>) ? (abstract new (...args: A) => Omit<R, "__tag__">) : F) & TaggedStructConstructor<TTag>

/** @internal */
export const enum Tag {
    Mutex,
    SharedMutex,
    Condition,
    ManualResetEvent,
    CountdownEvent,
    Thread,
    Map,
    Set,
    ResizableArray,
    ConcurrentMap,
    Semaphore,
    NodeArray,
    Node,
    Symbol,

@@ -41,29 +60,6 @@ export type TaggedStructConstructor<TTag extends Tag> = (abstract new (...args:
    readonly __tag__: TTag
};

/** @internal */
export type Tagged<F extends abstract new (...args: any) => SharedStruct, TTag extends Tag> =
    (F extends abstract new (...args: infer A) => (infer R extends TaggedStruct<infer _>) ? (abstract new (...args: A) => Omit<R, "__tag__">) : F) & TaggedStructConstructor<TTag>

/**
 * Since shared structs do not support instanceof, we often need a way to distinguish one shared struct from another to
 * create the correct proxy class in our membrane. To accomplish this, we can use the Tagged mixin to inject a
 * `__tag__` field that we can use instead of `instanceof`.
 * @internal
 */
export function Tagged<F extends abstract new (...args: any) => SharedStruct, TTag extends Tag>(base: F, tag: TTag): Tagged<F, TTag> {
    @Shared({ abstract: true })
    abstract class TaggedStruct extends base {
        static readonly __tag__ = tag;
        @Shared() readonly __tag__ = tag;

        static [Symbol.hasInstance](value: unknown): boolean {
            return isTaggedStruct(value, tag);
        }
    }
    return TaggedStruct as Tagged<F, TTag>;
}

/** @internal */
export function isTaggedStructObject<TTag extends Tag>(value: ShareableNonPrimitive, tag?: TTag): value is TaggedStruct<TTag> {
    return hasProperty(value, "__tag__") && (tag === undefined || (value as TaggedStruct<Tag>).__tag__ === tag);
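A hypothetical sketch of how the mixin is consumed; `SharedThing` is not part of this commit, and `Tag.Map` is reused purely for illustration:

```ts
@Shared()
class SharedThing extends Tagged(SharedStructBase, Tag.Map) {
    @Shared() value!: number;
}

declare const received: unknown; // e.g. a struct posted from another thread
if (received instanceof SharedThing) {
    // matches via the injected `__tag__` and `[Symbol.hasInstance]`,
    // which works across threads where prototype identity does not
}
```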
@@ -1,5 +1,5 @@
import { spin } from "./spinWait";
import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct";
import { spin } from "./spin";

/**
 * Atomically read and write a shared value. While this isn't strictly necessary as `Atomics` methods can be used

@@ -53,10 +53,6 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {
        return Atomics.compareExchange(self, "unsafeValue", expectedValue, replacementValue);
    }

    static compareAndSet<T extends Shareable>(self: AtomicValue<T>, expectedValue: T, replacementValue: T) {
        return expectedValue === AtomicValue.compareExchange(self, expectedValue, replacementValue);
    }

    /**
     * Performs 32-bit signed integer addition via an atomic read-modify-write operation.
     * @returns the value prior to addition.

@@ -64,7 +60,7 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {
    static add(self: AtomicValue<number>, value: number) {
        let spinCounter = 0;
        let currentValue = AtomicValue.load(self);
        while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue + value) >> 0)) {
        while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue + value) | 0 /* i32 wrapping add */)) {
            spinCounter = spin(spinCounter);
            currentValue = AtomicValue.load(self);
        }

@@ -78,7 +74,7 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {
    static sub(self: AtomicValue<number>, value: number) {
        let spinCounter = 0;
        let currentValue = AtomicValue.load(self);
        while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue - value) >> 0)) {
        while (currentValue !== AtomicValue.compareExchange(self, currentValue, (currentValue - value) | 0 /* i32 wrapping subtract */)) {
            spinCounter = spin(spinCounter);
            currentValue = AtomicValue.load(self);
        }

@@ -87,7 +83,7 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {

    /**
     * Atomically increments a 32-bit signed integer value.
     * @returns the value prior to subtraction.
     * @returns the pre-incremented value.
     */
    static increment(self: AtomicValue<number>) {
        return AtomicValue.add(self, 1);

@@ -95,83 +91,9 @@ export class AtomicValue<T extends Shareable> extends SharedStructBase {

    /**
     * Atomically decrements a 32-bit signed integer value.
     * @returns the value prior to subtraction.
     * @returns the pre-decremented value.
     */
    static decrement(self: AtomicValue<number>) {
        return AtomicValue.sub(self, 1);
    }
}

/**
 * A non-shared wrapper for an {@link AtomicValue}.
 * @internal
 */
export class AtomicRef<T extends Shareable> implements Disposable {
    private _value: AtomicValue<T> | undefined;

    constructor(value: AtomicValue<T> | undefined) {
        this._value = value;
    }

    get disposed() {
        return !this._value;
    }

    get value() {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.load(this._value);
    }

    set value(value) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        AtomicValue.store(this._value, value);
    }

    unsafeGet() {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return this._value.unsafeValue;
    }

    unsafeSet(value: T) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        this._value.unsafeValue = value;
    }

    exchange(value: T) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.exchange(this._value, value);
    }

    compareExchange(expectedValue: T, replacementValue: T) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.compareExchange(this._value, expectedValue, replacementValue);
    }

    increment(this: AtomicRef<number>) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.increment(this._value);
    }

    decrement(this: AtomicRef<number>) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.decrement(this._value);
    }

    add(this: AtomicRef<number>, value: number) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.add(this._value, value);
    }

    sub(this: AtomicRef<number>, value: number) {
        if (!this._value) throw new ReferenceError("Object is disposed");
        return AtomicValue.sub(this._value, value);
    }

    dispose() {
        this._value = undefined;
    }

    [Symbol.dispose]() {
        this._value = undefined;
    }
}
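The retry shape used by `AtomicValue.add`/`sub` can also be expressed against the standard `Atomics` API on an `Int32Array`, which runs without the origin trial; this analogue is illustrative and not part of the commit (the spin backoff is omitted):

```ts
const counter = new Int32Array(new SharedArrayBuffer(4));

// Returns the value prior to addition, matching AtomicValue.add's contract.
function addWrapping(value: number) {
    let current = Atomics.load(counter, 0);
    // retry while another thread changes the cell between our read and our CAS
    while (Atomics.compareExchange(counter, 0, current, (current + value) | 0) !== current) {
        current = Atomics.load(counter, 0);
    }
    return current;
}
```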
@@ -1,7 +1,6 @@
import { Debug } from "../debug";
import { Identifiable } from "../sharing/structs/identifiableStruct";
import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct";
import { isTaggedStruct, Tag, Tagged } from "../sharing/structs/taggedStruct";
import { Tag, Tagged } from "../sharing/structs/taggedStruct";
import { Lockable } from "./lockable";

let tryLock: (self: Mutex, cacheKey?: object) => boolean;

@@ -26,6 +25,8 @@ export class Mutex extends Identifiable(Tagged(SharedStructBase, Tag.Mutex)) {
    @Shared() private _locked = false;

    static {
        // maintain a cache to avoid the overhead of reallocating callback functions to interact with the callback-based
        // API in `Atomics.Mutex`.
        const callbackCache = new WeakMap<object, CallbackCache>();

        // we reuse the same lockTaken variable for each call to tryLock in a thread because it's not

@@ -86,6 +87,11 @@ export class Mutex extends Identifiable(Tagged(SharedStructBase, Tag.Mutex)) {
        };
    }

    /**
     * Tries to lock the mutex and invoke `cb` in the context of the lock. The mutex will always be unlocked when the method
     * returns, regardless as to whether `cb` throws an exception.
     * @returns `true` if the lock was taken and the callback executed; otherwise `false`.
     */
    static tryLock(self: Mutex, cb: () => void): boolean {
        if (!tryLock(self, /*cacheKey*/ undefined)) {
            return false;
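A hypothetical call site for the overload documented above:

```ts
declare const mutex: Mutex;

const entered = Mutex.tryLock(mutex, () => {
    // runs only if the lock was immediately available; the mutex is
    // released on return, even if this callback throws
});
if (!entered) {
    // contended: the caller decides whether to retry, spin, or bail out
}
```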
@@ -114,38 +120,30 @@ export class Mutex extends Identifiable(Tagged(SharedStructBase, Tag.Mutex)) {
        }
    }

    /**
     * Wraps the mutex in a {@link Lockable} wrapper object for use with other APIs.
     */
    static asLockable(self: Mutex): Lockable {
        return new LockableMutex(self);
    }

    static [Symbol.hasInstance](value: unknown): value is Mutex {
        return isTaggedStruct(value, Tag.Mutex);
    }
}

class LockableMutex implements Lockable {
    private _mutex: Mutex;
    private _ownsLock = false;

    constructor(mutex: Mutex) {
        this._mutex = mutex;
    }

    tryLock(): boolean {
        Debug.assert(!this._ownsLock, "cannot take a lock you already own.");
        this._ownsLock = tryLock(this._mutex, this);
        return this._ownsLock;
        return tryLock(this._mutex, this);
    }

    lock(): void {
        Debug.assert(!this._ownsLock, "cannot take a lock you already own.");
        lock(this._mutex, this);
        this._ownsLock = true;
    }

    unlock(): void {
        Debug.assert(this._ownsLock, "cannot release a lock you do not own.");
        unlock(this._mutex, this);
        this._ownsLock = false;
    }
}

@@ -5,7 +5,11 @@ import { Lockable } from "./lockable";
import { Mutex } from "./mutex";
import { SharedMutex } from "./sharedMutex";

/** @internal */
/**
 * An RAII wrapper for block-scoped locking of multiple mutexes using a deadlock prevention algorithm. Inspired by
 * `std::scoped_lock` in C++.
 * @internal
 */
export class ScopedLock {
    private _mutexes: readonly Lockable[] | undefined;

@@ -14,8 +18,8 @@ export class ScopedLock {
        for (const mutex of Array.from(mutexes)) {
            array.push(
                mutex instanceof Mutex ? Mutex.asLockable(mutex) :
                mutex instanceof SharedMutex ? SharedMutex.asLockable(mutex) :
                mutex);
                    mutex instanceof SharedMutex ? SharedMutex.asLockable(mutex) :
                    mutex);
        }

        let remaining = array.length;
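A sketch of the intended call site, assuming `ScopedLock` accepts an iterable of mutexes (per the `Array.from(mutexes)` above) and releases its locks on disposal like the other RAII wrappers in this commit; the exact surface may differ:

```ts
declare const a: Mutex;
declare const b: SharedMutex;
{
    using lck = new ScopedLock([a, b]); // acquires both without deadlocking, even
                                        // if another thread locks them as [b, a]
    // ...critical section guarded by both mutexes...
} // locks released when `lck` is disposed at block exit
```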
@@ -1,102 +0,0 @@
// import { Shared, SharedStructBase } from "../_namespaces/ts";
// import { AtomicValue } from "../sharing/atomicValue";

// @Shared()
// class DataStruct<T extends Shareable> extends SharedStructBase {
//     @Shared() internalCounter = new AtomicValue<number>(0);
//     @Shared() object: T;

//     constructor(object: T) {
//         super();
//         this.object = object;
//     }

//     static releaseRef<T extends Shareable>(self: DataStruct<T>) {
//         if (AtomicValue.add(self.internalCounter, 1) == -1) {
//             DataStruct.destroy(self);
//         }
//     }

//     static destroy<T extends Shareable>(self: DataStruct<T>) {
//         self.internalCounter = undefined!;
//         self.object = undefined!;
//     }
// }

// @Shared()
// class DataPtrStruct<T extends Shareable> extends SharedStructBase {
//     @Shared() externalCounter: number;
//     @Shared() ptr: DataStruct<T> | undefined;
//     constructor(ptr: DataStruct<T> | undefined, externalCounter = 0) {
//         super();
//         this.ptr = ptr;
//         this.externalCounter = externalCounter;
//     }
// }

// class DataGuard<T extends Shareable> {
//     private _ptr: DataStruct<T> | undefined;

//     constructor(ptr: DataStruct<T> | undefined) {
//         this._ptr = ptr;
//     }

//     get isValid() { return !!this._ptr; }

//     get object() {
//         if (!this._ptr) throw new ReferenceError("Object is disposed");
//         return this._ptr.object;
//     }

//     replaceWith(other: DataGuard<T>) {
//         if (this._ptr) {
//             DataStruct.releaseRef(this._ptr);
//         }
//         this._ptr = other._ptr;
//         other._ptr = undefined;
//         return this;
//     }

//     move() {
//         const obj = new DataGuard<T>(this._ptr);
//         this._ptr = undefined;
//         return obj;
//     }

//     [Symbol.dispose]() {
//         if (this._ptr) {
//             DataStruct.releaseRef(this._ptr);
//             this._ptr = undefined;
//         }
//     }
// }

// @Shared()
// class SharedGuard<T extends Shareable> extends SharedStructBase {
//     @Shared() private data_ptr = new AtomicValue<DataPtrStruct<T>>(new DataPtrStruct<T>(undefined));

//     private static release<T extends Shareable>(old_data_ptr: DataPtrStruct<T>) {
//         if (!old_data_ptr.ptr) return;
//         const external = old_data_ptr.externalCounter;
//         if (AtomicValue.sub(old_data_ptr.ptr.internalCounter, external) === external - 1) {
//             DataStruct.destroy(old_data_ptr.ptr);
//         }
//         else {
//             DataStruct.releaseRef(old_data_ptr.ptr);
//         }
//     }

//     static destroy<T extends Shareable>(self: SharedGuard<T>) {
//         const old_data_ptr = AtomicValue.load(self.data_ptr);
//         SharedGuard.release(old_data_ptr);
//     }

//     static acquire<T extends Shareable>(self: SharedGuard<T>) {
//         let new_data_ptr: DataPtrStruct<T>;
//         const old_data_ptr = AtomicValue.load(self.data_ptr);
//         do {
//             new_data_ptr = new DataPtrStruct(old_data_ptr.ptr, old_data_ptr.externalCounter + 1);
//         }
//         while (!AtomicValue.compareAndSwap(self.data_ptr, old_data_ptr, new_data_ptr));
//     }
// }

@@ -4,7 +4,10 @@ import { Debug } from "../debug";
import { SharedLockable } from "./sharedLockable";
import { SharedMutex } from "./sharedMutex";

/** @internal */
/**
 * Establishes a non-exclusive lock on a shared mutex, inspired by `std::shared_lock` in C++.
 * @internal
 */
export class SharedLock<T extends SharedLockable | SharedMutex> {
    private _mutex: T | undefined;
    private _lockable: SharedLockable | undefined;

@@ -26,7 +26,10 @@ interface CallbackCache {
    unlockShared?: () => void;
}

/** @internal */
/**
 * A mutex that allows both exclusive and non-exclusive locking. Inspired by `std::shared_mutex` in C++.
 * @internal
 */
@Shared()
export class SharedMutex extends Identifiable(Tagged(SharedStructBase, Tag.SharedMutex)) {
    @Shared() private _mutex = new Atomics.Mutex();

@@ -2,7 +2,10 @@ let mutex: Atomics.Mutex;
let condition: Atomics.Condition;
let timeout: number;

/** @internal */
/**
 * Puts the current thread to sleep for the specified number of milliseconds.
 * @internal
 */
export function sleep(ms: number) {
    mutex ??= new Atomics.Mutex();
    condition ??= new Atomics.Condition();
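For comparison, a blocking sleep can be approximated with the standard `Atomics.wait` on a `SharedArrayBuffer` (browsers disallow this on the main thread); the origin-trial `Atomics.Mutex`/`Atomics.Condition` pair above is not required for this analogue:

```ts
// Wait on an address that is never notified, so the wait always
// times out after `ms` milliseconds.
const waitCell = new Int32Array(new SharedArrayBuffer(4));
function sleepMs(ms: number) {
    Atomics.wait(waitCell, 0, 0, ms);
}
```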
@@ -3,18 +3,6 @@ import { sleep } from "./sleep";

const cpuCount = sys.cpuCount?.() ?? 1;

export class SpinWait {
    private _count = 0;

    reset() {
        this._count = 0;
    }

    spinOnce() {
        this._count = spin(this._count);
    }
}

/**
 * Periodically puts the current thread to sleep in an effort to reduce lock contention.
 * @param currentCount an unsigned 32-bit integer value used to determine the current spin count.

@@ -26,6 +14,8 @@ export class SpinWait {
 *     spinCounter = spin(spinCounter);
 * }
 * ```
 *
 * @internal
 */
export function spin(currentCount: number) {
    currentCount >>>= 0;
@@ -1,4 +1,4 @@
import { combinePaths, getAnyExtensionFromPath, getBaseFileName, getDirectoryPath, isTaggedStruct, removeExtension, sys, Tag, Tagged } from "../_namespaces/ts";
import { combinePaths, getAnyExtensionFromPath, getBaseFileName, getDirectoryPath, removeExtension, sys, Tag, Tagged } from "../_namespaces/ts";
import { isNodeLikeSystem, noop } from "../core";
import { Debug } from "../debug";
import { Deque } from "../sharing/collections/deque";

@@ -6,12 +6,16 @@ import { Shared, SharedStructBase } from "../sharing/structs/sharedStruct";
import { Worker, workerThreads, WorkerThreadsHost } from "../workerThreads";
import { Condition } from "./condition";
import { Mutex } from "./mutex";
import { SpinWait } from "./spinWait";
import { spin } from "./spin";
import { UniqueLock } from "./uniqueLock";

// temporary options to control various behaviors of the thread pool while experimenting.
const WORK_STEALING = true;
const PER_THREAD_QUEUE = true;

/**
 * A task scheduled on the thread pool.
 */
@Shared()
class Task extends SharedStructBase {
    @Shared() readonly name: string;

@@ -24,51 +28,118 @@ class Task extends SharedStructBase {
    }
}

/**
 * A task queue for a thread pool thread. The thread that owns the pool and the worker thread that owns the queue can
 * push new tasks. Only the worker thread that owns the queue can pop tasks, while other threads may steal tasks.
 *
 * The queue is operated using two {@link Deque|deques}: An input deque maintained by the thread that owns the thread
 * pool, and an output {@link Deque|deque} maintained by the worker thread that owns the queue. When the worker thread
 * requests a task, if no tasks are in the output queue, a chunk of tasks is stolen from the input queue and added to
 * the output queue.
 */
@Shared()
class TaskQueue extends SharedStructBase {
    @Shared() private mutex = new Mutex();
    @Shared() private condition = new Condition();
    @Shared() private dequeue = new Deque<Task>();
    @Shared() private inputDeque = new Deque<Task>();
    @Shared() outputDeque: Deque<Task> | undefined; // output deque is assigned (and owned) by the worker thread.
    @Shared() done = false;

    static tryDequeue(self: TaskQueue) {
        return Deque.popBottom(self.dequeue);
    /**
     * Push a new task onto the queue. This can only be invoked from either the thread that owns the thread pool or the
     * worker thread that owns the queue.
     */
    static push(self: TaskQueue, task: Task) {
        Debug.assert(!self.done, "Cannot push new work if the queue is done.");
        const threadId = workerThreads?.threadId ?? 0;

        // only the thread that originated the thread pool can add work to the input deque.
        // only the thread that owns the queue can add work to the output deque.
        const deque =
            threadId === self.inputDeque.threadId ? self.inputDeque :
            threadId === self.outputDeque?.threadId ? self.outputDeque :
            Debug.fail("wrong thread.");

        Deque.push(deque, task);
        Condition.notify(self.condition, 1); // wake up the thread if it is sleeping.
    }

    static dequeue(self: TaskQueue) {
    /**
     * Try to take any immediately available work off the deque. This can only be invoked from the worker thread that
     * owns the queue.
     */
    static tryPop(self: TaskQueue) {
        Debug.assert(self.outputDeque);
        while (true) {
            // try to take work off our own queue
            const done = self.done;
            const task = Deque.pop(self.outputDeque);
            if (task || done) {
                return task;
            }

            // try to take a chunk of work off of the input deque
            const stolen = Deque.stealMany(self.inputDeque, 32);
            if (stolen.length) {
                Deque.pushMany(self.outputDeque, stolen);

                // we've successfully moved a chunk of work off of the input deque,
                // spin and try to pop from the output deque again
                continue;
            }

            // we failed to take work from the deque and there's no immediate work to be had
            return undefined;
        }
    }

    /**
     * Pop a work item off the deque. If no work is available, the thread is suspended until either new work becomes
     * available or the queue exits. This can only be invoked from the worker thread that owns the queue.
     */
    static pop(self: TaskQueue) {
        // We read 'done' before reading 'task' to avoid a race. If we read 'done' after we try to pop a task, we may be
        // preempted by a thread that both adds a new task and sets 'done' before we were to read it. By reading 'done'
        // first we will have ensured that the queue was actually 'done' by the time we read any remaining tasks.
        const done = self.done;
        const task = TaskQueue.tryDequeue(self);
        const task = TaskQueue.tryPop(self);
        if (task || done) {
            return task;
        }

        using lck = new UniqueLock(self.mutex);
        while (true) {
            const done = self.done;
            const task = TaskQueue.tryDequeue(self);
            const done = self.done; // read 'done' before 'task' to avoid a race, see above for rationale.
            const task = TaskQueue.tryPop(self);
            if (task || done) {
                return task;
            }

            // no work to do, put the thread to sleep
            Condition.wait(self.condition, lck);
        }
    }

    /**
     * Try to steal a task from this thread's output deque. If no task was available, try to steal a task from this
     * thread's input deque.
     */
    static steal(self: TaskQueue) {
        return Deque.steal(self.dequeue);
    }

    static enqueue(self: TaskQueue, task: Task) {
        Debug.assert(!self.done);
        Deque.pushBottom(self.dequeue, task);
        Condition.notify(self.condition, 1);
        return self.outputDeque && Deque.steal(self.outputDeque) || Deque.steal(self.inputDeque);
    }

    /**
     * Marks the queue as 'done'. No more tasks can be added to the queue.
     */
    static done(self: TaskQueue) {
        self.done = true;
        Condition.notify(self.condition);
    }
}

/**
 * A task scheduler for a thread pool. Tasks are scheduled on a given thread in a round-robin fashion.
 */
@Shared()
class TaskScheduler extends SharedStructBase {
    @Shared() queues: SharedArray<TaskQueue>;
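From the worker's perspective, the two-deque design reduces to the loop below; this is a hypothetical sketch against the API above, not code from the commit:

```ts
declare const queue: TaskQueue;
declare function runTask(task: Task): void;

let task: Task | undefined;
// pop() drains the worker-owned output deque first, then steals a chunk of up to
// 32 tasks from the pool-owned input deque, and only blocks when both are empty.
while (task = TaskQueue.pop(queue)) {
    runTask(task);
}
// pop() returned undefined: the queue was marked done and fully drained.
```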
@@ -84,6 +155,9 @@ class TaskScheduler extends SharedStructBase {
        }
    }

    /**
     * Shutdown the scheduler, marking all task queues as done.
     */
    static shutdown(self: TaskScheduler) {
        const queueCount = self.queues.length;
        for (let i = 0; i < queueCount; i++) {

@@ -91,31 +165,41 @@ class TaskScheduler extends SharedStructBase {
        }
    }

    /**
     * Schedules a task on the thread pool.
     */
    static scheduleTask(self: TaskScheduler, task: Task) {
        const queueCount = self.queues.length;
        const spin = new SpinWait();
        let spinCounter = 0;

        let queueId: number;
        while (true) {
            const previousValue = Atomics.load(self, "nextTaskId");
            const nextValue = (previousValue + 1) >>> 0;
            const nextValue = (previousValue + 1) >>> 0; // u32 wrapping add
            if (Atomics.compareExchange(self, "nextTaskId", previousValue, nextValue) === previousValue) {
                queueId = previousValue % queueCount;
                break;
            }
            spin.spinOnce();
            // spin when there is contention.
            spinCounter = spin(spinCounter);
        }

        TaskQueue.enqueue(self.queues[queueId], task);
        // TODO: wake all queues to steal work?
        TaskQueue.push(self.queues[queueId], task);

        // TODO: wake all threads to steal work? If the selected thread already has work and the other threads are
        // sleeping, then the other threads won't be able to take up the slack.
    }

    /**
     * Take a task off of the selected queue. Must only be called by the thread that owns the queue indicated by the
     * provided `queueId`.
     */
    static takeTask(self: TaskScheduler, queueId: number) {
        const queues = self.queues;

        // first, try to take work from our queue
        const done = queues[queueId].done;
        const task = TaskQueue.tryDequeue(queues[queueId]);
        const task = TaskQueue.tryPop(queues[queueId]);
        if (task || done) {
            return task;
        }

@@ -132,8 +216,9 @@ class TaskScheduler extends SharedStructBase {
        }
    }

        // finally, if that fails, perform a blocking dequeue on our queue
        return TaskQueue.dequeue(queues[queueId]);
        // finally, if that fails, perform a blocking dequeue on our queue. blocking isn't preferable, however, as
        // there may be work added to other queues that cannot be stolen while this thread is suspended.
        return TaskQueue.pop(queues[queueId]);
    }
}
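The round-robin selection in `scheduleTask` boils down to a wrapping u32 counter mapped onto a queue index; isolated here for illustration as a hypothetical helper, not code from the commit:

```ts
function nextQueueId(previousValue: number, queueCount: number) {
    const nextValue = (previousValue + 1) >>> 0; // u32 wrapping add
    const queueId = previousValue % queueCount;  // round-robin queue index
    return { queueId, nextValue };
}
```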
@@ -149,7 +234,6 @@ export class ThreadPool implements Disposable {
    private _workers: Worker[] = [];
    private _threads: Thread[] = [];
    private _scheduler: TaskScheduler | undefined;

    private _listening = false;
    private _onUncaughtException = () => {
        if (this._scheduler) {

@@ -157,6 +241,12 @@ export class ThreadPool implements Disposable {
        }
    };

    /**
     * Creates a new {@link ThreadPool} instance.
     * @param poolSize The number of thread pool threads to add to the pool.
     * @param host The {@link WorkerThreadsHost} associated with the pool.
     * @param generateCpuProfile Whether to generate a CPU profile in the thread pool thread.
     */
    constructor(poolSize: number, host = workerThreads ?? Debug.fail("Worker threads not available."), generateCpuProfile?: string) {
        Debug.assert(poolSize >= 1);
        Debug.assert(host.isMainThread(), "A new thread pool can only be created on the main thread.");

@@ -215,7 +305,7 @@ export class ThreadPool implements Disposable {
    }

    /**
     * Immediately stop all threads in the thread pool and wait for them to terminate.
     * Immediately abort all threads in the thread pool and asynchronously wait for them to terminate.
     */
    async abort() {
        if (!this._scheduler) {

@@ -250,6 +340,9 @@ export class ThreadPool implements Disposable {
    }

    private _startListening() {
        // An uncaught exception will normally crash the process. However, open thread pool threads that are currently
        // sleeping may still prevent the process from exiting even if the worker is `unref`'d. In NodeJS we can
        // monitor for uncaught exceptions in the main thread and terminate all workers.
        if (isNodeLikeSystem()) {
            if (this._listening) {
                return;

@@ -291,6 +384,9 @@ const enum ThreadState {
    Exited,
}

/**
 * Data structure representing the state of a thread pool thread.
 */
@Shared()
class Thread extends Tagged(SharedStructBase, Tag.Thread) {
    @Shared() private mutex = new Mutex();

@@ -307,16 +403,19 @@ class Thread extends Tagged(SharedStructBase, Tag.Thread) {
        this.generateCpuProfile = generateCpuProfile;
    }

    /**
     * Waits for a thread to exit gracefully.
     */
    static join(self: Thread) {
        using lck = new UniqueLock(self.mutex);
        Condition.wait(self.condition, lck, () => self.state === ThreadState.Exited);
    }

    static [Symbol.hasInstance](value: unknown) {
        return isTaggedStruct(value, Tag.Thread);
    }

    static {
        /**
         * The run loop for a thread pool thread. Whenever a task becomes available it is passed to `processTask` to be
         * handled by the thread.
         */
        function runLoop(thread: Thread, processTask: (name: string, arg: Shareable) => void) {
            let task: Task | undefined;
            while (task = TaskScheduler.takeTask(thread.scheduler, thread.queueId)) {

@@ -327,11 +426,13 @@ class Thread extends Tagged(SharedStructBase, Tag.Thread) {
        runThread = function (processTask) {
            Debug.assert(workerThreads?.isWorkerThread() && workerThreads.workerData instanceof Thread, "This function may only be called from a thread pool thread.");
            const thread = workerThreads.workerData;

            const state = thread.state;
            Debug.assert(state === ThreadState.NotStarted);
            Debug.assert(state === Atomics.compareExchange(thread, "state", state, ThreadState.Running));
            const started = ThreadState.NotStarted === Atomics.compareExchange(thread, "state", ThreadState.NotStarted, ThreadState.Running);
            Debug.assert(started, "Illegal operation. Thread already started by a different worker.");
            try {
                // assign the output deque for this thread's task queue.
                thread.scheduler.queues[thread.queueId].outputDeque = new Deque();

                // Enable CPU profiling for the thread, if it was requested.
                if (thread.generateCpuProfile && sys.enableCPUProfiler && sys.disableCPUProfiler) {
                    const dirname = getDirectoryPath(thread.generateCpuProfile);
                    const basename = getBaseFileName(thread.generateCpuProfile);

@@ -351,9 +452,12 @@ class Thread extends Tagged(SharedStructBase, Tag.Thread) {
                }
            }
            catch (e) {
                Debug.log.trace(e);
                // Log the error to stdout. There is currently no other way to report the error, and no way to crash the
                // main thread from a worker.
                Debug.log.error(e);
            }
            finally {
                // mark the thread as exited and notify the thread that owns the thread pool to unblock any joins.
                thread.state = ThreadState.Exited;
                Condition.notify(thread.condition);
            }

@@ -5,7 +5,10 @@ import { Lockable } from "./lockable";
import { Mutex } from "./mutex";
import { SharedMutex } from "./sharedMutex";

/** @internal */
/**
 * Establishes an exclusive lock on a mutex, inspired by `std::unique_lock` in C++.
 * @internal
 */
export class UniqueLock<T extends Mutex | SharedMutex> {
    private _mutex: T | undefined;
    private _lockable: Lockable | undefined;

@@ -5993,6 +5993,14 @@ export const enum InternalSymbolName {
 */
export type __String = (string & { __escapedIdentifier: void }) | (void & { __escapedIdentifier: void }) | InternalSymbolName;

declare global {
    // Due to the intersection in `__String`, the type system cannot determine whether `__String` is actually shareable,
    // so we must extend the `ShareablePrimitive` open-ended union:
    interface OtherShareablePrimitive {
        __String: __String;
    }
}

/** @deprecated Use ReadonlyMap<__String, T> instead. */
export type ReadonlyUnderscoreEscapedMap<T> = ReadonlyMap<__String, T>;
@@ -0,0 +1,10 @@
import { Deque } from "../../_namespaces/ts";

describe("unittests:: sharing:: workStealingDeque", () => {
    describe("pushBottom", () => {
        it("push first item", () => {
            const deque = new Deque<number>();
            Deque.push(deque, 1);
        });
    });
});
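A hypothetical follow-up test in the same style, assuming the suite's usual chai-style `assert` import, exercising pop order:

```ts
it("pop returns the most recently pushed item", () => {
    const deque = new Deque<number>();
    Deque.push(deque, 1);
    Deque.push(deque, 2);
    assert.equal(Deque.pop(deque), 2);
    assert.equal(Deque.pop(deque), 1);
});
```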