Skip to content

Instantly share code, notes, and snippets.

@aballano
Created September 15, 2017 11:35
Show Gist options
  • Save aballano/c9e69d8e94dec3f28620068394ed5ddf to your computer and use it in GitHub Desktop.
Save aballano/c9e69d8e94dec3f28620068394ed5ddf to your computer and use it in GitHub Desktop.
import io.reactivex.CompletableObserver
import io.reactivex.MaybeObserver
import io.reactivex.Observer
import io.reactivex.SingleObserver
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.functions.Action
import io.reactivex.functions.Consumer
import io.reactivex.internal.functions.Functions
import io.reactivex.plugins.RxJavaPlugins
/**
 * Observer that forwards reactive signals to plain callbacks and ignores the [Disposable]
 * handed to [onSubscribe], so the subscription is never cancelled through this observer.
 *
 * A single implementation serves every source type because it implements
 * [CompletableObserver] together with [ResultObserver]. Instances are obtained through the
 * [create] factories in the companion object.
 *
 * Failures thrown by the callbacks themselves are handled as follows:
 * - fatal throwables (as decided by [Exceptions.throwIfFatal]) are rethrown;
 * - a failure of the value callback inside [onNext] is delivered to [onError];
 * - a failure of the value callback inside [onSuccess], or of the completion callback,
 *   is reported to [RxJavaPlugins.onError];
 * - a failure of the error callback is reported to [RxJavaPlugins.onError] wrapped in a
 *   [CompositeException] that carries both the original error and the failure.
 *
 * NOTE(review): because the upstream is never disposed, a source may keep emitting after a
 * failing [onNext] has already triggered the error callback — confirm this is intended.
 *
 * @param T type of the values delivered to the value callback.
 */
class NonDisposableObserver<T> private constructor(
    private val onSuccess: Consumer<in T>,
    private val onComplete: Action,
    private val onError: Consumer<in Throwable>
) : CompletableObserver, ResultObserver<T> {

    /** Intentionally ignores [disposable]; this observer never cancels its upstream. */
    override fun onSubscribe(disposable: Disposable) {
        // No-op
    }

    /**
     * Observable signal: passes [value] to the value callback and routes a non-fatal
     * callback failure to [onError].
     */
    override fun onNext(value: T) {
        try {
            onSuccess.accept(value)
        } catch (failure: Throwable) {
            // Throwable (not Exception) is caught on purpose: throwIfFatal only rethrows
            // Error subclasses, so it could never fire inside an Exception-only catch, and
            // non-fatal Errors (e.g. NotImplementedError from TODO()) would skip the
            // error callback entirely.
            Exceptions.throwIfFatal(failure)
            onError(failure)
        }
    }

    /**
     * Single/Maybe signal: passes [value] to the value callback and reports a non-fatal
     * callback failure to [RxJavaPlugins.onError].
     */
    override fun onSuccess(value: T) {
        try {
            onSuccess.accept(value)
        } catch (failure: Throwable) {
            Exceptions.throwIfFatal(failure)
            RxJavaPlugins.onError(failure)
        }
    }

    /**
     * Observable/Completable signal: runs the completion callback and reports a non-fatal
     * callback failure to [RxJavaPlugins.onError].
     */
    override fun onComplete() {
        try {
            onComplete.run()
        } catch (failure: Throwable) {
            Exceptions.throwIfFatal(failure)
            RxJavaPlugins.onError(failure)
        }
    }

    /**
     * Error signal for every source type: passes [throwable] to the error callback. If the
     * callback itself fails non-fatally, both throwables are reported together to
     * [RxJavaPlugins.onError].
     */
    override fun onError(throwable: Throwable) {
        try {
            onError.accept(throwable)
        } catch (failure: Throwable) {
            Exceptions.throwIfFatal(failure)
            // Keep the original error alongside the callback failure so neither is lost.
            RxJavaPlugins.onError(CompositeException(throwable, failure))
        }
    }

    companion object {
        /**
         * Creates an observer for Single, Maybe or Observable sources that needs no
         * completion handling; completion runs [Functions.EMPTY_ACTION].
         *
         * @param onSuccess receives each delivered value.
         * @param onError receives the error signal.
         */
        @JvmStatic
        fun <T> create(
            onSuccess: Consumer<in T>,
            onError: Consumer<in Throwable>
        ): ResultObserver<T> =
            NonDisposableObserver(onSuccess, Functions.EMPTY_ACTION, onError)

        /**
         * Creates an observer for Observable sources with an explicit completion callback.
         *
         * @param onSuccess receives each delivered value.
         * @param onComplete runs on the completion signal.
         * @param onError receives the error signal.
         */
        @JvmStatic
        fun <T> create(
            onSuccess: Consumer<in T>,
            onComplete: Action,
            onError: Consumer<in Throwable>
        ): ResultObserver<T> =
            NonDisposableObserver(onSuccess, onComplete, onError)

        /**
         * Creates an observer for Completable sources; the value callback is
         * [Functions.emptyConsumer] since it is only exposed as a [CompletableObserver].
         *
         * @param onComplete runs on the completion signal.
         * @param onError receives the error signal.
         */
        @JvmStatic
        fun create(
            onComplete: Action,
            onError: Consumer<in Throwable>
        ): CompletableObserver =
            // The element type is irrelevant here; it is spelled out because the
            // CompletableObserver return type gives the compiler nothing to infer it from.
            NonDisposableObserver<Any>(Functions.emptyConsumer(), onComplete, onError)
    }
}
/**
 * Umbrella observer type that is simultaneously an [Observer], a [SingleObserver] and a
 * [MaybeObserver] of [T], so one instance can be subscribed to any of those source kinds.
 *
 * It declares no members of its own.
 *
 * @param T type of the values the observer receives.
 */
interface ResultObserver<T> :
    SingleObserver<T>,
    Observer<T>,
    MaybeObserver<T>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment