Last active
December 13, 2021 03:28
-
-
Save jacwright/3f36557023a92e0d1f41abf53a87de7e to your computer and use it in GitHub Desktop.
Concurrency: Easy, Fast, Correct — Choose three
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Simplify concurrency using the strategies described in | |
* https://blog.cloudflare.com/durable-objects-easy-fast-correct-choose-three/ using input gates and output gates by | |
* allowing certain actions to block other actions or to block the responses of other actions. | |
* To "block" an action or response means to defer it until after the blocking actions have been processed. This | |
* allows the state of an object to be consistent and avoid race conditions which may be hard to discover. | |
* It is a good idea to use this in conjunction with a cache for blockable actions (e.g. a storage mechanism) to ensure | |
* the blocking doesn't slow down the system. | |
* | |
* Definitions: | |
* - "blockable" methods won't execute until after being unblocked. They will be deferred to run afterwards. | |
* - "blockableResponse" methods will execute but won't deliver a response until after being unblocked. | |
* - "blocking" methods will block blockable calls and responses. | |
* | |
* In the article linked above, `this.storage.get` and `this.storage.put` are blocking, `fetch` has a blockable | |
* response, and `this.getUniqueNumber()` is a blockable method. | |
* | |
* Note that blockable methods also have blockable responses. If you return a response while a storage operation is in | |
* progress, the response will return after the storage operation completes. If the storage operation fails, the | |
* response will be the error. | |
* | |
* Usage - with decorators (recommended): | |
* | |
* ```ts | |
* import { simplifiedConcurrency } from 'simplified-concurrency'; | |
* | |
* const { blockable, blocking, blockableResponse } = simplifiedConcurrency(); | |
* | |
* class MyDurableObject { | |
* storage: Storage; | |
* | |
* constructor() { | |
* this.storage = new Storage(); | |
* } | |
* | |
* @blockable | |
* async getUniqueNumber() { | |
* await this.fetch("https://example.com/api1"); | |
* let val = await this.storage.get("counter"); | |
* this.storage.put("counter", val + 1); | |
* return val; | |
* } | |
* | |
* @blockableResponse | |
* async fetch(input: RequestInfo, init?: RequestInit): Promise<Response> { | |
* return fetch(input, init); | |
* } | |
* } | |
* | |
* // Simple storage with cache | |
* class Storage { | |
* cache: Map<string, any>; | |
* | |
* constructor() { | |
* this.cache = new Map(); | |
* } | |
* | |
* @blocking | |
* async get(key) { | |
* if (this.cache.has(key)) { | |
* return this.cache.get(key); | |
* } else { | |
* // if fetch is used in a blocking method, be sure to use the global fetch and not the blockable-response fetch | |
* // provided which would never return since it is blocked by the method calling it. | |
* const response = await fetch(BACKEND_URL + key); | |
* return await response.json(); | |
* } | |
* } | |
* | |
* @blocking | |
* async put(key, value) { | |
* this.cache.set(key, value); | |
* | |
* const response = await fetch(BACKEND_URL + key, { | |
* method: 'POST', | |
* headers: { 'Content-Type': 'application/json' }}, | |
* body: JSON.stringify(value) | |
* }); | |
* | |
* return await response.json(); | |
* } | |
* } | |
* ``` | |
* | |
* These APIs work in the browser and in Node.js as they are just plain JavaScript promises. | |
*/ | |
export function simplifiedConcurrency() { | |
const blockingCalls = new Set<number>(); | |
const callsJustBlocked = new Map<number, Promise<any>>(); | |
const deferred: Function[] = []; | |
let call = 0; | |
function reset() { | |
// Reset for testing | |
blockingCalls.clear(); | |
callsJustBlocked.clear(); | |
deferred.length = 0; | |
call = 0; | |
} | |
/** | |
* Wrap a function which is blockable. | |
* Also, a Typescript decorator for functions which are blockable. | |
* Examples: | |
* | |
* Wrapped function: | |
* ```ts | |
* // function | |
* const increment = blockable(async (amount: number) => { | |
* const value = await getFromStorage('value'); // blocking call | |
* setToStorage('value', value + amount); // blocking call | |
* return value + amount; // won't return until blocking is finished | |
* }); | |
* | |
* // class | |
* class Controller { | |
* constructor() { | |
* this.increment = blockable(this.increment); | |
* } | |
* | |
* async increment(amount: number) { | |
* const value = await this.storage.get('value'); // blocking call | |
* this.storage.set('value', value + amount); // blocking call | |
* return value + amount; // won't return until blocking is finished | |
* } | |
* } | |
* ``` | |
* | |
* Decorator: | |
* ```ts | |
* class Controller { | |
* | |
* @blockable | |
* async increment(amount: number) { | |
* const value = await this.storage.get('value'); // blocking call | |
* this.storage.set('value', value + amount); // blocking call | |
* return value + amount; // won't return until blocking is finished | |
* } | |
* } | |
* ``` | |
*/ | |
function blockable<T extends Function>(target: T): T; | |
function blockable(target: any, propertyKey: string, descriptor: PropertyDescriptor): void; | |
function blockable(target: any, propertyKey?: string, descriptor?: PropertyDescriptor) { | |
return wrap(target, descriptor, func => | |
function(...args: any[]) { | |
if (!blockingCalls.size) { | |
return func.apply(this, args).then(outputGate); | |
} else { | |
return new Promise((resolve, reject) => { | |
deferred.push(() => func.apply(this, args).then(outputGate).then(resolve, reject)); | |
}); | |
} | |
} | |
); | |
} | |
/** | |
* Wrap a function which returns a response which is blockable (e.g. fetch). | |
* Also, a Typescript decorator for functions whose response should be blocked when needed. | |
* Example: | |
* ```ts | |
* class RemoteAPI { | |
* @blockableResponse | |
* async sendEmail(options: EmailOptions) { | |
* ... | |
* } | |
* } | |
* ``` | |
*/ | |
function blockableResponse<T extends Function>(target: T): T; | |
function blockableResponse(target: any, propertyKey: string, descriptor: PropertyDescriptor): void; | |
function blockableResponse(target: any, propertyKey?: string, descriptor?: PropertyDescriptor) { | |
return wrap(target, descriptor, func => | |
function(...args: any[]) { | |
return deferResponse(func.apply(this, args)); | |
} | |
); | |
} | |
/** | |
* Wrap a function which blocks. | |
* Also, a Typescript decorator for functions which block. | |
* Examples: | |
* | |
* Wrapped function: | |
* ```ts | |
* // function | |
* const getFromStorage = blocking(async (key: string) => { | |
* ... | |
* }); | |
* | |
* // class | |
* class Storage { | |
* constructor() { | |
* this.get = blocking(this.get); | |
* } | |
* | |
* async get(key: string) { | |
* ... | |
* } | |
* } | |
* ``` | |
* | |
* Decorator: | |
* ```ts | |
* class Storage { | |
* | |
* @blocking | |
* async get(key: string) { | |
* ... | |
* } | |
* } | |
* ``` | |
*/ | |
function blocking<T extends Function>(target: T): T; | |
function blocking(target: any, propertyKey: string, descriptor: PropertyDescriptor): void; | |
function blocking(target: any, propertyKey?: string, descriptor?: PropertyDescriptor) { | |
return wrap(target, descriptor, func => | |
function(...args: any[]) { | |
return blockFor(func.apply(this, args)); | |
} | |
); | |
} | |
/** | |
* Block while waiting for the callback to resolve. | |
*/ | |
async function blockWhile(callback: Function): Promise<any> { | |
return blockFor(callback()); | |
} | |
/** | |
* Defers the response of this call if a blocking call is in progress. | |
*/ | |
function deferResponse<T>(promise: Promise<T>): Promise<T> { | |
return promise.then(onFulfilled, onRejected); | |
} | |
async function blockFor<T>(promise: Promise<T>) { | |
const thisCall = ++call; | |
blockingCalls.add(thisCall); | |
callsJustBlocked.set(thisCall, promise); | |
afterAll().then(() => callsJustBlocked.delete(thisCall)); | |
function finish() { | |
if (!blockingCalls.has(thisCall)) return; | |
blockingCalls.delete(thisCall); | |
callsJustBlocked.delete(thisCall); | |
if (!blockingCalls.size) afterAll().then(process); | |
} | |
let response: any; | |
try { | |
response = await promise; | |
} catch (e) { | |
finish(); | |
throw e; | |
} | |
finish(); | |
return response; | |
} | |
function onFulfilled(result: any) { | |
if (blockingCalls.size) { | |
return new Promise(resolve => deferred.push(() => resolve(result))); | |
} else { | |
return result; | |
} | |
} | |
function onRejected(reason: any) { | |
if (blockingCalls.size) { | |
return new Promise((resolve, reject) => deferred.push(() => reject(reason))); | |
} else { | |
return Promise.reject(reason); | |
} | |
} | |
// Process deferred calls and responses, pausing when blocked again. | |
function process() { | |
while (!blockingCalls.size && deferred.length) { | |
deferred.shift()(); | |
} | |
} | |
function wrap(target: any, descriptor: PropertyDescriptor, wrapper: (func: Function) => Function): any { | |
const origFunc = descriptor && descriptor.value || target; | |
if (typeof origFunc !== 'function') throw new TypeError('Blocking method wrappers can only be used on functions'); | |
const func = wrapper(origFunc); | |
if (descriptor) { | |
descriptor.value = func; | |
} else { | |
return func; | |
} | |
} | |
// Don't let a blockable function return its result until all blocking calls it initiated are finished. | |
function outputGate(result: any) { | |
if (callsJustBlocked.size) { | |
const promises = Array.from(callsJustBlocked.values()); | |
callsJustBlocked.forEach((promise, call) => blockingCalls.delete(call)); | |
callsJustBlocked.clear(); | |
afterAll().then(process); | |
return Promise.all(promises).then(() => result); | |
} else { | |
return result; | |
} | |
} | |
return { | |
blockable, | |
blockableResponse, | |
blocking, | |
fetch, | |
blockWhile, | |
deferResponse, | |
reset, | |
} | |
} | |
function tick() { | |
return Promise.resolve(); | |
} | |
async function afterAll() { | |
for (let i = 0; i < 10; i++) await tick(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment