perks/tasks/lib/exclusive-locks.ts

367 строки
12 KiB
TypeScript

/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { Server, ListenOptions } from 'net';
import { ManualPromise } from './manual-promise';
import { Delay, When } from './task-functions';
import { ExclusiveLockUnavailableException, SharedLockUnavailableException } from './exception';
import { promisify } from './node-promisify';
import { tmpdir } from 'os';
import { unlink as fsUnlink, readFile as fsReadFile, writeFile as fsWriteFile } from 'fs';
import { createHash } from 'crypto';
const unlink = promisify(fsUnlink);
const readFile = promisify(fsReadFile);
const writeFile = promisify(fsWriteFile);
/* eslint-disable */
function sanitize(input: string, replacement: string = '_') {
const illegalCharacters = /[\/\?<>\\:\*\|":]/g;
const controlCharacters = /[\x00-\x1f\x80-\x9f]/g;
const reservedCharacters = /^\.+$/;
const reservedNames = /^(con|prn|aux|nul|com[0-9]|lpt[0-9])(\..*)?$/i;
const trailingSpaces = /[\. ]+$/;
return input
.replace(illegalCharacters, replacement)
.replace(controlCharacters, replacement)
.replace(reservedCharacters, replacement)
.replace(reservedNames, replacement)
.replace(trailingSpaces, replacement);
}
function distill(content: any) {
const hash = createHash('sha256').update(JSON.stringify(content)).digest();
const port = hash.readUInt16BE(2) | 4096; // 4096+
let host = `${(hash.readUInt16BE(4) | 0x10) + 0x7f000000}`;
if (process.platform === 'darwin') {
host = `${0x7f000001}`;
}
return { port, host };
}
/**
* An interface for Exclusive Locking objects.
*/
export interface IExclusive {
/**
* Acquire an exclusive lock to the resource.
*
* @throws ExclusiveLockUnavailableException - if the timeout is reached before the lock can be acquired.
* @param timeoutMS - the length of time in miliiseconds to wait for a lock.
* @param delayMS - the length of time in milliseconds to retry the lock.
* @returns - the release function to release the lock.
*/
acquire(timeoutMS?: number, delayMS?: number): Promise<() => Promise<void>>;
}
/**
* An system-wide Exclusive lock for a named resource.
*
* This is implemented using an exclusive named pipe.
*/
export class Mutex implements IExclusive {
/*@internal*/public readonly pipe: string;
/*@internal*/public readonly options: ListenOptions;
/**
*
* @param name - the resource name to create a Mutex for. Multiple instances (across processes) using the same name are operating on the same lock.
*/
public constructor(private name: string) {
if (process.platform === 'win32') {
this.pipe = `\\\\.\\pipe\\${sanitize(name)}`;
this.options = { path: this.pipe, exclusive: true };
} else {
const pretendName = `${tmpdir()}/pipe_${sanitize(name)}`;
this.options = { ...distill(pretendName), exclusive: true };
this.pipe = `${tmpdir()}/pipe_${sanitize(name)}:${this.options.port}`;
}
}
/**
* Asynchronously acquires the lock. Will wait for up {@link timeoutMS} milliseconds
* @throws ExclusiveLockUnavailableException - if the timeout is reached before the lock can be acquired.
* @param timeoutMS - the length of time in miliiseconds to wait for a lock.
* @param delayMS - the length of time in milliseconds to retry the lock.
* @returns - the release function to release the lock.
*/
public async acquire(timeoutMS: number = 20000, delayMS: number = 100): Promise<() => Promise<void>> {
const fail = Date.now() + timeoutMS;
do {
try {
// try to get the lock to the pipe
const server = new Server();
// possible events after listen
const completed = When(server, 'listening', 'error');
// listening will trigger when we've acquired the pipe handle
server.listen(this.options);
// don't let this keep the process alive.
server.unref();
// wait to see if we can listen to the pipe or fail trying.
await completed;
// yes, we did, setup the release function
const closedServer = new ManualPromise<void>();
// the release function is returned to the consumer
return async () => {
// ensure that releasing twice isn't harmful.
if (!closedServer.isCompleted) {
server.close(() => closedServer.resolve());
await closedServer;
}
};
} catch {
// not really releavent why it failed. Pause for a moment.
await Delay(delayMS);
}
// check if we're past the allowable time.
} while (fail > Date.now());
// we were unable to get the lock before the timeout.
throw new ExclusiveLockUnavailableException(this.name, timeoutMS);
}
}
/**
* A process-local exclusive lock, bound to the object instance.
*/
export class CriticalSection implements IExclusive {
private promise: ManualPromise<void> | undefined;
/**
* @constructor - Creates an instance of a CriticalSection
*
* @param name - a cosmetic name for the 'resource'. Note: multiple CriticalSection instances with the same do not offer exclusivity, it's tied to the object instance.
*/
public constructor(private name: string = 'unnamed') {
}
/**
* Asynchronously acquires the lock. Will wait for up {@link timeoutMS} milliseconds
* @throws ExclusiveLockUnavailableException - if the timeout is reached before the lock can be acquired.
* @param timeoutMS - the length of time in miliiseconds to wait for a lock.
* @param delayMS - unused.
* @returns - the release function to release the lock.
*/
public async acquire(timeoutMS: number = 20000, delayMS: number = 100): Promise<() => Promise<void>> {
// delayMS isn't really relavent in this case, since all aqui
const fail = Date.now() + timeoutMS;
if (this.promise) {
do {
// wait for its release, or we use up what's left of the timeout.
// since multiple consumers can be waiting for the promise to fulfil,
// the previous promise holder can resolve, someone else can take it's place
// before we get a chance to act.
await Promise.race([this.promise, Delay(fail - Date.now())]);
} while (this.promise && fail > Date.now());
}
// check to see if the promise is still around, which indicates
// that we must have timed out.
if (this.promise) {
throw new ExclusiveLockUnavailableException(this.name, timeoutMS);
}
// there is no outstanding promise now, we can create one
const myPromise = new ManualPromise<void>();
this.promise = myPromise;
// the release function is returned to the consumer
return async () => {
// ensure that releasing twice isn't harmful.
if (!myPromise.isCompleted) {
this.promise = undefined;
myPromise.resolve();
}
};
}
}
/**
* Offers a lock where many consumers can acquire, but an exclusive lock can only be gained if
* the consumer is the only one who has a shared lock.
*/
export class SharedLock {
private readonly exclusiveLock: Mutex;
private readonly busyLock: Mutex;
private readonly personalLock: Mutex;
private readonly file: string;
public constructor(private name: string) {
this.exclusiveLock = new Mutex(`${sanitize(name)}.exclusive-lock`);
this.busyLock = new Mutex(`${sanitize(name)}.busy-lock`);
this.personalLock = new Mutex(`${sanitize(name)}.${Math.random() * 10000}.personal-lock`);
this.file = `${tmpdir()}/${sanitize(name)}.lock`;
}
private async readConnections(): Promise<Array<ListenOptions>> {
// get the list of names.
let connections = new Array<ListenOptions>();
try {
const list = JSON.parse(await readFile(this.file, 'utf8'));
for (const each of list) {
if (await this.isLocked(each)) {
connections.push(each);
}
}
} catch {
}
return connections;
}
private async writeConnections(connections: Array<ListenOptions>): Promise<void> {
// write the list of names.
if (connections && connections.length > 0) {
// write the list of names into the file.
await writeFile(this.file, JSON.stringify(connections, null, 2));
} else {
try {
// no names in list, file should be deleted
await unlink(this.file);
} catch {
// shh!
}
}
}
private async isLocked(options: ListenOptions): Promise<boolean> {
const server = new Server();
try {
// possible events after listen
const completed = When(server, 'listening', 'error');
// listening will trigger when we've acquired the pipe handle
server.listen(options);
// wait to see if we can listen to the pipe or fail trying.
await completed;
// the pipe opened! It's not locked
await server.close();
return false;
} catch {
server.close();
}
// the pipe is locked
return true;
}
/**
* Asynchronously acquires a shared lock. Will wait for up {@link timeoutMS} milliseconds
* @throws SharedLockUnavailableException - if the timeout is reached before the lock can be acquired.
* @param timeoutMS - the length of time in miliiseconds to wait for a lock.
* @param delayMS - the polling interval for the exclusive lock during initialization.
* @returns - the release function to release the shared lock.
*/
public async acquire(timeoutMS: number = 20000, delayMS: number = 100): Promise<() => Promise<void>> {
// ensure we're the only one that can muck with things for now.
const releaseBusy = await this.busyLock.acquire(timeoutMS, delayMS);
try {
// get our personal lock
const releasePersonal = await this.personalLock.acquire();
const activeLocks = await this.readConnections();
activeLocks.push(this.personalLock.options);
await this.writeConnections(activeLocks);
await releaseBusy();
return async () => {
// release our personal lock
await releasePersonal();
// try to remove our name from the list
try {
const releaseBusy = await this.busyLock.acquire(timeoutMS, delayMS);
try {
await this.writeConnections(await this.readConnections());
} finally {
// regardless, release the busy lock!
await releaseBusy();
}
} catch {
// if it fails no, worry, someone else can clean it up.
}
};
} catch (e) {
throw new SharedLockUnavailableException(this.name, timeoutMS);
} finally {
// release the busy lock!
await releaseBusy();
}
}
public get activeLockCount(): Promise<number> {
return (async () => {
return (await this.readConnections()).length;
})();
}
public get isExclusiveLocked(): Promise<boolean> {
return (async () => {
try {
// try to lock it
const release = (await this.exclusive(0));
await release();
return false;
} catch {
}
return true;
})();
}
/**
* Asynchronously acquires an exclusive lock. Will wait for up {@link timeoutMS} milliseconds
*
* Will only permit a lock if there are no other shared locks
*
* @throws ExclusibveLockUnavailableException - if the timeout is reached before the lock can be acquired.
* @param timeoutMS - the length of time in miliiseconds to wait for a lock.
* @param delayMS - the polling interval for the exclusive lock during initialization.
* @returns - the release function to release the exclusive lock.
*/
public async exclusive(timeoutMS: number = 20000, delayMS: number = 100): Promise<() => Promise<void>> {
const busyRelease = await this.busyLock.acquire(timeoutMS, delayMS);
// ensure we're the only one that can muck with things for now.
const exclusiveRelease = await this.exclusiveLock.acquire(timeoutMS, delayMS);
try {
// make sure we're the only one who has an shared lock
const activeLocks = await this.readConnections();
if (activeLocks.length === 0 || (activeLocks.length === 1 && JSON.stringify(activeLocks[0]) === JSON.stringify(this.personalLock.options))) {
return async () => {
await exclusiveRelease();
await busyRelease();
};
}
} catch {
}
// we didn't return the exclusive Lock,
// release it and throw...
await exclusiveRelease();
await busyRelease();
throw new ExclusiveLockUnavailableException(this.name, timeoutMS);
}
}