Skip to content

Instantly share code, notes, and snippets.

@HaloFour
Created July 17, 2020 00:18
Show Gist options
  • Save HaloFour/a2c3d2e7b116801a08949e71af810896 to your computer and use it in GitHub Desktop.
Save HaloFour/a2c3d2e7b116801a08949e71af810896 to your computer and use it in GitHub Desktop.
Async/Await in Loom
package sandbox;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
/**
 * Experimental async/await helpers built on continuations.
 *
 * <p>{@link #async(Callable)} and {@link #mono(Callable)} run an action inside a
 * {@code Continuation} that belongs to a single shared {@code ContinuationScope} named
 * {@code "Async"}. Inside such an action, the {@code await} overloads suspend the continuation
 * until the awaited {@link CompletionStage} or {@code Mono} has finished, and arrange for the
 * completion callback to resume it.
 *
 * <p>NOTE(review): the {@code await} methods look up the current continuation of the shared
 * scope, so they appear to be intended for use only inside an action started by
 * {@code async}/{@code mono} - confirm before calling them from anywhere else.
 */
public class Async {

  /** Scope shared by every continuation created or looked up by this class. */
  private static final ContinuationScope continuationScope = new ContinuationScope("Async");

  /**
   * Starts {@code action} inside a new continuation and returns a stage tracking its outcome.
   *
   * <p>The continuation is run immediately on the calling thread; this method returns once the
   * action has either finished or suspended for the first time.
   *
   * @param action the work to run; may call the {@code await} methods of this class
   * @param <T> the result type
   * @return a stage completed with the action's result, or completed exceptionally with the
   *     {@link Exception} the action threw
   */
  public static <T> CompletionStage<T> async(Callable<T> action) {
    var future = new CompletableFuture<T>();
    var continuation = new Continuation(continuationScope, () -> {
      try {
        var result = action.call();
        future.complete(result);
      } catch (Exception exception) {
        future.completeExceptionally(exception);
      }
    });
    continuation.run();
    return future;
  }

  /**
   * Suspends the current continuation until {@code completionStage} is done, then returns its
   * result.
   *
   * <p>If the stage is already done, no suspension happens and the result is returned directly.
   *
   * @param completionStage the stage to wait for
   * @param <T> the result type
   * @return the value the stage completed with
   * @throws java.util.concurrent.CompletionException if the stage completed exceptionally (as
   *     reported by {@link CompletableFuture#join()})
   * @throws java.util.concurrent.CancellationException if the stage was cancelled
   */
  public static <T> T await(CompletionStage<T> completionStage) {
    var currentContinuation = Continuation.getCurrentContinuation(continuationScope);
    var future = completionStage.toCompletableFuture();
    if (!future.isDone()) {
      future.whenComplete((result, error) -> {
        // The stage may complete between the isDone() check above and this registration, in
        // which case the callback runs inline, inside the very continuation that is awaiting.
        // Resuming it from there is not meaningful (it never suspended), so only resume when
        // the callback is running outside of it - the same guard the Mono overload applies.
        if (Continuation.getCurrentContinuation(continuationScope) != currentContinuation) {
          currentContinuation.run();
        }
      });
      // Re-check before suspending: if the callback already fired inline, nothing is left to
      // resume this continuation, so yielding here would never return.
      // NOTE(review): a completion arriving on another thread after this check but before the
      // yield takes effect is still not handled; closing that window needs a real handshake.
      if (!future.isDone()) {
        Continuation.yield(continuationScope);
      }
    }
    return future.join();
  }

  /**
   * Wraps {@code action} in a {@code Mono} that runs it inside a new continuation.
   *
   * <p>Each time the {@code Mono.create} callback is invoked, a fresh continuation is created
   * and run; the sink is signalled with the action's result, or with the {@link Exception} it
   * threw.
   *
   * @param action the work to run; may call the {@code await} methods of this class
   * @param <T> the result type
   * @return a {@code Mono} emitting the action's result or its error
   */
  public static <T> Mono<T> mono(Callable<T> action) {
    return Mono.create(sink -> {
      var continuation = new Continuation(continuationScope, () -> {
        try {
          var result = action.call();
          sink.success(result);
        } catch (Exception exception) {
          sink.error(exception);
        }
      });
      continuation.run();
    });
  }

  /**
   * Subscribes to {@code mono} and suspends the current continuation until it terminates.
   *
   * @param mono the publisher to wait for
   * @param <T> the result type
   * @return the value delivered to {@code onNext}, or {@code null} if the {@code Mono} completed
   *     without emitting one
   * @throws RuntimeException the error signalled by the {@code Mono}, passed through
   *     {@code Exceptions.propagate}
   */
  public static <T> T await(Mono<T> mono) {
    var currentContinuation = Continuation.getCurrentContinuation(continuationScope);
    var subscriber = new CoreSubscriber<T>() {
      // result and error are written before the volatile write to done.
      public T result;
      public Throwable error;
      public volatile boolean done;

      @Override
      public void onSubscribe(Subscription s) {
        s.request(1L);
      }

      @Override
      public void onNext(T result) {
        this.result = result;
      }

      @Override
      public void onError(Throwable error) {
        this.error = error;
        done();
      }

      @Override
      public void onComplete() {
        done();
      }

      /** Marks the subscription finished and resumes the awaiting continuation if needed. */
      private void done() {
        done = true;
        // When the Mono terminates synchronously inside subscribe(), this runs within the
        // awaiting continuation itself; it has not yielded, so there is nothing to resume.
        if (Continuation.getCurrentContinuation(continuationScope) != currentContinuation) {
          currentContinuation.run();
        }
      }
    };
    mono.subscribe(subscriber);
    // Only suspend if the Mono did not already terminate during subscribe().
    if (!subscriber.done) {
      Continuation.yield(continuationScope);
    }
    if (subscriber.error != null) {
      throw Exceptions.propagate(subscriber.error);
    } else {
      return subscriber.result;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment