@okikio
Last active August 21, 2024 06:10
Splits a single iterator into 2 separate async iterators by some predicate or by whether an error occurs or not.
/**
* An async generator function that pulls values from the source async iterator.
*
* @template T The type of values being yielded from the source iterator.
*
* This generator yields values it receives from an external source (via `yield`).
* It also receives an initial value when invoked, which is yielded first.
*
* ### How it Works
*
* This generator is pulling values rather than pushing them. This means that
* instead of actively generating values on its own, it waits for values to be
* sent into it through calls to `next()` on the generator object.
*
* - **Pulling from Yields**: The generator's `yield` expression waits for an external value.
* That value is pulled from whatever is passed to the generator's `next()` method.
* In this case, the external source (the `splitAsyncIterator` function) sends values
* from the original source iterator to this generator.
*
* ### Memory and Performance
*
* Pulling values from `yield` helps separate the source iterator's values into
* two independent async iterators without duplicating the source data. Instead of
* storing results in arrays or queues, this approach keeps memory usage low by only
* passing each value once to the correct iterator.
*
* - **Priming**: We need to call `next()` on the generator once before we start sending
* values. This "primes" the generator and ensures it’s ready to receive values. Otherwise,
* the generator would not be prepared to handle incoming values.
*
* - **Completion Notification**: Both the valid and error iterators need to be notified
* when the source iterator is done, so they know to terminate properly. Without this,
* they could hang indefinitely waiting for more values.
*
* @param {IteratorResult<T>} initialResult The first result from the source iterator. This value is immediately yielded.
* @returns {AsyncGenerator<T, T | undefined, IteratorResult<T>>} The async generator yielding values pulled from the source.
*
* @example
* const result = asyncGeneratorWithPullYield<number>({ value: 1, done: false });
* console.log(await result.next()); // { value: 1, done: false }
* console.log(await result.next({ value: 2, done: false })); // { value: 2, done: false }
* console.log(await result.next({ value: 3, done: true })); // { value: 3, done: true }
*/
async function* asyncGeneratorWithPullYield<T>(
  initialResult: IteratorResult<T>
): AsyncGenerator<T, T | undefined, IteratorResult<T>> {
  let result = initialResult;
  // Yield the current value, then wait for the next result sent in through `next(result)`.
  // Using a single `yield` per value keeps every sent result: none is skipped.
  while (!result.done) {
    result = yield result.value;
  }
  // The source is done: finish with its return value (if any)
  return result.value;
}
/**
* Splits a source iterator or iterable into two separate async iterators: one for valid values and one for errors.
*
* @template T The type of values being iterated over by the source.
* @template E The type of errors being thrown by the source.
* @param {AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>} source The original source iterator or iterable to be split.
* @returns {Promise<[AsyncGenerator<T>, AsyncGenerator<E>]>}
* An array containing two async iterators: one for valid values and one for errors.
*
* ### How it Works
*
* This function processes the source iterator or iterable and directs values into two separate async iterators:
* - One for valid values
* - One for errors
*
* Each time the source iterator yields a value, we pass it to the appropriate iterator. If an
* error is thrown during iteration, that error is passed to the error iterator.
*
* ### Priming the Iterators
*
* Both the valid and error iterators are created using `asyncGeneratorWithPullYield`, which
* requires them to be "primed" by calling `next()` once before any values are sent to them. This is because
* the generator function starts paused and expects a value to be sent in the next step. Priming ensures that
* the generator is ready to handle values properly when the source iterator starts producing them.
*
* ### Performance Considerations
*
* This method avoids duplicating the source iterator's values, which keeps memory usage low. However, since
* each value from the source iterator must be processed sequentially, there may be a performance trade-off in
* scenarios where parallelism is desired.
*
* ### Completion Handling
*
* When the source iterator finishes (i.e., returns `done: true`), both sub-async-iterators are notified to
* terminate their processing. Without this notification, they could continue waiting for more values and never
* complete, causing the application to hang.
*
* @example
* async function* sourceIterator() {
*   yield 1;
*   yield 2;
*   yield new Error("This is an error");
*   throw new Error("Something went wrong");
* }
*
* const [resolved, errored] = await splitAsyncIterator(sourceIterator());
*
* (async () => {
*   for await (const value of resolved) {
*     console.log("Resolved:", value); // Logs: 1, 2, Error: This is an error
*   }
* })();
*
* (async () => {
*   for await (const error of errored) {
*     console.log("Errored:", error); // Logs: Error: Something went wrong
*   }
* })();
*/
async function splitAsyncIterator<T, E = unknown>(
  source: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>
): Promise<[AsyncGenerator<T>, AsyncGenerator<E>]> {
  // Extract the iterator from the source
  const sourceIterator =
    (source as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
    (source as Iterable<T>)[Symbol.iterator]?.() ??
    (source as AsyncIterator<T>);
  // Result used to tell a sub-iterator that there is nothing (more) to yield
  const finished = { value: undefined, done: true } as const;
  let initialResult: IteratorResult<T> = finished;
  let initialError: E | undefined;
  // Track the failure with a flag so that falsy thrown values (e.g. `undefined`) still count
  let hasError = false;
  try {
    // Run the source iterator once to get the initial value
    initialResult = await sourceIterator.next();
  } catch (error) {
    // In case of an exception, treat it as the initial error result
    initialError = error as E;
    hasError = true;
  }
  let done = !hasError && initialResult.done === true;
  // Initialize the sub-async-iterators with the initial result
  const validIterator = asyncGeneratorWithPullYield<T>(
    !done && !hasError ? initialResult : finished
  );
  const errorIterator = asyncGeneratorWithPullYield<E>(
    hasError ? { value: initialError as E, done: false } : finished
  );
  // Prime both iterators by calling next() once
  await validIterator.next();
  await errorIterator.next();
  while (!done) {
    try {
      // Get the next value from the source iterator
      const result = await sourceIterator.next();
      done = result.done ?? false;
      // Pass the result to the valid iterator
      await validIterator.next(result);
    } catch (error) {
      // Pass the error to the error iterator
      await errorIterator.next({ value: error as E, done: false });
    }
  }
  // Notify both iterators that the source is done
  await validIterator.next(finished);
  await errorIterator.next(finished);
  return [validIterator, errorIterator];
}
/**
* Splits a source iterator or iterable into two separate async iterators based on a predicate function.
*
* @template T The type of values being iterated over by the source.
* @template F The type of values being directed to the second iterator. Defaults to type `T`.
*
* @param {AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>} source The original source iterator or iterable to be split.
* @param {(value: T) => Promise<boolean>} predicate The predicate function that determines
* which values go to the first iterator. If the predicate resolves to `true`, the value
* goes to the first iterator; otherwise, it goes to the second iterator.
*
* @returns {Promise<[AsyncGenerator<T>, AsyncGenerator<F>]>}
* An array containing two async iterators: one for values that satisfy the predicate and
* one for values that do not satisfy the predicate.
*
* ### How it Works
*
* This function separates values from the source iterator into two different async iterators:
* - The first iterator receives values that satisfy the predicate (the predicate returns `true`).
* - The second iterator receives values that do not satisfy the predicate (`false`).
*
* ### Priming the Iterators
*
* Both iterators are primed by calling `next()` once before processing starts. Priming ensures
* that the generators are ready to receive values as soon as the source iterator yields them.
* Without priming, the generators would not be in the correct state to handle incoming values.
*
* ### Error Handling
*
* If an error occurs during the iteration process, the error is thrown in both sub-async-iterators.
* This ensures that both iterators are made aware of the error and can terminate properly.
*
* ### Why Notify Both Iterators of Completion
*
* Once the source iterator is exhausted (`done: true`), both sub-async-iterators are notified
* that the process is complete. This ensures that they do not remain in a waiting state forever.
*
* @example
* async function* sourceIterator() {
*   yield 1;
*   yield 2;
*   yield 3;
*   yield 4;
* }
*
* const isEven = async (value: number) => value % 2 === 0;
*
* const [evens, odds] = await splitAsyncIteratorBy<number>(sourceIterator(), isEven);
*
* (async () => {
*   for await (const value of evens) {
*     console.log("Even:", value); // Logs: 2, 4
*   }
* })();
*
* (async () => {
*   for await (const value of odds) {
*     console.log("Odd:", value); // Logs: 1, 3
*   }
* })();
*
* @example
* async function* errorProneIterator() {
*   yield 1;
*   yield 2;
*   throw new Error("An error occurred");
* }
*
* // The predicate only ever sees values; errors thrown by the source never reach it.
* const isPositive = async (value: number) => value > 0;
*
* try {
*   await splitAsyncIteratorBy<number>(errorProneIterator(), isPositive);
* } catch (error) {
*   // The source's error rejects the promise returned by `splitAsyncIteratorBy`
*   console.log("Errored:", error);
* }
*/
async function splitAsyncIteratorBy<T, F = T>(
  source: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>,
  predicate: (value: T) => Promise<boolean>
): Promise<[AsyncGenerator<T>, AsyncGenerator<F>]> {
  // Extract the iterator from the source
  const sourceIterator =
    (source as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
    (source as Iterable<T>)[Symbol.iterator]?.() ??
    (source as AsyncIterator<T>);
  // Result used to tell a sub-iterator that there is nothing (more) to yield
  const finished = { value: undefined, done: true } as const;
  // Get the initial result from the source iterator. If the source or the predicate
  // throws here, the error propagates to the caller.
  const initialResult = await sourceIterator.next();
  // Only consult the predicate when the source actually produced a value
  const predicateResult = !initialResult.done && (await predicate(initialResult.value));
  // Initialize iterators for true/false values based on the predicate result
  const trueIterator = asyncGeneratorWithPullYield<T>(
    predicateResult ? initialResult : finished
  );
  const falseIterator = asyncGeneratorWithPullYield<F>(
    !initialResult.done && !predicateResult
      ? (initialResult as unknown as IteratorResult<F>)
      : finished
  );
  // Prime both iterators
  await trueIterator.next();
  await falseIterator.next();
  try {
    // Pull from the source with next() so that plain iterators (which are not iterable) work too
    let result = initialResult;
    while (!result.done) {
      result = await sourceIterator.next();
      if (result.done) break;
      // Send the whole result, not the bare value, because the generator expects an IteratorResult
      if (await predicate(result.value)) {
        await trueIterator.next(result);
      } else {
        await falseIterator.next(result as unknown as IteratorResult<F>);
      }
    }
  } catch (error) {
    // Throw the error into both iterators so each one terminates, then surface it to the caller
    await trueIterator.throw(error).catch(() => {});
    await falseIterator.throw(error).catch(() => {});
    throw error;
  } finally {
    // Notify both iterators that the source has completed
    await trueIterator.next(finished);
    await falseIterator.next(finished);
  }
  return [trueIterator, falseIterator];
}
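
The priming step described in the comments above can be tried in isolation. The sketch below is not part of the gist: `echo` and `demo` are hypothetical names, and the negative-number stop signal is an assumption made only for this illustration. It relies on standard async-generator semantics alone: the first `next()` runs the body up to the first `yield`, and each later `next(value)` resumes the generator with `value` as the result of that `yield`.

```typescript
// Hypothetical helper (not part of the gist): reports each value sent in via next().
async function* echo(): AsyncGenerator<string, string, number> {
  // The priming next() call stops here; the first sent value arrives as `received`.
  let received: number = yield "ready";
  while (received >= 0) {
    received = yield `got ${received}`;
  }
  // A negative number is this sketch's own "source is done" signal.
  return "closed";
}

// Drives the generator by hand and records what each next() call returns.
async function demo(): Promise<string[]> {
  const log: string[] = [];
  const gen = echo();
  log.push((await gen.next()).value); // priming: nothing can be received yet
  log.push((await gen.next(1)).value); // 1 becomes the result of the first yield
  log.push((await gen.next(2)).value);
  const last = await gen.next(-1); // stop signal: the generator returns
  log.push(`${last.value} (done: ${last.done})`);
  return log;
}

demo().then((log) => console.log(log));
// [ 'ready', 'got 1', 'got 2', 'closed (done: true)' ]
```

Note that every value the generator yields is handed back to whoever called `next()`, so in this sketch the code that sends values is also the code that receives the yielded output.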