adding a continue option to p-graph to allow graph to be traversed as far as possible (#6)

* adding a continue option to p-graph to allow graph to be traversed as far as possible

* Change files

* adding fetch-depths fixes

* delete extra lines

* adding some rationale behind why code is place somewhere "unexpected"
This commit is contained in:
Kenneth Chau 2020-10-22 12:43:48 -07:00 коммит произвёл GitHub
Родитель 6daacfc3d5
Коммит dbe6893678
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 110 добавлений и 19 удалений

2
.github/workflows/pr.yml поставляемый
Просмотреть файл

@ -17,6 +17,8 @@ jobs:
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:

1
.github/workflows/release.yml поставляемый
Просмотреть файл

@ -18,6 +18,7 @@ jobs:
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
token: ${{ secrets.repo_pat }}
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1

Просмотреть файл

@ -0,0 +1,8 @@
{
"type": "minor",
"comment": "adding a continue option to p-graph to allow graph to be traversed as far as possible",
"packageName": "p-graph",
"email": "kchau@microsoft.com",
"dependentChangeType": "patch",
"date": "2020-10-22T19:22:53.080Z"
}

Просмотреть файл

@ -12,7 +12,7 @@ export class PGraph {
constructor(nodeMap: PGraphNodeMap, dependencies: DependencyList) {
[...nodeMap.entries()].forEach(([key, node]) => {
this.pGraphDependencyMap.set(key, { ...node, dependsOn: new Set(), dependedOnBy: new Set() });
this.pGraphDependencyMap.set(key, { ...node, dependsOn: new Set(), dependedOnBy: new Set(), failed: false });
});
dependencies.forEach(([subjectId, dependentId]) => {
@ -68,36 +68,74 @@ export class PGraph {
}
const taskToRun = this.pGraphDependencyMap.get(taskToRunId)!;
const taskFnPromise = taskToRun.run();
currentlyRunningTaskCount += 1;
try {
currentlyRunningTaskCount += 1;
await taskFnPromise;
currentlyRunningTaskCount -= 1;
// Let's remove this task from all dependent task's dependency array
taskToRun.dependedOnBy.forEach((dependentId) => {
const dependentNode = this.pGraphDependencyMap.get(dependentId)!;
dependentNode.dependsOn.delete(taskToRunId);
// If the task that just completed was the last remaining dependency for a node, add it to the set of unblocked nodes
if (dependentNode.dependsOn.size === 0) {
priorityQueue.insert(dependentId, nodeCumulativePriorities.get(dependentId)!);
if (!taskToRun.failed) {
await taskToRun.run();
}
});
} catch(e) {
// mark node and its children to be "failed" in the case of continue, we'll traverse, but not run the nodes
taskToRun.failed = true;
throw e;
} finally {
// schedule next round of tasks if options.continue (continue on error) or successfully run task
const shouldScheduleMoreTasks = options?.continue || !taskToRun.failed;
if (shouldScheduleMoreTasks) {
// "currentlyRunningTaskCount" cannot be decremented on non-continue cases because of async nature of
// the queue runner. The race condition will end up appearing as if there was no failures even though
// there was one
currentlyRunningTaskCount -= 1;
// Let's remove this task from all dependent task's dependency array
taskToRun.dependedOnBy.forEach((dependentId) => {
const dependentNode = this.pGraphDependencyMap.get(dependentId)!;
if (taskToRun.failed) {
dependentNode.failed = true;
}
dependentNode.dependsOn.delete(taskToRunId);
// If the task that just completed was the last remaining dependency for a node, add it to the set of unblocked nodes
if (dependentNode.dependsOn.size === 0) {
priorityQueue.insert(dependentId, nodeCumulativePriorities.get(dependentId)!);
}
});
}
}
};
return new Promise((resolve, reject) => {
let errors: Error[] = [];
const trySchedulingTasks = () => {
if (priorityQueue.isEmpty() && currentlyRunningTaskCount === 0) {
// We are done running all tasks, let's resolve the promise done
resolve();
if (errors.length === 0) {
resolve();
} else {
reject(errors);
}
return;
}
while (!priorityQueue.isEmpty() && (concurrency === undefined || currentlyRunningTaskCount < concurrency)) {
scheduleTask()
.then(() => trySchedulingTasks())
.catch((e) => reject(e));
.catch((e) => {
errors.push(e);
// if a continue option is set, this merely records what errors have been encountered
// it'll continue down the execution until all the tasks that still works
if (options?.continue) {
trySchedulingTasks();
} else {
// immediately reject, if not using "continue" option
reject(e);
}
});
}
};

Просмотреть файл

@ -228,9 +228,43 @@ describe("Public API", () => {
["A", "C"],
];
await expect(pGraph(nodeMap, dependencies).run()).rejects.toEqual("C rejected");
await expect(pGraph(nodeMap, dependencies).run()).rejects.toContain("C rejected");
});
it("throws an exception, but continues to run the entire graph", async () => {
const runFns = {
A: jest.fn().mockReturnValue(Promise.resolve()),
B: jest.fn().mockReturnValue(Promise.resolve()),
D: jest.fn().mockReturnValue(Promise.resolve()),
E: jest.fn().mockReturnValue(Promise.resolve()),
F: jest.fn().mockReturnValue(Promise.resolve())
}
const nodeMap: PGraphNodeMap = new Map([
["A", { run: runFns.A }],
["B", { run: runFns.B }],
["C", { run: () => Promise.reject("C rejected") }],
["D", { run: runFns.D }],
["E", { run: runFns.E }],
["F", { run: runFns.F }],
]);
const dependencies: DependencyList = [
["A", "B"],
["A", "C"],
["A", "D"],
["C", "D"],
["A", "E"],
["E", "F"],
];
await expect(pGraph(nodeMap, dependencies).run({concurrency: 1, continue: true})).rejects.toContain("C rejected");
expect(runFns.E).toHaveBeenCalled();
expect(runFns.F).toHaveBeenCalled();
expect(runFns.D).not.toHaveBeenCalled();
});
it("throws when one of the dependencies references a node not in the node map", async () => {
const nodeMap: PGraphNodeMap = new Map([
["A", { run: () => Promise.resolve() }],

Просмотреть файл

@ -28,6 +28,9 @@ export type DependencyList = [string, string][];
export interface RunOptions {
/** The maximum amount of promises that can be executing at the same time. When not provided, we do not limit the number of concurrent tasks and run tasks as soon as they are unblocked */
concurrency?: number;
/** Continues the graph even if there's an rejected task */
continue?: boolean;
}
/**
@ -43,4 +46,9 @@ export interface PGraphNodeWithDependencies extends PGraphNode {
* The set of nodes that cannot start execution until this node has completed execution.
*/
dependedOnBy: Set<string>;
/**
* Flag whether this node is failed or not (if so, skip it and mark its children to be skipped)
*/
failed: boolean;
}