Skip to content

Instantly share code, notes, and snippets.

@mrleolink
Last active June 30, 2017 01:06
Show Gist options
  • Save mrleolink/1b1bea707487d7e1e03e76f47a5452b4 to your computer and use it in GitHub Desktop.
Save mrleolink/1b1bea707487d7e1e03e76f47a5452b4 to your computer and use it in GitHub Desktop.
Event Bus implementation powered by RxJava (based on https://dzone.com/articles/how-to-make-an-event-bus-with-rxjava-and-rxandroid)
package net.leolink.android.rxbus;
import android.support.annotation.IntDef;
import android.support.annotation.NonNull;
import android.util.SparseArray;
import java.lang.annotation.Retention;
import java.util.HashMap;
import java.util.Map;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import static java.lang.annotation.RetentionPolicy.SOURCE;
/**
* Event Bus implementation powered by RxJava
*/
public final class RxBus {
private static SparseArray<Subject> sSubjectMap = new SparseArray<>();
private static Map<Object, CompositeDisposable> sDisposablesMap = new HashMap<>();
public static final int EVENT_TYPE_LOGIN_SUCCEEDED = 0;
public static final int EVENT_TYPE_LOGIN_FAILED = 1;
@Retention(SOURCE)
@IntDef({
EVENT_TYPE_LOGIN_SUCCEEDED,
EVENT_TYPE_LOGIN_FAILED
})
@interface EventType {
}
private RxBus() {
// hidden constructor
}
/**
* Get the {@link Subject} or create it if it's not already in memory.
*/
@NonNull
private static <T> Subject<T> getSubject(@EventType int subjectCode) {
Subject<T> subject = (Subject<T>) sSubjectMap.get(subjectCode);
if (subject == null) {
subject = PublishSubject.<T>create().toSerialized();
sSubjectMap.put(subjectCode, subject);
}
return subject;
}
/**
* Get the {@link CompositeDisposable} or create it if it's not already in memory.
*/
@NonNull
private static CompositeDisposable getCompositeSubscription(@NonNull Object object) {
CompositeDisposable compositeSubscription = sDisposablesMap.get(object);
if (compositeSubscription == null) {
compositeSubscription = new CompositeDisposable();
sDisposablesMap.put(object, compositeSubscription);
}
return compositeSubscription;
}
/**
* Subscribe to a type of event. {@code consumer}'s callback will be delivered on main thread.
* <br/><br/>
* <b>Note:</b> Make sure to call {@link RxBus#unregister(Object)} to avoid memory leaks.
*
* @param eventType unique integer to represent an event.
* @param component object which can be used to group (by {@link HashMap}) all events that belong to one component,
* so they can be unsubscribed altogether later.
* @param consumer consumer of the event.
* @param <T> type of object which {@code consumer} will receive, this type is bound to {@code eventType}.
*/
public static <T> void subscribe(@EventType int eventType,
@NonNull Object component,
@NonNull Consumer<T> consumer) {
subscribe(eventType, component, consumer, AndroidSchedulers.mainThread());
}
/**
* Subscribe to a type of event with {@code consumer}'s callback will be delivered on a specific thread.
* <br/><br/>
* <b>Note:</b> Make sure to call {@link RxBus#unregister(Object)} to avoid memory leaks.
*
* @param eventType unique integer to represent an event.
* @param component object which can be used to group (by {@link HashMap}) all events that belong to one component,
* so they can be unsubscribed altogether later.
* @param consumer consumer of the event.
* @param scheduler to specify where will {@code consumer}'s callback will be delivered. Be careful that this will
* override scheduler of all previous events which were subscribed with same {@code component}.
* @param <T> type of object which {@code consumer} will receive, this type is bound to {@code eventType}.
*/
public static <T> void subscribe(@EventType int eventType,
@NonNull Object component,
@NonNull Consumer<T> consumer,
@NonNull Scheduler scheduler) {
Disposable subscription = RxBus.<T>getSubject(eventType)
.observeOn(scheduler)
.subscribe(consumer);
getCompositeSubscription(component).add(subscription);
}
/**
* Unregister all consumers that belong to a component.
*
* @param component component that was used to subscribe.
*/
public static void unregister(@NonNull Object component) {
CompositeDisposable compositeSubscription = sDisposablesMap.remove(component);
if (compositeSubscription != null) {
compositeSubscription.dispose();
}
}
/**
* Publish a message.
*
* @param eventType should be unique per event.
* @param message message to publish.
* @param <T> type of {@code message}
*/
public static <T> void publish(@EventType int eventType, @NonNull T message) {
RxBus.<T>getSubject(eventType).onNext(message);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment