Last active
June 17, 2020 21:33
-
-
Save mwmitchell/ac14f8ebaf708f0869e0dff9265b1521 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.test; | |
import com.google.common.base.Stopwatch; | |
import java.time.Duration; | |
import java.util.List; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Function; | |
import reactor.core.Disposable; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
public class ReactorMonoToFluxWaitExample { | |
public static void main(String[] args) throws InterruptedException { | |
// A function that accepts an initial input, and returns a Flux<Object> | |
Function<String, Flux<Object>> replies = (input) -> Flux.create(sink -> { | |
Thread t = new Thread(() -> { | |
while (!Thread.currentThread().isInterrupted()) { | |
try { | |
// Simulate work... | |
int sleepTime = ThreadLocalRandom.current().nextInt(100, 1000); | |
Thread.sleep(sleepTime); | |
// Finally, emit a value | |
sink.next(input + " -> " + sleepTime); | |
} catch (InterruptedException interruptedException) { | |
// This thread has been interrupted... which means the subscription to the Flux has completed. | |
System.out.println("Generator thread interrupted..."); | |
Thread.currentThread().interrupt(); | |
break; | |
} | |
} | |
}); | |
sink.onRequest(new LongConsumer() { | |
@Override | |
public void accept(long value) { | |
System.out.println("onRequest... request is for " + value + " items, but we'll generate until onCancel / onDispose"); | |
} | |
}); | |
sink.onCancel(new Disposable() { | |
@Override | |
public void dispose() { | |
System.out.println("sink.onCancel called"); | |
// When the subscription is cancelled, interrupt the generator thread above | |
t.interrupt(); | |
} | |
}); | |
sink.onDispose(new Disposable() { | |
@Override | |
public void dispose() { | |
System.out.println("sink.onDispose called"); | |
// When the subscription is completed, interrupt the generator thread above | |
t.interrupt(); | |
} | |
}); | |
// Start the generator thread | |
t.start(); | |
}); | |
Stopwatch stopwatch = Stopwatch.createUnstarted(); | |
// Single input: | |
List<Object> results = Mono.just("hi") | |
// We want many replies: | |
.flatMapMany(replies) | |
// Only request n items, this is arbitrary | |
.limitRequest(6) | |
// and wait only for n seconds | |
.take(Duration.ofSeconds(5)) | |
// Log some output for each item received | |
.doOnNext(value -> System.out.println("doOnNext: " + value + ", stopwatch.elapsed -> " + stopwatch.elapsed(TimeUnit.MILLISECONDS))) | |
// Map to a list of results (should be <= what was passed into limitRequest(...)) | |
.collectList() | |
// Start the stopwatch when subscribed to | |
.doOnSubscribe(s -> stopwatch.start()) | |
// Log everything (reactor debugging utility) | |
.log() | |
// Now wait for n items to be produced, or n seconds have passed | |
.block(); | |
System.err.println("Done in " + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS) + " ms. Got " + results.size() + " results: " + results); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment