From dbe68936785af82610b1a2dafefad0c87e7ea3b3 Mon Sep 17 00:00:00 2001 From: Kenneth Chau <34725+kenotron@users.noreply.github.com> Date: Thu, 22 Oct 2020 12:43:48 -0700 Subject: [PATCH] adding a continue option to p-graph to allow graph to be traversed as far as possible (#6) * adding a continue option to p-graph to allow graph to be traversed as far as possible * Change files * adding fetch-depths fixes * delete extra lines * adding some rationale behind why code is place somewhere "unexpected" --- .github/workflows/pr.yml | 2 + .github/workflows/release.yml | 1 + .../p-graph-2020-10-22-12-22-53-continue.json | 8 ++ src/PGraph.ts | 74 ++++++++++++++----- src/__tests__/index.tests.ts | 36 ++++++++- src/types.ts | 8 ++ 6 files changed, 110 insertions(+), 19 deletions(-) create mode 100644 change/p-graph-2020-10-22-12-22-53-continue.json diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 6e94242..4571839 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -17,6 +17,8 @@ jobs: steps: - uses: actions/checkout@v2 + with: + fetch-depth: 0 - name: Use Node.js ${{ matrix.node-version }} uses: actions/setup-node@v1 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 349a9b3..5fe3844 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,6 +18,7 @@ jobs: steps: - uses: actions/checkout@v2 with: + fetch-depth: 0 token: ${{ secrets.repo_pat }} - name: Use Node.js ${{ matrix.node-version }} uses: actions/setup-node@v1 diff --git a/change/p-graph-2020-10-22-12-22-53-continue.json b/change/p-graph-2020-10-22-12-22-53-continue.json new file mode 100644 index 0000000..f7d2326 --- /dev/null +++ b/change/p-graph-2020-10-22-12-22-53-continue.json @@ -0,0 +1,8 @@ +{ + "type": "minor", + "comment": "adding a continue option to p-graph to allow graph to be traversed as far as possible", + "packageName": "p-graph", + "email": "kchau@microsoft.com", + "dependentChangeType": "patch", + "date": "2020-10-22T19:22:53.080Z" +} diff --git a/src/PGraph.ts b/src/PGraph.ts index 322e67c..935fbc6 100644 --- a/src/PGraph.ts +++ b/src/PGraph.ts @@ -12,7 +12,7 @@ export class PGraph { constructor(nodeMap: PGraphNodeMap, dependencies: DependencyList) { [...nodeMap.entries()].forEach(([key, node]) => { - this.pGraphDependencyMap.set(key, { ...node, dependsOn: new Set(), dependedOnBy: new Set() }); + this.pGraphDependencyMap.set(key, { ...node, dependsOn: new Set(), dependedOnBy: new Set(), failed: false }); }); dependencies.forEach(([subjectId, dependentId]) => { @@ -68,36 +68,74 @@ export class PGraph { } const taskToRun = this.pGraphDependencyMap.get(taskToRunId)!; - const taskFnPromise = taskToRun.run(); - currentlyRunningTaskCount += 1; + try { + currentlyRunningTaskCount += 1; - await taskFnPromise; - currentlyRunningTaskCount -= 1; - - // Let's remove this task from all dependent task's dependency array - taskToRun.dependedOnBy.forEach((dependentId) => { - const dependentNode = this.pGraphDependencyMap.get(dependentId)!; - dependentNode.dependsOn.delete(taskToRunId); - - // If the task that just completed was the last remaining dependency for a node, add it to the set of unblocked nodes - if (dependentNode.dependsOn.size === 0) { - priorityQueue.insert(dependentId, nodeCumulativePriorities.get(dependentId)!); + if (!taskToRun.failed) { + await taskToRun.run(); } - }); + } catch(e) { + // mark node and its children to be "failed" in the case of continue, we'll traverse, but not run the nodes + taskToRun.failed = true; + throw e; + } finally { + // schedule next round of tasks if options.continue (continue on error) or successfully run task + const shouldScheduleMoreTasks = options?.continue || !taskToRun.failed; + + if (shouldScheduleMoreTasks) { + // "currentlyRunningTaskCount" cannot be decremented on non-continue cases because of async nature of + // the queue runner. The race condition will end up appearing as if there was no failures even though + // there was one + currentlyRunningTaskCount -= 1; + + // Let's remove this task from all dependent task's dependency array + taskToRun.dependedOnBy.forEach((dependentId) => { + const dependentNode = this.pGraphDependencyMap.get(dependentId)!; + + if (taskToRun.failed) { + dependentNode.failed = true; + } + + dependentNode.dependsOn.delete(taskToRunId); + + // If the task that just completed was the last remaining dependency for a node, add it to the set of unblocked nodes + if (dependentNode.dependsOn.size === 0) { + priorityQueue.insert(dependentId, nodeCumulativePriorities.get(dependentId)!); + } + }); + } + } }; return new Promise((resolve, reject) => { + let errors: Error[] = []; + const trySchedulingTasks = () => { if (priorityQueue.isEmpty() && currentlyRunningTaskCount === 0) { // We are done running all tasks, let's resolve the promise done - resolve(); + if (errors.length === 0) { + resolve(); + } else { + reject(errors); + } return; } - + while (!priorityQueue.isEmpty() && (concurrency === undefined || currentlyRunningTaskCount < concurrency)) { scheduleTask() .then(() => trySchedulingTasks()) - .catch((e) => reject(e)); + .catch((e) => { + errors.push(e); + + // if a continue option is set, this merely records what errors have been encountered + // it'll continue down the execution until all the tasks that still works + if (options?.continue) { + trySchedulingTasks(); + } else { + // immediately reject, if not using "continue" option + reject(e); + } + }); } }; diff --git a/src/__tests__/index.tests.ts b/src/__tests__/index.tests.ts index abebf92..9f5c9af 100644 --- a/src/__tests__/index.tests.ts +++ b/src/__tests__/index.tests.ts @@ -228,9 +228,43 @@ describe("Public API", () => { ["A", "C"], ]; - await expect(pGraph(nodeMap, dependencies).run()).rejects.toEqual("C rejected"); + await expect(pGraph(nodeMap, dependencies).run()).rejects.toContain("C rejected"); }); + it("throws an exception, but continues to run the entire graph", async () => { + const runFns = { + A: jest.fn().mockReturnValue(Promise.resolve()), + B: jest.fn().mockReturnValue(Promise.resolve()), + D: jest.fn().mockReturnValue(Promise.resolve()), + E: jest.fn().mockReturnValue(Promise.resolve()), + F: jest.fn().mockReturnValue(Promise.resolve()) + } + + const nodeMap: PGraphNodeMap = new Map([ + ["A", { run: runFns.A }], + ["B", { run: runFns.B }], + ["C", { run: () => Promise.reject("C rejected") }], + ["D", { run: runFns.D }], + ["E", { run: runFns.E }], + ["F", { run: runFns.F }], + ]); + + const dependencies: DependencyList = [ + ["A", "B"], + ["A", "C"], + ["A", "D"], + ["C", "D"], + ["A", "E"], + ["E", "F"], + ]; + + await expect(pGraph(nodeMap, dependencies).run({concurrency: 1, continue: true})).rejects.toContain("C rejected"); + expect(runFns.E).toHaveBeenCalled(); + expect(runFns.F).toHaveBeenCalled(); + expect(runFns.D).not.toHaveBeenCalled(); + }); + + it("throws when one of the dependencies references a node not in the node map", async () => { const nodeMap: PGraphNodeMap = new Map([ ["A", { run: () => Promise.resolve() }], diff --git a/src/types.ts b/src/types.ts index 97adf47..fa51f98 100644 --- a/src/types.ts +++ b/src/types.ts @@ -28,6 +28,9 @@ export type DependencyList = [string, string][]; export interface RunOptions { /** The maximum amount of promises that can be executing at the same time. When not provided, we do not limit the number of concurrent tasks and run tasks as soon as they are unblocked */ concurrency?: number; + + /** Continues the graph even if there's an rejected task */ + continue?: boolean; } /** @@ -43,4 +46,9 @@ export interface PGraphNodeWithDependencies extends PGraphNode { * The set of nodes that cannot start execution until this node has completed execution. */ dependedOnBy: Set; + + /** + * Flag whether this node is failed or not (if so, skip it and mark its children to be skipped) + */ + failed: boolean; }