Last active
December 22, 2017 10:36
-
-
Save akarnokd/98a1a728a89e6f8dbd1efab104c2be0f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pkg; | |
import rx.Observable; | |
import rx.Observable.OnSubscribe; | |
import rx.Observable.Transformer; | |
import rx.Producer; | |
import rx.Scheduler; | |
import rx.Scheduler.Worker; | |
import rx.Subscriber; | |
import rx.exceptions.MissingBackpressureException; | |
import rx.functions.Action0; | |
import rx.internal.operators.BackpressureUtils; | |
import rx.observers.SerializedSubscriber; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.atomic.AtomicReference; | |
public final class ObservableConflate<T> implements OnSubscribe<T> { | |
final Observable<T> source; | |
final long timeout; | |
final TimeUnit unit; | |
final Scheduler scheduler; | |
public ObservableConflate(Observable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) { | |
this.source = source; | |
this.timeout = timeout; | |
this.unit = unit; | |
this.scheduler = scheduler; | |
} | |
@Override | |
public void call(Subscriber<? super T> t) { | |
t = new SerializedSubscriber<>(t); | |
Worker worker = scheduler.createWorker(); | |
ConflateSubscriber<T> parent = new ConflateSubscriber<>(t, timeout, unit, worker); | |
t.add(parent); | |
t.add(worker); | |
t.setProducer(parent.requested); | |
source.subscribe(parent); | |
} | |
static final class ConflateSubscriber<T> extends Subscriber<T> implements Action0 { | |
static final Object EMPTY = new Object(); | |
final Subscriber<? super T> actual; | |
final long timeout; | |
final TimeUnit unit; | |
final Worker worker; | |
final AtomicReference<Object> current; | |
final Requested requested; | |
volatile boolean gate; | |
public ConflateSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) { | |
this.actual = actual; | |
this.timeout = timeout; | |
this.unit = unit; | |
this.worker = worker; | |
this.current = new AtomicReference<>(EMPTY); | |
this.requested = new Requested(); | |
this.request(Long.MAX_VALUE); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void onNext(T t) { | |
if (!gate) { | |
gate = true; | |
if (emit(t)) { | |
worker.schedule(this, timeout, unit); | |
} | |
} else { | |
current.set(t); | |
if (!gate) { | |
Object o = current.getAndSet(EMPTY); | |
if (o != EMPTY) { | |
gate = true; | |
if (emit((T)o)) { | |
worker.schedule(this, timeout, unit); | |
} | |
} | |
} | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
actual.onError(e); | |
worker.unsubscribe(); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void onCompleted() { | |
Object o = current.getAndSet(EMPTY); | |
if (o != EMPTY) { | |
if (!emit((T)o)) { | |
return; | |
} | |
} | |
actual.onCompleted(); | |
worker.unsubscribe(); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void call() { | |
Object o = current.getAndSet(EMPTY); | |
if (o == EMPTY) { | |
gate = false; | |
} else { | |
if (emit((T)o)) { | |
worker.schedule(this, timeout, unit); | |
} | |
} | |
} | |
boolean emit(T v) { | |
if (requested.get() != 0L) { | |
actual.onNext(v); | |
requested.producedOne(); | |
return true; | |
} | |
unsubscribe(); | |
actual.onError(new MissingBackpressureException("Could not emit value due to lack of requests")); | |
return false; | |
} | |
final class Requested extends AtomicLong implements Producer { | |
private static final long serialVersionUID = 5469053227556974007L; | |
@Override | |
public void request(long n) { | |
if (n > 0L) { | |
BackpressureUtils.getAndAddRequest(this, n); | |
} | |
else if (n < 0L) { | |
throw new IllegalArgumentException("n >= 0 required but it was " + n); | |
} | |
} | |
void producedOne() { | |
BackpressureUtils.produced(this, 1L); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pkg; | |
import java.util.concurrent.TimeUnit; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.observers.AssertableSubscriber; | |
import rx.schedulers.TestScheduler; | |
import rx.subjects.PublishSubject; | |
public class ObservableConflateTest { | |
@Test | |
public void normal() { | |
TestScheduler scheduler = new TestScheduler(); | |
PublishSubject<Integer> source = PublishSubject.create(); | |
AssertableSubscriber<Integer> ts = source.compose(f -> Observable.create(new ObservableConflate<Integer>(f, 1, TimeUnit.SECONDS, scheduler))) | |
.test(); | |
source.onNext(1); | |
ts.assertValue(1); | |
source.onNext(2); | |
ts.assertValue(1); | |
source.onNext(3); | |
ts.assertValue(1); | |
scheduler.advanceTimeBy(1, TimeUnit.SECONDS); | |
ts.assertValues(1, 3); | |
scheduler.advanceTimeBy(1, TimeUnit.SECONDS); | |
ts.assertValues(1, 3); | |
source.onNext(4); | |
ts.assertValues(1, 3, 4); | |
source.onNext(5); | |
source.onCompleted(); | |
ts.assertResult(1, 3, 4, 5); | |
scheduler.advanceTimeBy(1, TimeUnit.SECONDS); | |
ts.assertResult(1, 3, 4, 5); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment