Skip to content

Instantly share code, notes, and snippets.

@swhitty
Last active March 1, 2022 23:42
Show Gist options
  • Save swhitty/759a02ac7513255ba7d7b21efadf3f33 to your computer and use it in GitHub Desktop.
Save swhitty/759a02ac7513255ba7d7b21efadf3f33 to your computer and use it in GitHub Desktop.
import Combine
import RxSwift
import Foundation
extension ObservableType {
var publisher: RxPublisher<Self> {
RxPublisher(self)
}
}
struct RxPublisher<Observable: ObservableType>: Publisher {
typealias Output = Observable.Element
typealias Failure = Error
private let observable: Observable
init(_ observable: Observable) {
self.observable = observable
}
func receive<S>(subscriber: S) where S: Subscriber, Error == S.Failure, Output == S.Input {
let sub = RxSubscription(observable: observable, subscriber: subscriber)
subscriber.receive(subscription: sub)
}
}
private extension RxPublisher {
final class RxSubscription<S>: Subscription where S: Subscriber,
S.Input == Observable.Element,
S.Failure == Error {
private var observable: Observable?
private var subscriber: S?
private var disposable: Disposable?
private var didSubscribe: Bool = false
private let lock = NSLock()
init(observable: Observable, subscriber: S) {
self.observable = observable
self.subscriber = subscriber
}
func request(_ demand: Subscribers.Demand) {
lock.lock()
guard demand > .none,
didSubscribe == false,
let observable = observable else {
lock.unlock()
return
}
didSubscribe = true
lock.unlock()
let disposable = observable
.subscribe(onNext: { [unowned self] in
self.receive($0)
},
onError: { [unowned self] in
self.receive(completion: .failure($0))
},
onCompleted: { [unowned self] in
self.receive(completion: .finished)
})
lock.lock()
if subscriber != nil {
self.disposable = disposable
} else {
// already cancelled
disposable.dispose()
}
lock.unlock()
}
func receive(_ input: Observable.Element) {
lock.lock()
let subscriber = self.subscriber
lock.unlock()
_ = subscriber?.receive(input)
}
func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
let subscriber = self.subscriber
lock.unlock()
subscriber?.receive(completion: completion)
}
func cancel() {
lock.lock()
disposable?.dispose()
disposable = nil
subscriber = nil
observable = nil
lock.unlock()
}
deinit {
cancel()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment