Skip to content

Instantly share code, notes, and snippets.

@HaloFour
Created July 15, 2020 22:26
Show Gist options
  • Save HaloFour/e46da71f9a2876e5541b932789df6b36 to your computer and use it in GitHub Desktop.
Save HaloFour/e46da71f9a2876e5541b932789df6b36 to your computer and use it in GitHub Desktop.
Experiments with Project Loom
package sandbox;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.assertj.core.api.Assertions.assertThat;
import static sandbox.Async.async;
import static sandbox.Async.await;
import static sandbox.Async.mono;
import static sandbox.Generators.createGenerator;
import static sandbox.Helpers.delay;
import static sandbox.Helpers.hot;
import static sandbox.Helpers.log;
import static sandbox.Helpers.logStats;
import static sandbox.Helpers.withVirtualExecutor;
public class LoomTest {
/**
 * Runs a continuation whose body never yields and checks that a single
 * {@code run()} call drives it to completion.
 */
@Test
void testContinuation() {
    var continuationScope = new ContinuationScope("Continuations");
    // Continuation.yield may only be called from code running inside a
    // continuation of the given scope; the test thread itself must not yield.
    Runnable task = () -> log("Hello from Continuation!");
    var continuation = new Continuation(continuationScope, task);
    continuation.run();
    // The task contains no yield, so it has finished once run() returns.
    assertThat(continuation.isDone()).isTrue();
}
/**
 * Runs a continuation that yields once in the middle of its body and checks
 * that it is unfinished after the first {@code run()} and finished after the
 * second.
 */
@Test
void testContinuationYield() {
    var scope = new ContinuationScope("Continuations");
    Runnable body = () -> {
        log("Hello from Continuation!");
        Continuation.yield(scope);
        log("Hello again from Continuation!");
    };
    var coroutine = new Continuation(scope, body);

    // First leg: control comes back here with the body still unfinished.
    coroutine.run();
    log("Now we're outside of the Continuation!");
    assertThat(coroutine.isDone()).isFalse();

    // Second leg: run the continuation again; it is done afterwards.
    coroutine.run();
    log("Outside of the Continuation yet again!");
    assertThat(coroutine.isDone()).isTrue();
}
/**
 * Builds an {@code Iterable} through {@code createGenerator} from a body that
 * emits 3, 2, 1 and logs every value obtained by iterating it.
 */
@Test
void testGenerators() {
    Iterable<Integer> countdown = createGenerator(sink -> {
        // Emit 3, 2, 1 in that order.
        for (int n = 3; n >= 1; n--) {
            sink.accept(n);
        }
    });

    for (Integer emitted : countdown) {
        log("Iterated %d.", emitted);
    }
}
/**
 * Starts a continuation on the test thread, lets it yield, then runs it a
 * second time from a separately built thread and checks that it completes.
 *
 * @throws InterruptedException if interrupted while joining the other thread
 */
@Test
void testResumeFromAnotherThread() throws InterruptedException {
    var scope = new ContinuationScope("Continuations");
    Runnable body = () -> {
        log("Hello from Continuation!");
        Continuation.yield(scope);
        log("Hello again from Continuation!");
    };
    var coroutine = new Continuation(scope, body);

    // First run happens on the calling thread and stops at the yield.
    coroutine.run();
    log("Now we're outside of the Continuation!");
    assertThat(coroutine.isDone()).isFalse();

    // Second run is performed by a different, named thread.
    var resumer = Thread.builder()
            .name("some-other-thread")
            .task(coroutine::run)
            .start();
    resumer.join();

    assertThat(coroutine.isDone()).isTrue();
}
/**
 * Exercises the {@code async}/{@code await} helpers with two delayed
 * completion stages and checks the combined result string.
 */
@Test
void testAsyncAwait() {
    CompletionStage<String> greeting = async(() -> {
        log("Async method started.");

        // Both delays are created before either is awaited.
        CompletionStage<String> helloStage =
                delay(500, TimeUnit.MILLISECONDS).thenApply(ignored -> "Hello");
        log("First future started.");
        CompletionStage<String> worldStage =
                delay(750, TimeUnit.MILLISECONDS).thenApply(ignored -> "World");
        log("Second future started.");

        String hello = await(helloStage);
        log("First future completed = %s.", hello);
        String world = await(worldStage);
        log("Second future completed = %s.", world);

        return String.format("%s %s!", hello, world);
    });

    log("We're waiting on the coroutine to finish now.");
    String outcome = greeting.toCompletableFuture().join();
    assertThat(outcome).isEqualTo("Hello World!");
}
/**
 * Exercises the {@code mono}/{@code await} helpers with two delayed Reactor
 * publishers and verifies that the resulting Mono emits the combined string
 * and then completes.
 */
@Test
void testAsyncAwaitReactive() {
    Mono<String> greeting = mono(() -> {
        log("Async method started.");

        // NOTE(review): hot() presumably makes each Mono start eagerly - confirm in Helpers.
        Mono<String> helloMono = Mono.delay(Duration.ofMillis(500))
                .map(ignored -> "Hello")
                .as(hot());
        Mono<String> worldMono = Mono.delay(Duration.ofMillis(600))
                .map(ignored -> "World")
                .as(hot());

        String hello = await(helloMono);
        log("First publisher published: %s", hello);
        String world = await(worldMono);
        log("Second publisher published: %s", world);

        return String.format("%s %s!", hello, world);
    });

    StepVerifier.create(greeting)
            .expectNext("Hello World!")
            .verifyComplete();
}
/**
 * Starts three virtual threads in three different ways and joins each one:
 * via {@code Thread.startVirtualThread}, via a builder with the default
 * scheduler, and via a builder given {@code Runnable::run} as its executor.
 *
 * @throws InterruptedException if interrupted while joining a thread
 */
@Test
void testVirtualThread() throws InterruptedException {
    // 1) Convenience factory method.
    var first = Thread.startVirtualThread(() -> log("Hello from Virtual Thread!"));
    first.join();

    // 2) Builder, named, default scheduler.
    var second = Thread.builder()
            .virtual()
            .name("virtual-thread")
            .task(() -> log("Hello from another Virtual Thread!"))
            .start();
    second.join();

    // 3) Builder with Runnable::run supplied as the executor.
    var third = Thread.builder()
            .virtual(Runnable::run)
            .name("another-virtual-thread")
            .task(() -> {
                log("I'm sleepy");
                sleepUninterruptibly(1, TimeUnit.SECONDS);
                log("Hello from a third Virtual Thread!");
            })
            .start();
    log("Third virtual thread started!");
    third.join();
}
/**
 * Schedules 25,000 virtual threads on top of a single worker thread, waits
 * until all of them have started, logs statistics, then releases them.
 */
@Test
void testVirtualThreadExecutor() {
    var carrierFactory = Thread.builder()
            .name("worker-thread")
            .factory();

    try (var carrier = Executors.newSingleThreadExecutor(carrierFactory)) {
        // Run one trivial task first so the worker thread already exists.
        carrier.submitTask(() -> "prime").join();

        var virtualFactory = Thread.builder()
                .name("virtual-", 0)
                .virtual(carrier)
                .factory();

        try (var virtualExecutor = Executors.newUnboundedExecutor(virtualFactory)) {
            final int threadCount = 25_000;
            var allStarted = new CountDownLatch(threadCount);
            var release = new CountDownLatch(1);
            var baseline = logStats();

            // Each task signals that it started, then blocks until released.
            Runnable parkedTask = () -> {
                allStarted.countDown();
                awaitUninterruptibly(release);
            };

            log("Starting virtual threads.");
            int submitted = 0;
            while (submitted < threadCount) {
                virtualExecutor.submit(parkedTask);
                submitted++;
            }

            awaitUninterruptibly(allStarted);
            log("Finished starting %d virtual threads.", threadCount);
            logStats(threadCount, baseline);
            release.countDown();
        }
        log("Virtual threads finished.");
    }
}
/**
 * Consumes a shared one-second interval Flux from five tasks submitted to a
 * virtual executor; each task iterates the first ten values it sees through
 * a blocking {@code Iterable} and logs them.
 */
@Test
void testFluxBlockingIterable() {
    // autoConnect(0) connects right away, without waiting for subscribers.
    var ticks = Flux.interval(Duration.ofSeconds(1))
            .publish()
            .autoConnect(0);

    // NOTE(review): meaning of the first argument (1) is defined in Helpers - confirm.
    withVirtualExecutor(1, virtualExecutor -> {
        for (int consumer = 0; consumer < 5; consumer++) {
            final int id = consumer;
            virtualExecutor.submit(() -> {
                var firstTen = ticks.take(10).toIterable();
                for (var tick : firstTen) {
                    log("%d: Received value %d", id, tick);
                }
            });
        }
    });
}
/**
 * Passes ten messages through a {@code Channel} between a receiving task and
 * a sending task submitted to the same virtual executor; the sender closes
 * the channel when it is done.
 */
@Test
void testChannelQueue() {
    var mailbox = new Channel<String>();

    withVirtualExecutor(1, virtualExecutor -> {
        // Receiver: logs each message, pausing 100 ms after every one.
        virtualExecutor.submit(() -> {
            for (var message : mailbox.receive()) {
                log("Received \"%s\"", message);
                sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
            }
            log("All messages received.");
        });

        // Sender: sends "Hello 0!" through "Hello 9!", then closes the channel.
        virtualExecutor.submit(() -> {
            int sequence = 0;
            while (sequence < 10) {
                var message = String.format("Hello %d!", sequence);
                log("Sending \"%s\".", message);
                mailbox.send(message);
                sequence++;
            }
            log("Closing channel.");
            mailbox.close();
        });
    });
}
/**
 * Issues ten blocking {@code HttpClient.send} calls from tasks on a virtual
 * executor against a local WireMock stub that answers after a fixed
 * two-second delay, logging each response or error.
 */
@Test
void testHttpClient() {
    WireMockServer wireMockServer = new WireMockServer(WireMockConfiguration.options()
            .port(8080)
            .containerThreads(20)
    );
    log("Starting WireMock");
    wireMockServer.start();
    // Keep a handle on the client's executor so it can be shut down afterwards;
    // otherwise its non-daemon worker thread outlives the test.
    var clientExecutor = Executors.newSingleThreadExecutor();
    try {
        WireMock.stubFor(WireMock.get("/hello")
                .willReturn(WireMock.ok("Hello World!")
                        .withFixedDelay(2000)));
        var httpClient = HttpClient.newBuilder()
                .executor(clientExecutor)
                .build();
        withVirtualExecutor(1, virtualExecutor -> {
            for (int i = 0; i < 10; i++) {
                final int id = i;
                virtualExecutor.execute(() -> {
                    var request = HttpRequest.newBuilder(URI.create("http://localhost:8080/hello"))
                            .GET()
                            .build();
                    log("%d: Sending request.", id);
                    try {
                        var response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
                        log("%d: Got response: %s", id, response.body());
                    } catch (Exception exception) {
                        // Failures are reported in the log only; the test does not fail on them.
                        log("%d: Error! %s", id, exception);
                    }
                });
            }
        });
    } finally {
        try {
            wireMockServer.stop();
        } finally {
            // Release the worker thread even if stopping WireMock throws.
            clientExecutor.shutdown();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment