Skip to content

Instantly share code, notes, and snippets.

@pookjw
Last active August 2, 2024 09:33
Show Gist options
  • Save pookjw/d91083e3f9add70cefa5f1345d5bb244 to your computer and use it in GitHub Desktop.
Save pookjw/d91083e3f9add70cefa5f1345d5bb244 to your computer and use it in GitHub Desktop.
/// Demonstration hack: adds a `yield(_:)` method directly to `AsyncStream` by
/// pulling the stream's private continuation storage out through reflection.
///
/// NOTE(review): this relies on stored properties labelled "context" and
/// "storage" and on `Continuation` having the same size as a single object
/// reference. Neither is public API — confirm against the Swift version in use.
extension AsyncStream {
/// Yields `value` into this stream through a continuation reconstructed from
/// the stream's reflected internals.
///
/// - Parameter value: The element to hand to the stream.
/// - Returns: The `YieldResult` reported by the reconstructed continuation.
///
/// Traps if either reflected child is missing or is not an object (the force
/// unwraps below).
@discardableResult
func yield(_ value: __owned Element) -> AsyncStream<Element>.Continuation.YieldResult {
// Reflect the stream value and take the child labelled "context" as an object.
let context = (Mirror(reflecting: self).descendant("context")! as AnyObject?)!
// Reflect that object and take the child labelled "storage" as an object.
let storage = (Mirror(reflecting: context).descendant("storage")! as AnyObject?)!
// Reinterpret the storage reference as a `Continuation` value.
let continuation = unsafeBitCast(storage, to: Self.Continuation.self)
// Adds a +1 retain to `storage` on every call; the `Unmanaged` handle is
// discarded, so nothing here balances it.
// NOTE(review): presumably intended to compensate for the bit-cast above, but
// it looks like one leaked retain per yield — confirm whether it is needed.
_ = Unmanaged.passRetained(storage)
return continuation.yield(value)
}
}
/// Demo entry point: a detached task pushes a random integer into a stream once
/// per second while `main` prints every value it receives.
@main
struct MyScript {
  static func main() async {
    // The build closure deliberately ignores its continuation; values are
    // pushed through the `yield(_:)` extension on the stream itself instead.
    let numbers = AsyncStream<Int> { _ in }

    // Producer: loops forever, sleeping one second between yields. A sleep
    // error is ignored via `try?`.
    Task.detached {
      while true {
        try? await Task.sleep(for: .seconds(1))
        numbers.yield(Int.random(in: 0...100))
      }
    }

    // Consumer: drain the stream, printing each element as it arrives.
    var iterator = numbers.makeAsyncIterator()
    while let value = await iterator.next() {
      print(value)
    }
  }
}
@pookjw
Copy link
Author

pookjw commented Aug 2, 2024

Another (more complicated) approach

import os

/// Process-wide registry that maps an `AsyncStream` to the object backing its
/// continuation, so the continuation can be recovered later from the stream alone.
///
/// Both the key and the value are produced with `unsafeBitCast`, which relies on
/// these layout assumptions:
///  - `AsyncStream` is a single pointer (its `_Context`, 8 bytes at offset 0).
///  - `AsyncStream.Continuation` is a single pointer (its `_Storage`, 8 bytes at
///    offset 0).
fileprivate final class _AsyncStreamContinuationStorage: @unchecked Sendable {
  /// The only instance; the initializer is private.
  static let shared = _AsyncStreamContinuationStorage()

  /// Guards the table from context pointer to continuation storage object.
  let lock = OSAllocatedUnfairLock<[UnsafeRawPointer: AnyObject]>(initialState: [:])

  private init() {}

  /// Registers `continuation` under the key derived from `stream`, replacing any
  /// entry already stored for that key.
  func store<T: Sendable>(stream: AsyncStream<T>, continuation: AsyncStream<T>.Continuation) {
    let key = Self.contextPointer(of: stream)

    // Continuation -> its single `_Storage` reference (see layout notes above).
    let backing = unsafeBitCast(continuation, to: AnyObject.self)

    lock.withLock { $0[key] = backing }
  }

  /// Returns the continuation registered for `stream`, or `nil` when there is no
  /// entry for it.
  func load<T: Sendable>(stream: AsyncStream<T>) -> AsyncStream<T>.Continuation? {
    let key = Self.contextPointer(of: stream)
    let backing = lock.withLock { $0[key] }

    // `_Storage` reference -> Continuation (inverse of the cast in `store`).
    return backing.map { unsafeBitCast($0, to: AsyncStream<T>.Continuation.self) }
  }

  /// Drops the entry registered for `stream`, if any.
  func remove<T: Sendable>(stream: AsyncStream<T>) {
    let key = Self.contextPointer(of: stream)

    lock.withLock { $0[key] = nil }
  }

  /// Reinterprets the stream value as the raw pointer used as its dictionary key
  /// (AsyncStream -> its single `_Context` pointer, see layout notes above).
  private static func contextPointer<T>(of stream: AsyncStream<T>) -> UnsafeRawPointer {
    unsafeBitCast(stream, to: UnsafeRawPointer.self)
  }
}

extension AsyncStream {
  /// Creates a stream whose continuation is kept in the shared
  /// `_AsyncStreamContinuationStorage`, so values can later be pushed through
  /// `yield(_:)` on the stream itself.
  ///
  /// - Parameters:
  ///   - elementType: The element type of the stream.
  ///   - limit: The buffering policy forwarded to `makeStream`.
  /// - Returns: The newly created, registered stream.
  static func makeStoredStream(of elementType: Element.Type = Element.self, bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded) -> AsyncStream<Element> {
    let (storedStream, producer) = AsyncStream<Element>.makeStream(of: elementType, bufferingPolicy: limit)

    // Unregister on termination. The handler is installed before the entry is
    // stored, and it captures the stream value itself.
    producer.onTermination = { _ in
      _AsyncStreamContinuationStorage.shared.remove(stream: storedStream)
    }

    _AsyncStreamContinuationStorage.shared.store(stream: storedStream, continuation: producer)

    return storedStream
  }

  /// Yields `value` through the continuation registered for this stream.
  ///
  /// - Parameter value: The element to hand to the stream.
  /// - Returns: The result reported by the continuation's `yield`.
  ///
  /// Traps with `fatalError` when no continuation is registered for this stream.
  /// NOTE(review): the entry is removed by the `onTermination` handler installed
  /// in `makeStoredStream`, so a yield after termination takes this same trap —
  /// confirm that is intended.
  @discardableResult
  func yield(_ value: __owned Element) -> AsyncStream<Element>.Continuation.YieldResult {
    if let producer = _AsyncStreamContinuationStorage.shared.load(stream: self) {
      return producer.yield(value)
    }

    fatalError("AsyncStream is not constructed by makeStoredStream(of:bufferingPolicy:)")
  }
}


/// Demo entry point: a detached task pushes a random integer into a stored
/// stream once per second while `main` prints every value it receives.
@main
struct MyScript {
  static func main() async {
    // Created through `makeStoredStream` so that `yield(_:)` can find the
    // stream's continuation.
    let numbers = AsyncStream<Int>.makeStoredStream()

    // Producer: loops forever, sleeping one second between yields. A sleep
    // error is ignored via `try?`.
    Task.detached {
      while true {
        try? await Task.sleep(for: .seconds(1))
        numbers.yield(Int.random(in: 0...100))
      }
    }

    // Consumer: drain the stream, printing each element as it arrives.
    var iterator = numbers.makeAsyncIterator()
    while let value = await iterator.next() {
      print(value)
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment