Skip to content

Instantly share code, notes, and snippets.

@mirland
Last active November 20, 2018 16:07
Show Gist options
  • Save mirland/aac2c1318a1a1facd811 to your computer and use it in GitHub Desktop.
Save mirland/aac2c1318a1a1facd811 to your computer and use it in GitHub Desktop.
Regular Interval Delay, custom rx operator
import java.util.Date;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import timber.log.Timber;
/**
 * Custom RxJava operator that spaces out every event it forwards (items, errors and
 * completion alike) so that consecutive deliveries are at least a configured interval apart.
 *
 * <p>Each incoming event is assigned the next free "slot" by {@link #calculateNextCall()} and
 * is re-emitted downstream after the corresponding delay. The slot bookkeeping lives in the
 * operator instance, so every subscriber that this instance is lifted into shares the same
 * schedule.
 *
 * <p>Created by mirland on 26/02/16.
 *
 * @param <T> type of the items passing through the operator
 */
@SuppressWarnings("unused")
public class MinIntervalDelay<T> implements Observable.Operator<T, T> {

    // Terminal consumer for the internal delayed streams; the visible side effects happen in
    // the doOnNext callbacks, not here.
    // NOTE(review): GeneralObserver is declared elsewhere; this assumes one shared instance may
    // be subscribed any number of times - confirm.
    private static final GeneralObserver<Object> observer = new GeneralObserver<>();

    /** Minimum spacing between two deliveries, in milliseconds. */
    private final long intervalTimeMilliseconds;

    /**
     * Epoch-millisecond timestamp of the most recently reserved slot; {@code 0} until the first
     * event arrives. Guarded by {@code this}.
     */
    private long lastCall;

    /**
     * Creates an operator with the interval expressed in an arbitrary unit.
     *
     * @param intervalTime minimum spacing between deliveries
     * @param intervalUnit unit of {@code intervalTime}; the value is converted to milliseconds,
     *                     so finer-grained amounts are truncated
     */
    public MinIntervalDelay(long intervalTime, TimeUnit intervalUnit) {
        this(TimeUnit.MILLISECONDS.convert(intervalTime, intervalUnit));
    }

    /**
     * Creates an operator with the interval expressed in milliseconds.
     *
     * @param intervalTimeMilliseconds minimum spacing between deliveries, in milliseconds
     */
    public MinIntervalDelay(long intervalTimeMilliseconds) {
        this.intervalTimeMilliseconds = intervalTimeMilliseconds;
        this.lastCall = 0L;
    }

    /**
     * Reserves the next delivery slot and reports how long the caller has to wait for it.
     *
     * <p>The slot is the later of "now" and "previous slot + interval". The method is
     * synchronized because the read-modify-write of {@code lastCall} is shared by every
     * subscription of this operator instance; without it two concurrent callers could be handed
     * the same slot.
     *
     * @return milliseconds to wait before delivering, never negative
     */
    synchronized long calculateNextCall() {
        // Wall-clock time, same value the previous new Date().getTime() produced.
        final long now = System.currentTimeMillis();
        final long slot = Math.max(now, lastCall + intervalTimeMilliseconds);
        lastCall = slot;
        return slot - now;
    }

    /**
     * Wraps the downstream subscriber so that every event it receives is re-emitted after the
     * delay returned by {@link #calculateNextCall()}.
     *
     * @param s downstream subscriber
     * @return subscriber to hand to the upstream source; it shares {@code s}'s subscription
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> s) {
        return new Subscriber<T>(s) {
            @Override
            public void onCompleted() {
                deliverAfterDelay(null, "Call onCompleted: %d", s, () -> s.onCompleted());
            }

            @Override
            public void onError(Throwable error) {
                deliverAfterDelay(error, "Call onError: %d", s, () -> s.onError(error));
            }

            @Override
            public void onNext(T item) {
                deliverAfterDelay(item, "Call onNext: %d", s, () -> s.onNext(item));
            }
        };
    }

    /**
     * Reserves a slot, waits for it, logs the delivery time and then runs {@code delivery}
     * unless the downstream subscriber has unsubscribed in the meantime.
     *
     * @param payload   value carried by the internal one-shot stream (the item, the error, or
     *                  {@code null} for completion); it is what the shared observer receives
     * @param logFormat Timber format string taking a single {@code %d} timestamp argument
     * @param child     downstream subscriber whose subscription state gates the delivery
     * @param delivery  the actual downstream call to perform
     */
    private <V> void deliverAfterDelay(V payload, String logFormat, Subscriber<?> child,
            Runnable delivery) {
        // The slot is reserved here, on the calling thread, so slots follow arrival order.
        final long waitMillis = calculateNextCall();
        Observable.just(payload)
                .delay(waitMillis, TimeUnit.MILLISECONDS)
                .doOnNext(ignored -> Timber.d(logFormat, System.currentTimeMillis()))
                .doOnNext(ignored -> {
                    if (!child.isUnsubscribed()) {
                        delivery.run();
                    }
                })
                .subscribe(observer);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment