Skip to content

Instantly share code, notes, and snippets.

@mattmassicotte
Last active August 1, 2024 21:03
Show Gist options
  • Save mattmassicotte/bde7a4931a33a36361df0e068a41326a to your computer and use it in GitHub Desktop.
Save mattmassicotte/bde7a4931a33a36361df0e068a41326a to your computer and use it in GitHub Desktop.
import Foundation
struct TimeoutError: Error {
}
extension AsyncThrowingStream.Continuation where Failure == Error {
func timeoutTask(with interval: TimeInterval) -> Task<Void, Error> {
let sleepNS = UInt64(interval * 1_000_000_000)
return Task {
// this will throw on cancel
try await Task.sleep(nanoseconds: sleepNS)
self.finish(throwing: TimeoutError())
}
}
}
extension AsyncThrowingStream where Failure == Error, Element: Sendable {
func timeout(_ interval: TimeInterval) -> Self {
return Self { continuation in
Task {
do {
var timeout = continuation.timeoutTask(with: interval)
for try await value in self {
timeout.cancel()
continuation.yield(value)
timeout = continuation.timeoutTask(with: interval)
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
}
}
}
@jdanthinne
Copy link

@mattmassicotte shouldn't be the code inside the main Task wrapped in a try…catch in order to catch the original stream errors and forward them to continuation.finish(throwing: error)?

@mattmassicotte
Copy link
Author

@jdanthinne yeah wasn't quite right. I've update it with Donny's approach, which I like more. It is still essential that Element conform to Sendable in this version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment