Skip to content

Instantly share code, notes, and snippets.

@swhitty
Last active March 17, 2023 01:15
Show Gist options
  • Save swhitty/e3aecfc516dd7e04db0241fbc4291c3a to your computer and use it in GitHub Desktop.
Save swhitty/e3aecfc516dd7e04db0241fbc4291c3a to your computer and use it in GitHub Desktop.
// Convenience initialisers that build a `Deferred` around an
// `AsyncDeferredPublisher`, so an async closure can be used as a publisher.
extension Deferred {

    // Wraps a non-throwing async `operation`; the resulting publisher's
    // Failure type is `Never`.
    init<T>(
        operation: @escaping @Sendable () async -> T
    ) where DeferredPublisher == AsyncDeferredPublisher<T, Never> {
        self.init(createPublisher: {
            AsyncDeferredPublisher<T, Never>(operation: operation)
        })
    }

    // Wraps a throwing async `operation`; the resulting publisher's
    // Failure type is `Error`.
    init<T>(
        operation: @escaping @Sendable () async throws -> T
    ) where DeferredPublisher == AsyncDeferredPublisher<T, Error> {
        self.init(createPublisher: {
            AsyncDeferredPublisher<T, Error>(operation: operation)
        })
    }
}
// A publisher backed by an async closure. Every subscriber gets its own
// subscription, and that subscription runs the closure at most once, starting
// when the subscriber first requests demand > .none.
//
// Because the closure is either plain `async` or `async throws`, Failure can
// only ever be `Never` or `Error`; the two initialisers below enforce this.
struct AsyncDeferredPublisher<Output, Failure: Error>: Publisher {

    // Stored in its throwing form so both initialisers share one property;
    // a non-throwing closure is implicitly converted on assignment.
    private let operation: @Sendable () async throws -> Output

    // Creates a publisher that can fail with any `Error` thrown by `operation`.
    init(operation: @escaping @Sendable () async throws -> Output) where Failure == Error {
        self.operation = operation
    }

    // Creates a publisher that never fails, from a non-throwing `operation`.
    init(operation: @escaping @Sendable () async -> Output) where Failure == Never {
        self.operation = operation
    }

    // Hands the subscriber a fresh subscription wrapping the operation.
    // Nothing is executed here; the subscription decides when to run it.
    func receive<S: Subscriber>(subscriber: S) where S.Failure == Failure, S.Input == Output {
        subscriber.receive(subscription: Inner(downstream: subscriber, operation: operation))
    }
}
private extension AsyncDeferredPublisher {

    // Subscription that runs the async operation inside a `Task`, at most once,
    // and forwards its outcome to the downstream subscriber.
    //
    // The operation starts on the first `request(_:)` with demand > .none.
    // `cancel()` cancels the task and guarantees that an operation which has
    // not started yet never will. Mutable state is guarded by `lock`.
    final class Inner<Downstream: Subscriber>: Subscription where Downstream.Input == Output {

        // - Parameters:
        //   - downstream: subscriber that receives the single value and the completion.
        //   - operation: async work producing the value; a thrown error is
        //     forwarded as a `.failure` completion.
        init(downstream: Downstream, operation: @escaping @Sendable () async throws -> Output) {
            self.operation = {
                do {
                    let value = try await operation()
                    // Task cancellation is cooperative: the operation may run to
                    // completion even though the subscription was cancelled.
                    // The task is only ever cancelled via `cancel()`, so a set
                    // flag means downstream must not receive anything further.
                    guard !Task.isCancelled else { return }
                    _ = downstream.receive(value)
                    downstream.receive(completion: .finished)
                } catch {
                    // Covers `CancellationError` (and any other error) thrown
                    // after the subscription was cancelled: stay silent.
                    guard !Task.isCancelled else { return }
                    guard let failure = error as? Downstream.Failure else {
                        preconditionFailure("Downstream.Failure must always be either Error or Never")
                    }
                    downstream.receive(completion: .failure(failure))
                }
            }
        }

        // Pending work; set to nil once started or cancelled so it runs at most once.
        private var operation: (@Sendable () async -> Void)?
        // The running task, kept so `cancel()` can cancel it.
        private var task: Task<Void, Never>?
        private let lock = NSLock()

        // Starts the operation on the first request with positive demand.
        // Later requests, and requests after `cancel()`, are ignored.
        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            defer { lock.unlock() }
            guard demand > .none,
                  let operation = operation else { return }
            self.operation = nil
            task = Task(operation: operation)
        }

        // Cancels the running task (if any) and drops the pending operation.
        func cancel() {
            lock.lock()
            let runningTask = task
            task = nil
            operation = nil
            lock.unlock()
            // Cancel outside the lock: cancellation handlers run synchronously
            // on this thread and must not execute while the lock is held.
            runningTask?.cancel()
        }
    }
}
@priyans05
Copy link

🙌👏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment