Created
September 17, 2023 05:24
-
-
Save jacobdam/0979b899a6647b862a2268d62307f83d to your computer and use it in GitHub Desktop.
WIP - Simple implementation of workflow engine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import assert from "node:assert"; | |
import { AsyncLocalStorage } from "node:async_hooks"; | |
// database | |
interface WorkflowInst { | |
completedSteps: number; | |
recordedStepCalls: { name: string; params: any }[]; // for debug only | |
} | |
const db: { [_: string]: WorkflowInst } = {}; | |
async function getWorkflowInstanceFromDB( | |
instId: string | |
): Promise<WorkflowInst | null> { | |
return db[instId] || null; | |
} | |
async function saveWorkflowInstanceToDB( | |
instId: string, | |
inst: WorkflowInst | |
): Promise<void> { | |
db[instId] = inst; | |
} | |
interface WorkflowContext { | |
currentStep: number; | |
instId: string; | |
workflowInst: WorkflowInst; | |
} | |
// workflow engine | |
const asyncLocalStorage = new AsyncLocalStorage(); | |
async function runWorkflow<TParams>( | |
instId: string, | |
workflowFn: (_: TParams) => void, | |
params: TParams | |
): Promise<void> { | |
const ctx: WorkflowContext = { | |
currentStep: -1, | |
instId, | |
workflowInst: (await getWorkflowInstanceFromDB(instId)) || { | |
completedSteps: 0, | |
recordedStepCalls: [], | |
}, | |
}; | |
await saveWorkflowInstanceToDB(instId, ctx.workflowInst); | |
await asyncLocalStorage.run(ctx, () => { | |
return workflowFn(params); | |
}); | |
} | |
async function _runStep<TParams>( | |
stepFn: (_: TParams) => void, | |
params: TParams | |
): Promise<void> { | |
const ctx: WorkflowContext = asyncLocalStorage.getStore() as any; | |
ctx.currentStep++; | |
if (ctx.currentStep < ctx.workflowInst.completedSteps) { | |
// ensure the completed step run previously have same stepName and params | |
const recordedStepCall = | |
ctx.workflowInst.recordedStepCalls[ctx.currentStep]; | |
assert.equal( | |
recordedStepCall.name, | |
(stepFn as any).stepName, | |
"step function must be the same" | |
); | |
assert.deepEqual( | |
recordedStepCall.params, | |
params, | |
"step params must be the same" | |
); | |
return; | |
} | |
await stepFn(params); | |
ctx.workflowInst.recordedStepCalls.push({ | |
name: (stepFn as any).stepName, | |
params, | |
}); | |
ctx.workflowInst.completedSteps++; | |
await saveWorkflowInstanceToDB(ctx.instId, ctx.workflowInst); | |
} | |
// annotation syntax: | |
// @Step() | |
// async function someStep(params: { foo: string }) { | |
// } | |
function Step<T>(stepFn: (_: T) => void, fnName?: string) { | |
const fn = function step(params: T) { | |
_runStep(stepFn, params); | |
}; | |
fn.stepName = fnName || stepFn.name; | |
return fn; | |
} | |
function log(...args: unknown[]) { | |
const ctx: WorkflowContext = asyncLocalStorage.getStore() as any; | |
console.log(ctx.instId, ...args); | |
} | |
// user code | |
function sleep(miliseconds: number): Promise<void> { | |
return new Promise((resolve) => setTimeout(resolve, miliseconds)); | |
} | |
async function workflow1(params: string) { | |
log("workflow1", "start", params); | |
await step1(params); | |
if (params === "bar") { | |
await step2(params); | |
} | |
log("workflow1", "end", params); | |
} | |
const step1 = Step(async function step1(params: string) { | |
log("step1", "start", params); | |
await sleep(1000); | |
log("step1", "end", params); | |
}); | |
const step2 = Step(async function step2(params: string) { | |
log("step2", params); | |
}); | |
runWorkflow("wf1-01", workflow1, "foo"); | |
runWorkflow("wf1-02", workflow1, "bar"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment