Rewrite P-graph to support concurrency and priorities (#2)

* Add an additional test

* Jest debugging

* Fix readme, add another test for public API

* Remove unused var

* Add new test enforcement

* Add a concurrency test

* Remove scoping

* Add test for concurrency, remove scoping

* New API, update tests to match API

* Add test that fails for cycle

* Add tests for priority, update readme

* Update readme

* New implementation, with test parity of the old implementation

* Fix throws test

* Cycle detection

* Implement priority

* Change files

* Fix bug in mocks, add one more concurrency test

* Actually fix the test

* Fix bug in cycle detection

* Update README.md

Co-authored-by: Kenneth Chau <34725+kenotron@users.noreply.github.com>

* maxConcurrency -> concurrency

* Change files

* Major bump

* Don't use the word concurrency

* Forgot to save

* Add comments to concurrency

* No recursion on cycle calculation

* Unroll recursion for set cumulative order

Co-authored-by: Kenneth Chau <34725+kenotron@users.noreply.github.com>
This commit is contained in:
Christian Gonzalez 2020-07-14 16:57:07 -04:00 коммит произвёл GitHub
Родитель d5d321356f
Коммит 1077e092c2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 775 добавлений и 327 удалений

17
.vscode/launch.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible Node.js debug attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Debug Jest Tests",
"type": "node",
"request": "launch",
"runtimeArgs": ["--inspect-brk", "${workspaceRoot}/node_modules/jest/bin/jest.js", "--runInBand"],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
"port": 9229
}
]
}

126
README.md
Просмотреть файл

@ -10,125 +10,43 @@ $ npm install p-graph
## Usage
The p-graph library takes in a `graph` argument. To start, create a graph of functions that return promises (let's call them Run Functions), then run them through the pGraph API:
The p-graph library takes in a map of of nodes and a list of dependencies. The keys in the map are unique string identifiers for each node in the graph. The value of the map is the definition of the task, including the function that should be executed by that task in it's run argument. The dependencies list is an array of tuples, each tuple contains the two values that must correspond to ids in the node map. The run function corresponding to the first item in the tuple must complete before the second item in the tuple can begin.
The return value of pGraph is a class with a `run()` function. Calling the `run()` function will return a promise that resolves after all the tasks in the graph have finished completed. Tasks are run in dependency order.
```js
const { default: pGraph } = require("p-graph"); // ES6 import also works: import pGraph from 'p-graph';
const putOnShirt = () => Promise.resolve("put on your shirt");
const putOnShorts = () => Promise.resolve("put on your shorts");
const putOnJacket = () => Promise.resolve("put on your jacket");
const putOnShoes = () => Promise.resolve("put on your shoes");
const tieShoes = () => Promise.resolve("tie your shoes");
const nodeMap = new Map([
["putOnShirt", { run: () => Promise.resolve("put on your shirt") })],
["putOnShorts", { run: () => Promise.resolve("put on your shorts")})],
["putOnJacket", { run: () => Promise.resolve("put on your jacket")})],
["putOnShoes", { run: () => Promise.resolve("put on your shoes")}],
["tieShoes", { run: () => Promise.resolve("tie your shoes")}],
]);
const graph = [
[putOnShoes, tieShoes],
[putOnShirt, putOnJacket],
[putOnShorts, putOnJacket],
[putOnShorts, putOnShoes],
const dependencies: DependencyList = [
// You need to put your shoes on before you tie them!
["putOnShoes", "tieShoes"],
["putOnShirt", "putOnJacket"],
["putOnShorts", "putOnJacket"],
["putOnShorts", "putOnShoes"],
];
await pGraph(graph, { concurrency: 3 }).run(); // returns a promise that will resolve when all the tasks are done from this graph in order
await pGraph(nodeMap, dependencies).run();
```
### Ways to define a graph
## Concurrency Limiter
1. Use a dependency array
There are some contexts where you may want to limit the number of functions running concurrently. One example would be to prevent overloading the CPU with too many parallel tasks. The concurrency argument to `run` will limit the number of functions that start running at a given time. If no concurrency option is set, the concurrency is not limited and tasks are run as soon as they are unblocked.
```js
const putOnShirt = () => Promise.resolve("put on your shirt");
const putOnShorts = () => Promise.resolve("put on your shorts");
const putOnJacket = () => Promise.resolve("put on your jacket");
const putOnShoes = () => Promise.resolve("put on your shoes");
const tieShoes = () => Promise.resolve("tie your shoes");
const graph = [
[putOnShoes, tieShoes],
[putOnShirt, putOnJacket],
[putOnShorts, putOnJacket],
[putOnShorts, putOnShoes],
];
await pGraph(graph);
await pGraph(graph).run({ concurrency: 3 });
```
2. Use a dependency array with a list of named functions
## Priority
```js
const funcs = new Map();
funcs.set("putOnShirt", () => Promise.resolve("put on your shirt"));
funcs.set("putOnShorts", () => Promise.resolve("put on your shorts"));
funcs.set("putOnJacket", () => Promise.resolve("put on your jacket"));
funcs.set("putOnShoes", () => Promise.resolve("put on your shoes"));
funcs.set("tieShoes", () => Promise.resolve("tie your shoes"));
const graph = [
[putOnShoes, tieShoes],
[putOnShirt, putOnJacket],
[putOnShorts, putOnJacket],
[putOnShorts, putOnShoes],
];
await pGraph(namedFunctions, graph);
```
3. Use a dependency map with a list of named functions
```js
const funcs = new Map();
funcs.set("putOnShirt", () => Promise.resolve("put on your shirt"));
funcs.set("putOnShorts", () => Promise.resolve("put on your shorts"));
funcs.set("putOnJacket", () => Promise.resolve("put on your jacket"));
funcs.set("putOnShoes", () => Promise.resolve("put on your shoes"));
funcs.set("tieShoes", () => Promise.resolve("tie your shoes"));
const depMap = new Map();
depMap.set(tieShoes, new Set([putOnShoes]));
depMap.set(putOnJacket, new Set([putOnShirt, putOnShorts]));
depMap.set(putOnShoes, new Set([putOnShorts]));
depMap.set(putOnShorts, new Set());
depMap.set(putOnShirt, new Set());
await pGraph(namedFunctions, graph);
```
### Using the ID as argument to the same function
In many cases, the jobs that need to run are the same where the only difference is the arguments for the function. In that case, you can treat the IDs as arguments as they are passed into the Run Function.
```ts
type Id = unknown;
type RunFunction = (id: Id) => Promise<unknown>;
```
As you can see, the ID can be anything. It will be passed as the argument for your Run Function. This is a good option if having a large number of functions inside a graph is prohibitive in memory sensitive situations.
```js
const funcs = new Map();
const thatImportantTask = (id) => Promise.resolve(id);
funcs.set("putOnShirt", thatImportantTask);
funcs.set("putOnShorts", thatImportantTask);
funcs.set("putOnJacket", thatImportantTask);
funcs.set("putOnShoes", thatImportantTask);
funcs.set("tieShoes", thatImportantTask);
```
## Scopes and filtering
After a graph are sent to the `pGraph` function, the graph is executed with the `run()` function. The `run()` takes in an argument that lets you filter which tasks to end. This allows you to run tasks up to a certain point in the graph.
```js
// graph is one of the three options up top
// depMap is the dependency map where the key is the ID for the Run Function
// - the ID CAN be the Run Function itself if graph is specified as the dependency array format
await pGraph(graph).run((depMap) => {
return [...depMap.keys()].filter((id) => id.startsWith("b"));
});
```
There are situations where task runner must pick a subset of the currently unblocked tasks to put on the queue. By default, tasks are considered to all be equally important and equally likely to be picked to run once all the tasks they depend on are complete. If you wish to control the ordering of tasks, consider using the priority option when defining a task node. When the task scheduler is picking tasks to run, it will favor tasks with a higher priority over tasks with a lower priority. Tasks will always execute in dependency order.
# Contributing

Просмотреть файл

@ -0,0 +1,8 @@
{
"type": "major",
"comment": "Add support for maxConcurrency and allow for setting execution priorities on functions",
"packageName": "p-graph",
"email": "1581488+christiango@users.noreply.github.com",
"dependentChangeType": "patch",
"date": "2020-07-14T18:45:49.739Z"
}

4
prettier.config.js Normal file
Просмотреть файл

@ -0,0 +1,4 @@
module.exports = {
printWidth: 140,
tabWidth: 2
};

Просмотреть файл

@ -1,49 +1,122 @@
import { NamedFunctions, DepGraphMap, ScopeFunction, Id } from "./types";
import { RunOptions, PGraphNodeMap, DependencyList, PGraphNodeWithDependencies } from "./types";
import { PriorityQueue } from "./PriorityQueue";
import { getNodeCumulativePriorities } from "./getNodeCumulativePriorities";
import { graphHasCycles } from "./graphHasCycles";
export class PGraph {
private promises: Map<Id, Promise<unknown>> = new Map();
private readonly pGraphDependencyMap = new Map<string, PGraphNodeWithDependencies>();
namedFunctions: NamedFunctions;
graph: DepGraphMap;
/**
* Tracks all the nodes that are ready to be executed since it is not depending on the results of any non completed tasks.
*/
private readonly nodesWithNoDependencies: string[];
constructor(namedFunctions, graph: DepGraphMap) {
this.namedFunctions = namedFunctions;
this.graph = graph;
constructor(nodeMap: PGraphNodeMap, dependencies: DependencyList) {
[...nodeMap.entries()].forEach(([key, node]) => {
this.pGraphDependencyMap.set(key, { ...node, dependsOn: new Set(), dependedOnBy: new Set() });
});
this.promises = new Map();
dependencies.forEach(([subjectId, dependentId]) => {
const subjectNode = this.pGraphDependencyMap.get(subjectId);
const dependentNode = this.pGraphDependencyMap.get(dependentId);
if (!subjectNode) {
throw new Error(`Dependency graph referenced node with id ${subjectId}, which was not in the node list`);
}
if (!dependentNode) {
throw new Error(`Dependency graph referenced node with id ${dependentId}, which was not in the node list`);
}
subjectNode.dependedOnBy.add(dependentId);
dependentNode.dependsOn.add(subjectId);
});
this.nodesWithNoDependencies = getNodesWithNoDependencies(this.pGraphDependencyMap);
if (this.nodesWithNoDependencies.length == 0 && nodeMap.size > 0) {
throw new Error("We could not find a node in the graph with no dependencies, this likely means there is a cycle including all nodes");
}
if (graphHasCycles(this.pGraphDependencyMap, this.nodesWithNoDependencies)) {
throw new Error("The dependency graph has a cycle in it");
}
}
/**
* Runs the promise graph with scoping
* @param scope
* Runs all the tasks in the promise graph in dependency order
* @param options - An optional configuration for running the tasks
*/
run(scope?: ScopeFunction) {
const scopedPromises = scope
? scope(this.graph).map((id) => this.execute(id))
: [...this.graph.keys()].map((id) => this.execute(id));
run(options?: RunOptions): Promise<void> {
const concurrency = options?.concurrency;
return Promise.all(scopedPromises);
}
private execute(id: Id) {
if (this.promises.has(id)) {
return this.promises.get(id);
if (concurrency !== undefined && concurrency < 0) {
throw new Error(`concurrency must be either undefined or a positive integer, received ${options?.concurrency}`);
}
let execPromise: Promise<unknown> = Promise.resolve();
const nodeCumulativePriorities = getNodeCumulativePriorities(this.pGraphDependencyMap, this.nodesWithNoDependencies);
const priorityQueue = new PriorityQueue<string>();
const deps = this.graph.get(id);
this.nodesWithNoDependencies.forEach((itemId) => priorityQueue.insert(itemId, nodeCumulativePriorities.get(itemId)!));
if (deps) {
execPromise = execPromise.then(() =>
Promise.all([...deps].map((depId) => this.execute(depId)))
);
}
let currentlyRunningTaskCount = 0;
execPromise = execPromise.then(() => this.namedFunctions.get(id)(id));
const scheduleTask = async () => {
const taskToRunId = priorityQueue.removeMax();
this.promises.set(id, execPromise);
if (!taskToRunId) {
throw new Error("Tried to schedule a task when there were no pending tasks!");
}
const taskToRun = this.pGraphDependencyMap.get(taskToRunId)!;
return execPromise;
const taskFnPromise = taskToRun.run();
currentlyRunningTaskCount += 1;
await taskFnPromise;
currentlyRunningTaskCount -= 1;
// Let's remove this task from all dependent task's dependency array
taskToRun.dependedOnBy.forEach((dependentId) => {
const dependentNode = this.pGraphDependencyMap.get(dependentId)!;
dependentNode.dependsOn.delete(taskToRunId);
// If the task that just completed was the last remaining dependency for a node, add it to the set of unblocked nodes
if (dependentNode.dependsOn.size === 0) {
priorityQueue.insert(dependentId, nodeCumulativePriorities.get(dependentId)!);
}
});
};
return new Promise((resolve, reject) => {
const trySchedulingTasks = () => {
if (priorityQueue.isEmpty() && currentlyRunningTaskCount === 0) {
// We are done running all tasks, let's resolve the promise done
resolve();
return;
}
while (!priorityQueue.isEmpty() && (concurrency === undefined || currentlyRunningTaskCount < concurrency)) {
scheduleTask()
.then(() => trySchedulingTasks())
.catch((e) => reject(e));
}
};
trySchedulingTasks();
});
}
}
/**
* Given a pGraphDependency map, return the ids of all the nodes that do not have any dependencies.
*/
function getNodesWithNoDependencies(pGraphDependencyMap: Map<string, PGraphNodeWithDependencies>): string[] {
const result: string[] = [];
[...pGraphDependencyMap.entries()].forEach(([key, node]) => {
if (node.dependsOn.size === 0) {
result.push(key);
}
});
return result;
}

61
src/PriorityQueue.ts Normal file
Просмотреть файл

@ -0,0 +1,61 @@
interface PriorityQueueItem<T> {
item: T;
priority: number;
}
export class PriorityQueue<T> {
private array: PriorityQueueItem<T>[] = [];
private swapElements(firstIndex: number, secondIndex: number) {
const firstItem = this.array[firstIndex];
const secondItem = this.array[secondIndex];
this.array[firstIndex] = secondItem;
this.array[secondIndex] = firstItem;
}
public isEmpty() {
return this.array.length === 0;
}
public insert(item: T, priority: number) {
// The index to check to ensure the max heap property is preserved
let indexToCheck = this.array.length;
this.array.push({ item, priority });
while (indexToCheck > 0) {
const parentIndex = Math.floor((indexToCheck - 1) / 2);
if (this.array[indexToCheck].priority > this.array[parentIndex].priority) {
this.swapElements(indexToCheck, parentIndex);
indexToCheck = parentIndex;
} else {
break;
}
}
}
public removeMax(): T | undefined {
const result = this.array.shift();
let indexToCheck = 0;
while (indexToCheck < this.array.length) {
const leftChildIndex = indexToCheck * 2 + 1;
const rightChildIndex = indexToCheck * 2 + 2;
const biggerIndex = this.array[leftChildIndex] > this.array[rightChildIndex] ? leftChildIndex : rightChildIndex;
if (this.array[biggerIndex] > this.array[indexToCheck]) {
this.swapElements(indexToCheck, biggerIndex);
indexToCheck = biggerIndex;
} else {
break;
}
}
return result?.item;
}
}

Просмотреть файл

@ -1,48 +0,0 @@
import { PGraph } from "../PGraph";
import { NamedFunctions, DepGraphMap } from "../types";
describe("PGraph", () => {
it("should allow a full graph to be created", async () => {
const fns: NamedFunctions = new Map();
const graph: DepGraphMap = new Map();
const mockFn = jest.fn((id) => Promise.resolve());
fns.set("fn1", mockFn);
fns.set("fn2", mockFn);
graph.set("fn1", new Set(["fn2"]));
const pGraph = new PGraph(fns, graph);
await pGraph.run();
expect(mockFn).toHaveBeenCalledTimes(2);
expect(mockFn).toHaveBeenNthCalledWith(1, "fn2");
expect(mockFn).toHaveBeenNthCalledWith(2, "fn1");
});
it("should throw when one of the promises threw", async () => {
const fns: NamedFunctions = new Map();
const graph: DepGraphMap = new Map();
const mockFn = jest.fn((id) => Promise.resolve());
const failFn = jest.fn((id) => {
throw new Error("expected failure");
});
fns.set("fn1", mockFn);
fns.set("fn2", mockFn);
fns.set("fail", failFn);
graph.set("fn1", new Set(["fn2", "fail"]));
const pGraph = new PGraph(fns, graph);
expect.assertions(1);
try {
await pGraph.run();
} catch (e) {
expect(e).toBeTruthy();
}
});
});

Просмотреть файл

@ -1,78 +1,421 @@
import pGraph from "../index";
import { DepGraphArray } from "../types";
import { PGraphNodeMap, PGraphNode, DependencyList } from "../types";
interface MockFunctionDefinition {
/** A friendly name for the function */
name: string;
/** How many ticks this function should take to simulate the duration of the function execution */
duration: number;
/** Priority value to pass to the PGraphNode that is crated */
priority?: number;
}
/** A record of a function start or end event that can be composed to create an ordered log of function calls **/
interface MockFunctionCallRecord {
/** The name of the function */
name: string;
/** Denotes if this is when the function started or ended execution */
state: "start" | "end";
}
class FunctionScheduler {
private currentlyRunningFunctions: { name: string; ticksRemaining: number; resolve: () => void }[] = [];
private tickScheduled: boolean = false;
public callRecords: MockFunctionCallRecord[] = [];
public startExecutingFunction(definition: MockFunctionDefinition): Promise<unknown> {
const { name, duration } = definition;
this.callRecords.push({ name, state: "start" });
const promise = new Promise((resolve) => {
this.currentlyRunningFunctions.push({ name, ticksRemaining: duration, resolve });
});
this.ensureTickScheduled();
return promise;
}
private ensureTickScheduled() {
if (!this.tickScheduled) {
Promise.resolve().then(() => this.tick());
this.tickScheduled = true;
}
}
private tick() {
this.tickScheduled = false;
this.currentlyRunningFunctions.forEach((item) => {
item.ticksRemaining = item.ticksRemaining - 1;
});
const finishedItems = this.currentlyRunningFunctions.filter((item) => item.ticksRemaining === 0);
this.currentlyRunningFunctions = this.currentlyRunningFunctions.filter((item) => item.ticksRemaining !== 0);
if (finishedItems.length > 0) {
finishedItems.forEach((item) => {
item.resolve();
this.callRecords.push({ name: item.name, state: "end" });
});
}
if (this.currentlyRunningFunctions.length > 0) {
this.ensureTickScheduled();
}
}
}
function defineMockNode(definition: MockFunctionDefinition, functionScheduler: FunctionScheduler): [string, PGraphNode] {
return [definition.name, { run: () => functionScheduler.startExecutingFunction(definition), priority: definition.priority }];
}
declare global {
namespace jest {
interface Matchers<R> {
/**
* Enforces that a particular schedule does not schedule secondTaskName until firstTaskName is complete
*/
toHaveScheduleOrdering(firstTaskName: string, secondTaskName: string): R;
/**
* Enforces that a particular schedule does not schedule secondTaskName to start before firstTaskName has started
*/
toHaveStartedBefore(firstTaskName: string, secondTaskName: string): R;
/**
* Enforces that a specific task was executed
*/
toHaveScheduledTask(taskName: string): R;
}
}
}
expect.extend({
toHaveScheduleOrdering(callRecords: MockFunctionCallRecord[], firstTaskName: string, secondTaskName: string) {
const firstIndex = callRecords.findIndex((item) => item.name === firstTaskName && item.state === "end");
const secondIndex = callRecords.findIndex((item) => item.name === secondTaskName && item.state === "start");
const pass = firstIndex !== -1 && secondIndex !== -1 && firstIndex < secondIndex;
return {
message: () => `expected ${secondTaskName} to be scheduled after ${firstTaskName} completed`,
pass,
};
},
toHaveStartedBefore(callRecords: MockFunctionCallRecord[], firstTaskName: string, secondTaskName: string) {
const firstIndex = callRecords.findIndex((item) => item.name === firstTaskName && item.state === "start");
const secondIndex = callRecords.findIndex((item) => item.name === secondTaskName && item.state === "start");
const pass = firstIndex !== -1 && secondIndex !== -1 && firstIndex < secondIndex;
return {
message: () => `expected ${secondTaskName} to be started after ${firstTaskName} has started`,
pass,
};
},
toHaveScheduledTask(callRecords: MockFunctionCallRecord[], taskName: string) {
const startIndex = callRecords.findIndex((item) => item.name === taskName && item.state === "end");
const endIndex = callRecords.findIndex((item) => item.name === taskName && item.state === "start");
const pass = startIndex !== -1 && endIndex !== -1;
return {
message: () => `expected to have scheduled task ${taskName}`,
pass,
};
},
});
const computeMaxConcurrency = (callRecords: MockFunctionCallRecord[]) => {
let currentConcurrency = 0;
let maxConcurrencySoFar = 0;
callRecords.forEach((record) => {
currentConcurrency += record.state === "start" ? 1 : -1;
maxConcurrencySoFar = Math.max(currentConcurrency, maxConcurrencySoFar);
});
return maxConcurrencySoFar;
};
describe("Public API", () => {
let calls = [];
it("should accept the dependency graph and execute tasks in order", async () => {
const functionScheduler = new FunctionScheduler();
// Example graph from: https://www.npmjs.com/package/toposort
const putOnShirt = () =>
Promise.resolve("put on your shirt").then((v) => {
calls.push(v);
});
const putOnShorts = () =>
Promise.resolve("put on your shorts").then((v) => {
calls.push(v);
});
const putOnJacket = () =>
Promise.resolve("put on your jacket").then((v) => {
calls.push(v);
});
const putOnShoes = () =>
Promise.resolve("put on your shoes").then((v) => {
calls.push(v);
});
const tieShoes = () =>
Promise.resolve("tie your shoes").then((v) => {
calls.push(v);
});
beforeEach(() => {
calls = [];
});
it("should accept an array dep graph", async () => {
const graph: DepGraphArray = [
[putOnShoes, tieShoes],
[putOnShirt, putOnJacket],
[putOnShorts, putOnJacket],
[putOnShorts, putOnShoes],
];
await pGraph(graph).run();
expect(calls).toEqual([
"put on your shirt",
"put on your shorts",
"put on your jacket",
"put on your shoes",
"tie your shoes",
const nodeMap: PGraphNodeMap = new Map([
defineMockNode({ name: "putOnShirt", duration: 1 }, functionScheduler),
defineMockNode({ name: "putOnShorts", duration: 1 }, functionScheduler),
defineMockNode({ name: "putOnJacket", duration: 1 }, functionScheduler),
defineMockNode({ name: "putOnShoes", duration: 1 }, functionScheduler),
defineMockNode({ name: "tieShoes", duration: 1 }, functionScheduler),
]);
});
it("should accept an array dep graph", async () => {
const graph: DepGraphArray = [
[putOnShoes, tieShoes],
[putOnShirt, putOnJacket],
[putOnShorts, putOnJacket],
[putOnShorts, putOnShoes],
const dependencies: DependencyList = [
["putOnShoes", "tieShoes"],
["putOnShirt", "putOnJacket"],
["putOnShorts", "putOnJacket"],
["putOnShorts", "putOnShoes"],
];
await pGraph(graph).run();
await pGraph(nodeMap, dependencies).run();
expect(calls.indexOf("tie your shoes")).toBeGreaterThan(
calls.indexOf("put on your shoes")
);
const { callRecords } = functionScheduler;
expect(callRecords).toHaveScheduleOrdering("putOnShoes", "tieShoes");
expect(callRecords).toHaveScheduleOrdering("putOnShirt", "putOnJacket");
expect(callRecords).toHaveScheduleOrdering("putOnShorts", "putOnJacket");
expect(callRecords).toHaveScheduleOrdering("putOnShorts", "putOnShoes");
});
expect(calls.indexOf("put on your jacket")).toBeGreaterThan(
calls.indexOf("put on your shirt")
);
it("throws an exception when the dependency graph has a cycle starting from the root", async () => {
const nodeMap: PGraphNodeMap = new Map([
["A", { run: () => Promise.resolve() }],
["B", { run: () => Promise.resolve() }],
["C", { run: () => Promise.resolve() }],
]);
expect(calls.indexOf("put on your jacket")).toBeGreaterThan(
calls.indexOf("put on your shorts")
);
const dependencies: DependencyList = [
["A", "B"],
["B", "C"],
["C", "A"],
];
expect(calls.indexOf("put on your shoes")).toBeGreaterThan(
calls.indexOf("put on your shorts")
);
expect(() => pGraph(nodeMap, dependencies)).toThrow();
});
it("throws an exception when the dependency graph has a cycle", async () => {
// This is almost the same as the last test, except the root node is not a part of the cycle
const nodeMap: PGraphNodeMap = new Map([
["A", { run: () => Promise.resolve() }],
["B", { run: () => Promise.resolve() }],
["C", { run: () => Promise.resolve() }],
["D", { run: () => Promise.resolve() }],
]);
const dependencies: DependencyList = [
["A", "B"],
["B", "C"],
["C", "D"],
["D", "B"],
];
expect(() => pGraph(nodeMap, dependencies)).toThrow();
});
it("resolves an empty dependnecy graph", async () => {
const nodeMap: PGraphNodeMap = new Map();
const dependencies: DependencyList = [];
expect(pGraph(nodeMap, dependencies).run()).resolves.toBeUndefined();
});
it("throws an exception when run is invoked and a task rejects its promise", async () => {
const nodeMap: PGraphNodeMap = new Map([
["A", { run: () => Promise.resolve() }],
["B", { run: () => Promise.resolve() }],
["C", { run: () => Promise.reject("C rejected") }],
]);
// A
// B C
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
];
await expect(pGraph(nodeMap, dependencies).run()).rejects.toEqual("C rejected");
});
it("throws when one of the dependencies references a node not in the node map", async () => {
const nodeMap: PGraphNodeMap = new Map([
["A", { run: () => Promise.resolve() }],
["B", { run: () => Promise.resolve() }],
]);
// A
// B C
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
];
expect(() => pGraph(nodeMap, dependencies)).toThrow();
});
it("should run all dependencies for disconnected graphs", async () => {
const functionScheduler = new FunctionScheduler();
const nodeMap: PGraphNodeMap = new Map([
defineMockNode({ name: "A", duration: 1 }, functionScheduler),
defineMockNode({ name: "B", duration: 1 }, functionScheduler),
defineMockNode({ name: "C", duration: 1 }, functionScheduler),
defineMockNode({ name: "D", duration: 1 }, functionScheduler),
]);
// A D
// B C
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
];
await pGraph(nodeMap, dependencies).run();
const { callRecords } = functionScheduler;
expect(callRecords).toHaveScheduledTask("A");
expect(callRecords).toHaveScheduledTask("B");
expect(callRecords).toHaveScheduledTask("C");
expect(callRecords).toHaveScheduledTask("D");
});
it("should be able to run more than one task at a time", async () => {
const functionScheduler = new FunctionScheduler();
const nodeMap: PGraphNodeMap = new Map([
defineMockNode({ name: "A", duration: 1 }, functionScheduler),
defineMockNode({ name: "B", duration: 1 }, functionScheduler),
defineMockNode({ name: "C", duration: 1 }, functionScheduler),
]);
// A
// B C
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
];
await pGraph(nodeMap, dependencies).run();
// B and C should run concurrently
expect(computeMaxConcurrency(functionScheduler.callRecords)).toEqual(2);
});
it("should not exceed maximum concurrency", async () => {
const functionScheduler = new FunctionScheduler();
const funcs = new Map([
defineMockNode({ name: "A", duration: 1 }, functionScheduler),
defineMockNode({ name: "B", duration: 1 }, functionScheduler),
defineMockNode({ name: "C", duration: 1 }, functionScheduler),
defineMockNode({ name: "D", duration: 1 }, functionScheduler),
defineMockNode({ name: "E", duration: 1 }, functionScheduler),
]);
// A
// B C D E
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
["A", "D"],
["A", "E"],
];
await pGraph(funcs, dependencies).run({ concurrency: 3 });
expect(computeMaxConcurrency(functionScheduler.callRecords)).toBeLessThanOrEqual(3);
});
it("correctly schedules tasks that have more than one dependency", async () => {
const functionScheduler = new FunctionScheduler();
const funcs = new Map([
defineMockNode({ name: "A", duration: 1 }, functionScheduler),
defineMockNode({ name: "B", duration: 1 }, functionScheduler),
defineMockNode({ name: "C", duration: 1 }, functionScheduler),
defineMockNode({ name: "D", duration: 1 }, functionScheduler),
defineMockNode({ name: "E", duration: 1 }, functionScheduler),
]);
// All nodes depend on A, D depends on C and B as well
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
["A", "D"],
["A", "E"],
["C", "D"],
["B", "D"],
];
await pGraph(funcs, dependencies).run();
expect(functionScheduler.callRecords).toHaveScheduleOrdering("A", "B");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("A", "C");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("A", "D");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("A", "E");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("B", "D");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("C", "D");
});
it("should schedule high priority tasks and dependencies before lower priority tasks", async () => {
const functionScheduler = new FunctionScheduler();
const funcs = new Map([
defineMockNode({ name: "A", duration: 1 }, functionScheduler),
defineMockNode({ name: "B", duration: 1 }, functionScheduler),
defineMockNode({ name: "C", duration: 1 }, functionScheduler),
defineMockNode({ name: "D", duration: 1 }, functionScheduler),
defineMockNode({ name: "E", duration: 1 }, functionScheduler),
defineMockNode({ name: "F", duration: 1, priority: 16 }, functionScheduler),
]);
// A
// B C D
// |E F|
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
["A", "D"],
["C", "E"],
["C", "F"],
];
// Set concurrency to 1 to make it easier to validate execution order
await pGraph(funcs, dependencies).run({ concurrency: 1 });
// A -> C -> F is the critical path, it should be built first
expect(functionScheduler.callRecords).toHaveScheduleOrdering("C", "B");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("C", "D");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("F", "E");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("F", "B");
expect(functionScheduler.callRecords).toHaveScheduleOrdering("F", "D");
});
it("should schedule high priority tasks and dependencies before lower priority tasks when maxConcurrency is greater than 1", async () => {
const functionScheduler = new FunctionScheduler();
const funcs = new Map([
defineMockNode({ name: "A", duration: 1 }, functionScheduler),
defineMockNode({ name: "B", duration: 16, priority: 16 }, functionScheduler),
defineMockNode({ name: "C", duration: 4, priority: 4 }, functionScheduler),
defineMockNode({ name: "D", duration: 4, priority: 4 }, functionScheduler),
defineMockNode({ name: "E", duration: 12, priority: 12 }, functionScheduler),
defineMockNode({ name: "F", duration: 16, priority: 16 }, functionScheduler),
]);
// A
// B C D
// |E F|
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
["A", "D"],
["C", "E"],
["C", "F"],
];
// Set concurrency to 1 to make it easier to validate execution order
await pGraph(funcs, dependencies).run({ concurrency: 2 });
// A -> C -> F is the critical path, it should be built first
expect(computeMaxConcurrency(functionScheduler.callRecords)).toBeLessThanOrEqual(2);
expect(functionScheduler.callRecords).toHaveStartedBefore("C", "B");
expect(functionScheduler.callRecords).toHaveStartedBefore("C", "D");
expect(functionScheduler.callRecords).toHaveStartedBefore("B", "D");
expect(functionScheduler.callRecords).toHaveStartedBefore("F", "E");
});
});

Просмотреть файл

@ -1,31 +0,0 @@
import {
DepGraphArray,
NamedFunctions,
DepGraphMap,
RunFunction,
} from "./types";
export function depArrayToNamedFunctions(array: DepGraphArray) {
const namedFunctions: NamedFunctions = new Map();
// dependant depends on subject (Child depends on Parent means Child is dependent, Parent is subject)
for (const [subject, dependent] of array) {
namedFunctions.set(subject, subject as RunFunction);
namedFunctions.set(dependent, dependent as RunFunction);
}
return namedFunctions;
}
export function depArrayToMap(array: DepGraphArray) {
const graph: DepGraphMap = new Map();
// dependant depends on subject (Child depends on Parent means Child is dependent, Parent is subject)
for (const [subjectId, dependentId] of array) {
if (!graph.has(dependentId)) {
graph.set(dependentId, new Set([subjectId]));
} else {
graph.get(dependentId).add(subjectId);
}
}
return graph;
}

Просмотреть файл

@ -0,0 +1,74 @@
import { PGraphNodeWithDependencies } from "./types";
/** Creates a map of node ids to a set of all the nodes this node depends on. This creates a new copy of the set to enable duplication */
function getNewDependsOnMap(pGraphDependencyMap: Map<string, PGraphNodeWithDependencies>): Map<string, Set<string>> {
return new Map([...pGraphDependencyMap.entries()].map(([key, value]) => [key, new Set(value.dependsOn)]));
}
function topologicalSort(
pGraphDependencyMap: Map<string, PGraphNodeWithDependencies>,
nodesWithNoDependencies: readonly string[]
): string[] {
const sortedList: string[] = [];
const dependsOnMap = getNewDependsOnMap(pGraphDependencyMap);
const nodesWithNoDependenciesClone = [...nodesWithNoDependencies];
while (nodesWithNoDependenciesClone.length > 0) {
const currentId = nodesWithNoDependenciesClone.pop()!;
sortedList.push(currentId);
const node = pGraphDependencyMap.get(currentId)!;
// Update the depends on maps of all outgoing edges
node.dependedOnBy.forEach((childId) => {
const childNode = dependsOnMap.get(childId)!;
childNode.delete(currentId);
// If this item is now unblocked, put it on the unblocked list
if (childNode.size === 0) {
nodesWithNoDependenciesClone.push(childId);
}
});
}
return sortedList;
}
/**
* Returns a JS map that has the "cumulative" priority for each node, which is defined as the priority of the current node plus the maximum cumulative priority amongst all children.
* This is helpful for identifying which nodes to schedule first in order to get to higher priority nodes more quickly.
*/
export function getNodeCumulativePriorities(
pGraphDependencyMap: Map<string, PGraphNodeWithDependencies>,
nodesWithNoDependencies: string[]
): Map<string, number> {
const nodeCumulativePriorities = new Map<string, number>();
const stack = topologicalSort(pGraphDependencyMap, nodesWithNoDependencies);
while (stack.length > 0) {
const currentNodeId = stack.pop()!;
const node = pGraphDependencyMap.get(currentNodeId)!;
// The default priority for a node is zero
const currentNodePriority = node.priority || 0;
const maxChildCumulativePriority = Math.max(
...[...node.dependedOnBy.keys()].map((childId) => {
const childCumulativePriority = nodeCumulativePriorities.get(childId);
if (childCumulativePriority === undefined) {
throw new Error(`Expected to have already computed the cumulative priority for node ${childId}`);
}
return childCumulativePriority;
}),
0
);
const result = currentNodePriority + maxChildCumulativePriority;
nodeCumulativePriorities.set(currentNodeId, result);
}
return nodeCumulativePriorities;
}

26
src/graphHasCycles.ts Normal file
Просмотреть файл

@ -0,0 +1,26 @@
import { PGraphNodeWithDependencies } from "./types";
/**
* Checks for any cycles in the dependency graph, returning false if no cycles were detected.
*/
export function graphHasCycles(pGraphDependencyMap: Map<string, PGraphNodeWithDependencies>, nodesWithNoDependencies: string[]): boolean {
const stack: { nodeId: string; visitedNodes: Set<string> }[] = [];
nodesWithNoDependencies.forEach((root) => stack.push({ nodeId: root, visitedNodes: new Set() }));
while (stack.length > 0) {
const { nodeId, visitedNodes } = stack.pop()!;
// If we have already seen this node, we've found a cycle
if (visitedNodes.has(nodeId)) {
return true;
}
visitedNodes.add(nodeId);
const node = pGraphDependencyMap.get(nodeId)!;
[...node.dependedOnBy.keys()].forEach((childId) => stack.push({ nodeId: childId, visitedNodes: new Set(visitedNodes) }));
}
return false;
}

Просмотреть файл

@ -1,47 +1,8 @@
import { DepGraphArray, NamedFunctions, DepGraphMap } from "./types";
import { PGraph } from "./PGraph";
import { depArrayToNamedFunctions, depArrayToMap } from "./depConverters";
import { PGraphNodeMap, DependencyList } from "./types";
function pGraph(namedFunctions: NamedFunctions, graph: DepGraphMap);
function pGraph(namedFunctions: NamedFunctions, graph: DepGraphArray);
function pGraph(graph: DepGraphArray);
function pGraph(...args: any[]) {
if (args.length < 1 || args.length > 2) {
throw new Error("Incorrect number of arguments");
}
let namedFunctions: NamedFunctions;
let graph: DepGraphMap;
if (args.length === 1) {
if (!Array.isArray(args[0])) {
throw new Error(
"Unexpected graph definition format. Expecting graph in the form of [()=>Promise, ()=>Promise][]"
);
}
const depArray = args[0] as DepGraphArray;
namedFunctions = depArrayToNamedFunctions(depArray);
graph = depArrayToMap(depArray);
} else if (args.length === 2) {
if (Array.isArray(args[0])) {
const depArray = args[0] as DepGraphArray;
namedFunctions = depArrayToNamedFunctions(depArray);
graph = depArrayToMap(depArray);
} else if (args[0] instanceof Map && Array.isArray(args[1])) {
const depArray = args[1] as DepGraphArray;
namedFunctions = args[0];
graph = depArrayToMap(depArray);
} else if (args[0] instanceof Map && args[1] instanceof Map) {
namedFunctions = args[0];
graph = args[1];
} else {
throw new Error("Unexpected arguments");
}
}
return new PGraph(namedFunctions, graph);
function pGraph(nodeMap: PGraphNodeMap, dependencies: DependencyList) {
return new PGraph(nodeMap, dependencies);
}
export default pGraph;

Просмотреть файл

@ -1,6 +1,46 @@
export type RunFunction = (id: Id) => Promise<unknown>;
export type Id = string | number | RunFunction;
export type NamedFunctions = Map<Id, RunFunction>;
export type DepGraphMap = Map<Id, Set<Id>>;
export type ScopeFunction = (graph: DepGraphMap) => Id[];
export type DepGraphArray = [Id, Id][];
/**
* A description of a node in p-graph
*/
export interface PGraphNode {
/** The function that will be executed for this graph node */
run: () => Promise<unknown>;
/**
* A priority to help the scheduler decide which tasks to pick when many are available to run.
* Default value is zero
*/
priority?: number;
}
/**
* Defines the set of p-graph nodes, with each key in this map representing a unique identifier for the node
*/
export type PGraphNodeMap = Map<string, PGraphNode>;
/**
* Describes a dependency between two nodes in the p-graph. For each tuple in the array, the first task must complete before the second one begins
*/
export type DependencyList = [string, string][];
/**
* The optional arguments to pass to the run function
*/
export interface RunOptions {
/** The maximum amount of promises that can be executing at the same time. When not provided, we do not limit the number of concurrent tasks and run tasks as soon as they are unblocked */
concurrency?: number;
}
/**
* An internally used representation of the dependency graph nodes that includes all nodes that this node depends on plus all the nodes that depend on this node.
*/
export interface PGraphNodeWithDependencies extends PGraphNode {
/**
* The set of nodes that this node depends on. This node should not be executed until all the nodes in this list have been executed to completion.
*/
dependsOn: Set<string>;
/**
* The set of nodes that cannot start execution until this node has completed execution.
*/
dependedOnBy: Set<string>;
}

Просмотреть файл

@ -7,7 +7,9 @@
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"lib": ["ES2017"],
"outDir": "lib"
"outDir": "lib",
"noUnusedLocals": true,
"strict": true
},
"include": ["src"]
}