Last active
March 1, 2022 23:42
-
-
Save swhitty/759a02ac7513255ba7d7b21efadf3f33 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import RxSwift | |
import Foundation | |
extension ObservableType {

    /// A Combine publisher wrapping this observable sequence.
    ///
    /// Every access builds a fresh `RxPublisher` value around `self`.
    var publisher: RxPublisher<Self> {
        return RxPublisher<Self>(self)
    }
}
/// A Combine `Publisher` that wraps an `ObservableType`.
///
/// `Output` is the observable's `Element`; `Failure` is `Error`.
struct RxPublisher<Observable: ObservableType>: Publisher {

    typealias Output = Observable.Element
    typealias Failure = Error

    /// The wrapped sequence, handed to every subscription created by `receive(subscriber:)`.
    private let source: Observable

    /// Creates a publisher around `observable`.
    init(_ observable: Observable) {
        source = observable
    }

    /// Attaches `subscriber` by creating an `RxSubscription` for it and
    /// passing that subscription to the subscriber.
    func receive<S>(subscriber: S) where S: Subscriber, Error == S.Failure, Output == S.Input {
        subscriber.receive(
            subscription: RxSubscription(observable: source, subscriber: subscriber)
        )
    }
}
private extension RxPublisher {

    /// Connects one Combine `Subscriber` to one subscription on the wrapped observable.
    ///
    /// The observable is subscribed to on the first `request(_:)` with a demand
    /// greater than `.none`. The amount of demand is otherwise not tracked: every
    /// element produced by the observable is forwarded and the demand returned by
    /// the subscriber is discarded.
    ///
    /// All mutable state is guarded by `lock`. The lock is never held while
    /// calling out to the subscriber or to a `Disposable`, so those calls may
    /// safely re-enter this object.
    final class RxSubscription<S>: Subscription where S: Subscriber,
                                                      S.Input == Observable.Element,
                                                      S.Failure == Error {

        private var observable: Observable?
        private var subscriber: S?
        private var disposable: Disposable?
        private var didSubscribe: Bool = false
        private let lock = NSLock()

        init(observable: Observable, subscriber: S) {
            self.observable = observable
            self.subscriber = subscriber
        }

        /// Subscribes to the observable the first time a non-zero demand arrives.
        ///
        /// Later calls, calls with `.none`, and calls after cancellation or
        /// completion do nothing.
        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            guard demand > .none,
                  didSubscribe == false,
                  let observable = observable else {
                lock.unlock()
                return
            }
            didSubscribe = true
            lock.unlock()

            // Capture weakly: an event may still be in flight on another thread
            // after this subscription has been released, and must then be dropped
            // instead of trapping on a dangling reference.
            let disposable = observable
                .subscribe(onNext: { [weak self] in
                    self?.receive($0)
                },
                onError: { [weak self] in
                    self?.receive(completion: .failure($0))
                },
                onCompleted: { [weak self] in
                    self?.receive(completion: .finished)
                })

            lock.lock()
            let isActive = subscriber != nil
            if isActive {
                self.disposable = disposable
            }
            lock.unlock()

            if !isActive {
                // Cancelled or completed while `subscribe` was still running.
                // Dispose outside the lock so disposal side effects cannot deadlock.
                disposable.dispose()
            }
        }

        /// Forwards one element to the subscriber, if it is still attached.
        func receive(_ input: Observable.Element) {
            lock.lock()
            let subscriber = self.subscriber
            lock.unlock()
            // The demand returned by the subscriber is intentionally ignored.
            _ = subscriber?.receive(input)
        }

        /// Forwards the terminal event to the subscriber and releases all held references.
        func receive(completion: Subscribers.Completion<Failure>) {
            lock.lock()
            let subscriber = self.subscriber
            let disposable = self.disposable
            // Nothing more will be delivered, so drop everything now rather than
            // keeping the subscriber alive until `cancel()` or `deinit`.
            self.subscriber = nil
            self.disposable = nil
            self.observable = nil
            lock.unlock()

            disposable?.dispose()
            subscriber?.receive(completion: completion)
        }

        /// Detaches the subscriber and disposes the Rx subscription, if any.
        func cancel() {
            lock.lock()
            let disposable = self.disposable
            self.disposable = nil
            subscriber = nil
            observable = nil
            lock.unlock()

            // Disposed after unlocking: the lock is not recursive.
            disposable?.dispose()
        }

        deinit {
            cancel()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment