@okikio
Created September 1, 2024 03:21
Queues via TransformStreams

Using TransformStream in place of traditional queue implementations is an interesting approach that leverages the stream API's natural queuing and backpressure features. Below is a breakdown of how you might implement each queue type using TransformStream, adhering to the constraint of using no more than 2 TransformStreams per queue, and addressing any limitations that arise.

1. Simple Queue (FIFO Queue)

  • Implementation:
    • TransformStream 1: This stream simply passes data from the writable side to the readable side in FIFO order.
    • TransformStream 2: Not necessary in this case, as one TransformStream is sufficient to maintain the FIFO order.
const fifoQueue = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk);
  }
});
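
A minimal sketch of driving such a pass-through queue, assuming a runtime with the WHATWG Streams globals (modern browsers, Node 18+). The helper name runFifo is hypothetical. It shows that chunks leave in the order they were written, and that writing and reading have to overlap.

```typescript
// Push every input chunk through a pass-through TransformStream and collect the output.
async function runFifo<T>(input: T[]): Promise<T[]> {
  const fifoQueue = new TransformStream<T, T>({
    transform(chunk, controller) {
      controller.enqueue(chunk);
    },
  });
  const writer = fifoQueue.writable.getWriter();
  const reader = fifoQueue.readable.getReader();

  // Write in the background: with the default queuing strategies a write only
  // completes once the readable side has demand, so reads must run concurrently.
  const writing = (async () => {
    for (const chunk of input) await writer.write(chunk);
    await writer.close();
  })();

  const output: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    output.push(result.value);
  }
  await writing;
  return output;
}
```

Awaiting every write before the first read would stall here, because the default readable-side high-water mark is 0.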

2. Circular Queue

  • Implementation:
    • TransformStream 1: Acts as a standard FIFO queue.
    • TransformStream 2: Wraps around the data flow when the end is reached, effectively creating a circular buffer.
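const circularQueue = new TransformStream({
  transform(chunk, controller) {
    // desiredSize is the readable side's high-water mark minus the size of its queue.
    // Both branches enqueue the chunk, so nothing is overwritten or dropped here.
    if (controller.desiredSize !== null && controller.desiredSize <= 0) {
      controller.enqueue(chunk); // Placeholder for wrap-around behavior
    } else {
      controller.enqueue(chunk);
    }
  }
});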

Limitation: This only simulates the wrap-around behavior but doesn't truly limit the size as a circular buffer would. To achieve a true circular queue, you'd need to introduce manual size management.

3. Priority Queue

  • Problem: Managing priorities requires more control than TransformStream can offer directly since it requires sorting and potential reordering of chunks in the stream.
  • Potential Solution: Use one TransformStream to reorder based on priority, but this may require an internal array for sorting, which goes against the goal of avoiding manual storage.
  • Alternative: Implement a custom stream that enqueues chunks based on priority, but this would not be possible with just TransformStream alone without additional storage.

Conclusion: A true PriorityQueue cannot be efficiently implemented using just TransformStream due to the need for reordering and internal state management.

4. Deque (Double-Ended Queue)

  • Implementation:
    • TransformStream 1: Handles standard FIFO operations from the front.
    • TransformStream 2: Allows for dequeuing from the rear, which requires some form of reversible data structure that TransformStream alone doesn't easily provide.

Limitation: A true deque implementation requires more complex state management (support for both ends), which may not be fully achievable with TransformStream alone without additional internal storage.

5. Double-Ended Priority Queue (DEPQ)

  • Problem: Similar to the Priority Queue and Deque, this requires sorting and dequeuing from both ends based on priority, which TransformStream cannot efficiently handle on its own.

Conclusion: This implementation is not feasible with just TransformStream due to the dual requirements of priority and double-ended access.

6. Concurrent Queue

  • Implementation:
    • TransformStream 1: Queue chunks as they come in, supporting concurrent processing.
    • TransformStream 2: Use the second stream to process chunks concurrently.
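const concurrentQueue = new TransformStream({
  async transform(chunk, controller) {
    // someAsyncOperation is a placeholder for any function that returns a promise.
    // The stream waits for this promise before calling transform() again, so chunks
    // are processed one at a time, in order, rather than concurrently.
    await someAsyncOperation(chunk);
    controller.enqueue(chunk);
  }
});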
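
One way to get real overlap out of two TransformStreams, sketched under assumptions: the first stream starts the work and enqueues the pending promise, and the second awaits each promise in order. The helper name concurrentMap and its parameters are hypothetical. The readable-side high-water mark only roughly caps the number of operations in flight, since a few more can sit in the second stream.

```typescript
// Hypothetical helper: applies `fn` to each chunk with several calls in flight,
// while still emitting results in the original order.
function concurrentMap<I, O>(
  fn: (chunk: I) => Promise<O>,
  concurrency: number,
): { writable: WritableStream<I>; readable: ReadableStream<O> } {
  // Stream 1 starts the work without awaiting it and enqueues the pending promise.
  // Its readable-side high-water mark limits how many pending promises may queue up.
  const starter = new TransformStream<I, Promise<O>>(
    {
      transform(chunk, controller) {
        controller.enqueue(fn(chunk));
      },
    },
    undefined,
    new CountQueuingStrategy({ highWaterMark: concurrency }),
  );

  // Stream 2 awaits each promise in arrival order, so output order matches input order.
  const awaiter = new TransformStream<Promise<O>, O>({
    async transform(pending, controller) {
      controller.enqueue(await pending);
    },
  });

  return {
    writable: starter.writable,
    readable: starter.readable.pipeThrough(awaiter),
  };
}
```

The returned pair can be passed to pipeThrough like any TransformStream. This sketch does no error handling: if fn rejects before the second stream reaches that promise, the runtime may report an unhandled rejection.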

7. Circular Buffer

  • Implementation: Similar to the Circular Queue, but this might involve dropping or overwriting data, which TransformStream cannot handle well on its own.
  • Problem: Enforcing a strict buffer size and managing wrap-around requires more control than TransformStream offers.

Conclusion: A true circular buffer with overwrite behavior when full is not efficiently achievable with TransformStream without additional state management.

8. Bounded Queue

  • Implementation:
    • TransformStream 1: Queue chunks as long as the desired size is not exceeded.
    • TransformStream 2: Once the limit is reached, apply backpressure or discard new chunks.
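const maxSize = 10;

// Let the stream's queuing strategies enforce the bound. A manual counter that is only
// ever incremented would silently drop every chunk after the first maxSize.
const boundedQueue = new TransformStream(
  {
    transform(chunk, controller) {
      controller.enqueue(chunk);
    }
  },
  new CountQueuingStrategy({ highWaterMark: 1 }),      // writable side
  new CountQueuingStrategy({ highWaterMark: maxSize }) // readable side buffers up to maxSize chunks
);
// Once the readable side holds maxSize unread chunks, further writes wait
// (backpressure) until the consumer reads.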
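
A bound only holds if the producer respects backpressure. The sketch below assumes a bound expressed through the readable-side queuing strategy and a producer that waits on writer.ready before each write. The helper names boundedStream and produce are hypothetical.

```typescript
// Hypothetical helper: a pass-through stream whose readable side buffers up to maxSize chunks.
function boundedStream<T>(maxSize: number): TransformStream<T, T> {
  return new TransformStream<T, T>(
    {
      transform(chunk, controller) {
        controller.enqueue(chunk);
      },
    },
    new CountQueuingStrategy({ highWaterMark: 1 }),
    new CountQueuingStrategy({ highWaterMark: maxSize }),
  );
}

// A producer that respects backpressure: writer.ready stays pending while the
// queue is full, so this loop pauses instead of piling chunks up in memory.
async function produce<T>(
  writer: WritableStreamDefaultWriter<T>,
  chunks: T[],
): Promise<void> {
  for (const chunk of chunks) {
    await writer.ready;
    // Not awaited: errors also surface through writer.ready and writer.close().
    writer.write(chunk).catch(() => {});
  }
  await writer.close();
}
```

With no consumer attached, produce stalls after a handful of chunks and resumes as soon as something reads from the readable side. A producer that ignores writer.ready can still queue an unlimited number of writes.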

9. Unbounded Queue

  • Implementation:
    • TransformStream 1: Enqueues chunks indefinitely.
    • TransformStream 2: Not needed as the queue is unbounded.
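const unboundedQueue = new TransformStream(
  {
    transform(chunk, controller) {
      controller.enqueue(chunk);
    }
  },
  // With the default strategies the stream applies backpressure. An infinite
  // high-water mark on both sides means the producer is never asked to wait.
  { highWaterMark: Infinity },
  { highWaterMark: Infinity }
);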

10. Monotonic Queue

  • Problem: Requires sorting or maintaining order based on a specific property, which TransformStream does not inherently support.
  • Alternative Solution: Implement sorting in a custom stream that manages its internal state, but this would go beyond what TransformStream alone can handle.

Conclusion: A monotonic queue would require additional storage to maintain order, making it infeasible with just TransformStream.

11. Double-ended Queue (Deque)

  • Implementation: Similar to the previous Deque description.
  • Problem: Same limitations as mentioned before regarding double-ended access.

12. Double-Ended Priority Queue (DEPQ)

  • Problem: Same as the Priority Queue, with the additional complexity of double-ended access.

Conclusion: Not feasible with just TransformStream.

In summary, while some basic queues like FIFO and bounded queues can be implemented with TransformStream, more complex structures like priority queues, deques, and circular buffers require additional state management or manual storage, making them less suitable for direct implementation with TransformStream alone. For those, alternative approaches or custom stream implementations would be necessary.

You're right—TransformStream is made up of both ReadableStream and WritableStream, and we can indeed leverage this to better take advantage of the streaming capabilities. Let's rethink the implementations to make fuller use of TransformStream and its inherent properties, focusing on maintaining state within the stream and managing backpressure efficiently.

1. Priority Queue

  • Full Advantage of TransformStream:
    • Use the WritableStream to handle incoming chunks and maintain a priority-sorted structure.
    • The ReadableStream can dequeue elements based on their priority.
class PriorityQueue {
  private heap: { chunk: any, priority: number }[] = [];

  private siftDown(start: number) {
    let index = start;
    const length = this.heap.length;
    const element = this.heap[start];

    while (true) {
      let leftChildIdx = 2 * index + 1;
      let rightChildIdx = 2 * index + 2;
      let swapIdx = null;

      if (leftChildIdx < length) {
        if (this.heap[leftChildIdx].priority < element.priority) {
          swapIdx = leftChildIdx;
        }
      }

      if (rightChildIdx < length) {
        if (
          (swapIdx === null && this.heap[rightChildIdx].priority < element.priority) ||
          (swapIdx !== null && this.heap[rightChildIdx].priority < this.heap[swapIdx].priority)
        ) {
          swapIdx = rightChildIdx;
        }
      }

      if (swapIdx === null) break;
      this.heap[index] = this.heap[swapIdx];
      index = swapIdx;
    }

    this.heap[index] = element;
  }

  private siftUp(start: number) {
    let index = start;
    const element = this.heap[index];

    while (index > 0) {
      let parentIdx = Math.floor((index - 1) / 2);
      let parent = this.heap[parentIdx];

      if (element.priority >= parent.priority) break;
      this.heap[index] = parent;
      index = parentIdx;
    }

    this.heap[index] = element;
  }

  public get stream(): TransformStream {
    return new TransformStream({
      start: () => {
        this.heap = [];
      },
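      transform: (chunk) => {
        const priority = chunk.priority || 0;
        this.heap.push({ chunk, priority });
        this.siftUp(this.heap.length - 1);
        // Only buffer here. Dequeuing right after every insert would leave at most one
        // chunk in the heap, so chunks would come out in arrival order. flush() drains
        // the heap in priority order once the writable side closes.
      },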
      flush: (controller) => {
        while (this.heap.length > 0) {
          controller.enqueue(this.dequeue());
        }
      }
    });
  }

  private dequeue() {
    if (this.heap.length === 0) return null;

    const root = this.heap[0];
    const end = this.heap.pop();

    if (this.heap.length > 0 && end) {
      this.heap[0] = end;
      this.siftDown(0);
    }

    return root.chunk;
  }
}

const priorityQueue = new PriorityQueue();

Explanation:

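  • WritableStream: Receives incoming chunks and pushes them into a binary min-heap keyed on priority (a missing priority counts as 0).
  • ReadableStream: Receives chunks popped from the top of the heap, so among the buffered chunks the lowest priority value comes out first.
  • The heap is the extra state that makes reordering possible; TransformStream itself only supplies the writable and readable endpoints.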
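
For comparison, a compact sketch of the same idea that buffers chunks and sorts them once at flush instead of maintaining a heap. This is a swapped-in technique, not the class above. The names Prioritized and priorityStream are hypothetical, and a lower priority value is assumed to mean "comes out first".

```typescript
interface Prioritized {
  priority: number;
}

// Hypothetical helper: buffers every chunk and emits them in ascending priority
// order when the writable side is closed.
function priorityStream<T extends Prioritized>(): TransformStream<T, T> {
  const buffered: T[] = [];
  return new TransformStream<T, T>({
    transform(chunk) {
      // Nothing is emitted yet: the final order is unknown until input ends.
      buffered.push(chunk);
    },
    flush(controller) {
      // Array.prototype.sort is stable, so equal priorities keep arrival order.
      buffered.sort((a, b) => a.priority - b.priority);
      for (const chunk of buffered) controller.enqueue(chunk);
    },
  });
}
```

Because output only starts at flush, this suits finite inputs. An endless stream would need a different policy, such as emitting the current minimum every so many chunks.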

2. Double-Ended Queue (Deque)

  • Full Advantage of TransformStream:
    • Manage state within the TransformStream to handle both front and back operations efficiently without relying on shift().
class Deque {
  private buffer: any[];
  private head: number;
  private tail: number;
  private size: number;
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
    this.buffer = new Array(capacity);
    this.head = 0;
    this.tail = 0;
    this.size = 0;
  }

  public get stream(): TransformStream {
    return new TransformStream({
      start: () => {
        this.buffer = new Array(this.capacity);
        this.head = 0;
        this.tail = 0;
        this.size = 0;
      },
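      transform: (chunk) => {
        if (chunk.enqueueFront) {
          this.enqueueFront(chunk.data);
        } else {
          this.enqueueBack(chunk.data);
        }
        // Only buffer here. Dequeuing after every insert would always return the chunk
        // that was just added, whichever end it went to. flush() drains the deque from
        // the front once the writable side closes. Inserting more than `capacity`
        // chunks before that throws and errors the stream.
      },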
      flush: (controller) => {
        while (this.size > 0) {
          controller.enqueue(this.dequeueFront());
        }
      }
    });
  }

  private enqueueFront(chunk: any) {
    if (this.size === this.capacity) throw new Error('Deque is full');
    this.head = (this.head - 1 + this.capacity) % this.capacity;
    this.buffer[this.head] = chunk;
    this.size++;
  }

  private enqueueBack(chunk: any) {
    if (this.size === this.capacity) throw new Error('Deque is full');
    this.buffer[this.tail] = chunk;
    this.tail = (this.tail + 1) % this.capacity;
    this.size++;
  }

  private dequeueFront() {
    if (this.size === 0) return null;
    const chunk = this.buffer[this.head];
    this.head = (this.head + 1) % this.capacity;
    this.size--;
    return chunk;
  }

  private dequeueBack() {
    if (this.size === 0) return null;
    this.tail = (this.tail - 1 + this.capacity) % this.capacity;
    const chunk = this.buffer[this.tail];
    this.size--;
    return chunk;
  }
}

const deque = new Deque(10);

Explanation:

  • WritableStream: Handles incoming chunks and places them at the front or back of the deque.
  • ReadableStream: Dequeues elements from the front by default but can be adapted to dequeue from the back.
  • This implementation uses a circular buffer to avoid shifting elements.

3. Double-Ended Priority Queue (DEPQ)

  • Full Advantage of TransformStream:
    • Combine priority ordering with double-ended access by managing the state alongside the TransformStream.
class DoubleEndedPriorityQueue {
  // Kept sorted by ascending priority, so both ends are directly accessible.
  // Insertion is O(n); a min-max heap or interval heap would make it O(log n).
  private items: { chunk: any, priority: number }[] = [];
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  public get stream(): TransformStream {
    return new TransformStream({
      start: () => {
        this.items = [];
      },
      transform: (chunk) => {
        // Only buffer here; dequeuing after every insert would make this a pass-through.
        this.insert(chunk.data, chunk.priority || 0);
      },
      flush: (controller) => {
        // Default to dequeuing from the front (lowest priority value first)
        while (this.items.length > 0) {
          controller.enqueue(this.dequeueFront());
        }
      }
    });
  }

  private insert(chunk: any, priority: number) {
    if (this.items.length === this.capacity) throw new Error('DEPQ is full');
    // Binary search for the first index whose priority is greater than the new one
    let lo = 0;
    let hi = this.items.length;
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      if (this.items[mid].priority <= priority) lo = mid + 1;
      else hi = mid;
    }
    this.items.splice(lo, 0, { chunk, priority });
  }

  // Removes and returns the chunk with the lowest priority value
  private dequeueFront() {
    if (this.items.length === 0) return null;
    return this.items.shift()!.chunk;
  }

  // Removes and returns the chunk with the highest priority value
  private dequeueBack() {
    if (this.items.length === 0) return null;
    return this.items.pop()!.chunk;
  }
}

const doubleEndedPriorityQueue = new DoubleEndedPriorityQueue(10);

Explanation:

  • WritableStream: Inserts each chunk into an array kept sorted by priority, so the lowest and highest priorities sit at the two ends.
  • ReadableStream: Receives chunks from the front (lowest priority value first) once the writable side closes; dequeueBack() is there for taking the highest priority instead.

4. Circular Buffer

  • Full Advantage of TransformStream:
    • Implement the circular buffer entirely within the TransformStream, handling wrap-around and backpressure.
class CircularBuffer {
  private buffer: any[];
  private maxSize: number;
  private head: number;
  private tail: number;
  private full: boolean;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
    this.buffer = new Array(maxSize);
    this.head = 0;
    this.tail = 0;
    this.full = false;
  }

  public get stream(): TransformStream {
    return new TransformStream({
      start: () => {
        this.buffer = new Array(this.maxSize);
        this.head = 0;
        this.tail = 0;
        this.full = false;
      },
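      transform: (chunk) => {
        // Only buffer here. Dequeuing after every insert would keep the buffer empty,
        // so it would never wrap around. flush() emits whatever is still in the
        // buffer once the writable side closes.
        this.enqueue(chunk);
      },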
      flush: (controller) => {
        while (!this.isEmpty()) {
          controller.enqueue(this.dequeue());
        }
      }
    });
  }

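  private enqueue(chunk: any) {
    if (this.full) {
      // Buffer is full: advance head so the oldest chunk is the one overwritten
      this.head = (this.head + 1) % this.maxSize;
    }
    this.buffer[this.tail] = chunk;
    this.tail = (this.tail + 1) % this.maxSize;
    // Full exactly when the write position has caught up with the read position
    this.full = this.tail === this.head;
  }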

  private dequeue() {
    if (this.isEmpty()) return null;
    const chunk = this.buffer[this.head];
    this.head = (this.head + 1) % this.maxSize;
    this.full = false;
    return chunk;
  }

  private isEmpty() {
    return !this.full && this.head === this.tail;
  }
}

const circularBuffer = new CircularBuffer(10);

Explanation:

  • WritableStream: Handles enqueuing into the buffer; once the buffer is full, a new chunk overwrites the oldest one.
  • ReadableStream: Receives chunks dequeued from the head, oldest first. Chunks that were overwritten are dropped and never reach the reader.
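
The overwrite behavior can be sketched in a more compact form as a stream that keeps only the most recent chunks. The helper name keepLast is hypothetical, and capacity is assumed to be at least 1.

```typescript
// Hypothetical helper: remembers only the last `capacity` chunks and emits them,
// oldest first, when the writable side is closed.
function keepLast<T>(capacity: number): TransformStream<T, T> {
  const ring: T[] = new Array<T>(capacity);
  let next = 0; // slot the next chunk is written to
  let count = 0; // number of valid chunks, at most capacity

  return new TransformStream<T, T>({
    transform(chunk) {
      ring[next] = chunk; // once full, this overwrites the oldest chunk
      next = (next + 1) % capacity;
      count = Math.min(count + 1, capacity);
    },
    flush(controller) {
      // When full, the oldest surviving chunk sits at `next`; otherwise at index 0.
      const start = count === capacity ? next : 0;
      for (let i = 0; i < count; i++) {
        controller.enqueue(ring[(start + i) % capacity]);
      }
    },
  });
}
```

Piping a source through keepLast(3) and writing 1 to 5 leaves 3, 4, 5 on the readable side; 1 and 2 are overwritten.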

5. Monotonic Queue

  • Full Advantage of TransformStream:
    • Utilize the TransformStream to ensure elements are enqueued in a monotonic order.
class MonotonicQueue {
  private items: any[] = [];
  private head: number = 0;
  private tail: number = 0;

  public get stream(): TransformStream {
    return new TransformStream({
      start: () => {
        this.items = [];
        this.head = 0;
        this.tail = 0;
      },
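      transform: (chunk) => {
        // Only buffer here. Dequeuing after every insert would empty the queue each
        // time, so no larger value would ever be discarded. flush() emits the
        // remaining non-decreasing sequence once the writable side closes.
        this.enqueue(chunk);
      },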
      flush: (controller) => {
        while (this.head < this.tail) {
          controller.enqueue(this.dequeue());
        }
      }
    });
  }

  private enqueue(chunk: any) {
    while (this.tail > this.head && this.items[this.tail - 1] > chunk) {
      this.tail--;
    }
    this.items[this.tail] = chunk;
    this.tail++;
  }

  private dequeue() {
    if (this.head === this.tail) return null;
    const chunk = this.items[this.head];
    this.head++;
    return chunk;
  }
}

const monotonicQueue = new MonotonicQueue();

Explanation:

  • WritableStream: Before storing a chunk, discards any buffered values larger than it, so the buffer always stays in non-decreasing order.
  • ReadableStream: Receives the surviving values from the head of the buffer, smallest first.
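
A common use for a monotonic queue is the sliding-window minimum, which also fits the streaming model because it can emit a result per chunk. The sketch below is an illustration under assumptions, not part of the class above. The helper name slidingWindowMin is hypothetical, and windowSize is assumed to be at least 1.

```typescript
// Hypothetical helper: for each full window of `windowSize` consecutive numbers,
// emits the smallest value in that window.
function slidingWindowMin(windowSize: number): TransformStream<number, number> {
  // Candidates for the minimum, stored as [index, value] with non-decreasing values.
  const candidates: Array<[number, number]> = [];
  let index = 0;

  return new TransformStream<number, number>({
    transform(value, controller) {
      // Drop candidates from the back that can never be the minimum again.
      while (candidates.length > 0 && candidates[candidates.length - 1][1] > value) {
        candidates.pop();
      }
      candidates.push([index, value]);
      // Drop the front candidate once it has slid out of the window.
      // shift() keeps the sketch short; a head index would avoid its O(n) cost.
      if (candidates[0][0] <= index - windowSize) candidates.shift();
      index++;
      // The front of the queue is the minimum of the current window.
      if (index >= windowSize) controller.enqueue(candidates[0][1]);
    },
  });
}
```

With a window of 3, the input 4, 2, 5, 1, 3 produces 2, 1, 1.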

Conclusion

By keeping extra state alongside a TransformStream, we can express these queue structures as streams: chunks enter through the WritableStream side, are held in a heap, ring buffer, or array, and leave through the ReadableStream side. The Deque and CircularBuffer use head and tail indices on a fixed-size array, which avoids the O(n) cost of shift(). Backpressure still comes from the stream's queuing strategies, not from the data structure itself.
