@rrevol
Last active July 15, 2016 12:41
RxJava: an unexpected difference between flatMap and concatMap when the incoming stream has only one element
import java.util.*;
import java.util.function.*;
import rx.Observable;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import static rx.Observable.*;
import static rx.internal.util.UtilityFunctions.identity;
import static rx.schedulers.Schedulers.*;

public class FlatMapVSConcatMapTest {

    private static final int trialNb = 10;
    private static final int testSize = 10000; // the larger the size, the more likely a failure

    private FlatMapVSConcatMapTest() {
    }

    public static void main(String[] args) {
        System.out.print("With ConcatMap : ");
        testLoop(FlatMapVSConcatMapTest::initIntFlowWithConcatMap);
        System.out.print("With FlatMap : ");
        testLoop(FlatMapVSConcatMapTest::initIntFlowWithFlatMap);
    }

    // Runs trialNb trials and prints how many of them failed.
    private static void testLoop(Function<Integer, Observable<Integer>> intFlowProducer) {
        TestSubscriber<String> subscriber = new TestSubscriber<>();
        range(0, trialNb) //
                .flatMap(any -> initTestFlowWithZip(intFlowProducer)) //
                .filter(identity()) // keep only the trials that returned true, so count is the number of successes
                .count() //
                .map(count -> "run " + trialNb + ", failures : " + (trialNb - count)) //
                .doOnNext(System.out::println) //
                .subscribe(subscriber);
        subscriber.awaitTerminalEvent();
    }

    // One trial: builds two flows with the same producer and emits true only if they match element by element.
    private static Observable<? extends Boolean> initTestFlowWithZip(Function<Integer, Observable<Integer>> intFlowProducer) {
        Observable<Integer> flow1 = intFlowProducer.apply(testSize);
        Observable<Integer> flow2 = intFlowProducer.apply(testSize);
        return Observable.zip(flow1, flow2, Objects::equals).all(identity());
    }

    private static Observable<Integer> initIntFlowWithFlatMap(int size) {
        return just(size) // the real code has a reduce here; for the repro, just does the same job: it emits a single element
                .observeOn(io()) // because retrieveFlowToMap performs I/O to build the returned Observable
                .flatMap(FlatMapVSConcatMapTest::retrieveFlowToMap);
    }

    private static Observable<Integer> initIntFlowWithConcatMap(int size) {
        return just(size) // the real code has a reduce here; for the repro, just does the same job: it emits a single element
                .observeOn(io()) // because retrieveFlowToMap performs I/O to build the returned Observable
                .concatMap(FlatMapVSConcatMapTest::retrieveFlowToMap);
    }

    private static Observable<Integer> retrieveFlowToMap(int size) {
        return range(0, size) // the real code builds the flow from a custom OnSubscribe; for the repro, range does the same job
                .subscribeOn(io()) // because the call to request() in our custom OnSubscribe performs I/O
                .observeOn(computation()); // because we then perform computations on the returned data
    }

    // Alternative check, not called from main: emits true only if the flow is strictly increasing.
    private static Observable<? extends Boolean> initTestFlowWithOrderCheck(Function<Integer, Observable<Integer>> intFlowProducer) {
        Observable<Integer> flow = intFlowProducer.apply(testSize);
        return flow.all(new Func1<Integer, Boolean>() {
            private int previous = -1;

            @Override
            public Boolean call(Integer integer) {
                boolean increasing = previous < integer;
                previous = integer;
                return increasing;
            }
        });
    }
}

rrevol commented Jul 15, 2016

Typical output of main:

With ConcatMap : run 10, failures : 0
With FlatMap : run 10, failures : 6
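
To make it easier to see what counts as a "failure" above, here is a minimal plain-Java sketch of the two pass/fail checks the test uses. It does not use RxJava: lists stand in for the observables, so it models only the criterion, not the scheduling that triggers the failures. The names `OrderChecks`, `pairwiseEqual` and `strictlyIncreasing` are made up for this illustration and are not part of the gist or of RxJava.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class OrderChecks {

    // Models zip(flow1, flow2, Objects::equals).all(identity()):
    // true only if every pair at the same index is equal.
    // Like zip, it only compares up to the length of the shorter sequence.
    public static boolean pairwiseEqual(List<Integer> a, List<Integer> b) {
        int n = Math.min(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            if (!Objects.equals(a.get(i), b.get(i))) {
                return false;
            }
        }
        return true;
    }

    // Models the Func1 in initTestFlowWithOrderCheck:
    // true only if each value is greater than the previous one (starting from -1).
    public static boolean strictlyIncreasing(List<Integer> values) {
        int previous = -1;
        for (int value : values) {
            if (previous >= value) {
                return false;
            }
            previous = value;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> ordered = Arrays.asList(0, 1, 2, 3);
        List<Integer> reordered = Arrays.asList(0, 2, 1, 3); // same elements, two of them swapped

        System.out.println(pairwiseEqual(ordered, ordered));   // true
        System.out.println(pairwiseEqual(ordered, reordered)); // false
        System.out.println(strictlyIncreasing(ordered));       // true
        System.out.println(strictlyIncreasing(reordered));     // false
    }
}
```

In the test, both flows come from the same producer and should each emit 0 to testSize - 1, so a trial is counted as a failure as soon as one element arrives out of order in either flow.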