Created
May 16, 2017 07:12
-
-
Save nsk-mironov/280e5c5e82c6ce1c430d40150a16fe26 to your computer and use it in GitHub Desktop.
ObservableMap.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observable | |
import io.reactivex.ObservableSource | |
import io.reactivex.Observer | |
import io.reactivex.internal.fuseable.HasUpstreamObservableSource | |
import io.reactivex.internal.fuseable.QueueFuseable | |
import io.reactivex.internal.observers.BasicFuseableObserver | |
/**
 * Base class for an [Observable] that passes every item of an upstream
 * [ObservableSource] through [onMap] before handing it downstream.
 *
 * Subclasses provide the transformation by overriding [onMap]; the companion
 * `create` function builds such a subclass from a lambda.
 *
 * @param T type of the items received from the upstream source
 * @param U type of the items produced by [onMap]
 */
abstract class BaseObservableMap<T : Any, U : Any> protected constructor(
    private val source: ObservableSource<T>
) : Observable<U>(), HasUpstreamObservableSource<T> {

    /** Returns the upstream source this observable was constructed with. */
    override fun source(): ObservableSource<T> {
        return source
    }

    /** Subscribes a mapping observer that wraps [observer] to the upstream source. */
    override fun subscribeActual(observer: Observer<in U>) {
        source.subscribe(MapObserver(observer))
    }

    /**
     * Transforms a single upstream [value] into the item to deliver downstream.
     *
     * Exceptions thrown here during [Observer.onNext] delivery are routed to the
     * observer's `fail` handler; exceptions thrown while being polled propagate to
     * the caller of `poll()`.
     */
    protected abstract fun onMap(value: T): U

    /** Observer that applies [onMap] to upstream items, both when pushed and when polled. */
    private inner class MapObserver(actual: Observer<in U>) : BasicFuseableObserver<T, U>(actual) {

        override fun onNext(value: T) {
            if (done) {
                return
            }

            if (sourceMode != QueueFuseable.NONE) {
                // A fusion mode is active: no mapped item is pushed here, only a null
                // marker is forwarded. NOTE(review): presumably the downstream then
                // pulls the real items through poll() - confirm against the RxJava
                // fusion protocol.
                actual.onNext(null)
                return
            }

            val v = try {
                onMap(value)
            } catch (ex: Throwable) {
                fail(ex)
                return
            }

            actual.onNext(v)
        }

        override fun requestFusion(mode: Int): Int {
            return transitiveBoundaryFusion(mode)
        }

        /**
         * Polls the upstream queue and maps the polled item.
         *
         * @return the mapped item, or `null` when the upstream queue returned `null`
         */
        override fun poll(): U? {
            // The upstream queue returns null when it has nothing to offer; that must be
            // passed through as null rather than being handed to onMap, whose parameter
            // is non-null.
            val upstreamValue = qs.poll() ?: return null
            return onMap(upstreamValue)
        }
    }

    companion object {
        /**
         * Creates an [Observable] that emits the result of applying [mapper] to each
         * item of [source].
         *
         * @param source upstream to subscribe to
         * @param mapper transformation applied to every upstream item
         * @return an observable of the mapped items
         */
        inline fun <T : Any, U : Any> create(source: ObservableSource<T>, crossinline mapper: (T) -> U): Observable<U> {
            return object : BaseObservableMap<T, U>(source) {
                override fun onMap(value: T) = mapper(value)
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment