Last active
October 27, 2018 16:02
-
-
Save akarnokd/8f1a3e31882c076c704f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hu.akarnokd.reactiveflowbridge; | |
import java.util.Objects; | |
import java.util.concurrent.Flow; | |
import java.util.function.Function; | |
/** | |
* Bridge between Reactive-Streams API and the Java 9 Flow API. | |
*/ | |
public final class ReactiveFlowBridge { | |
/** Utility class. */ | |
private ReactiveFlowBridge() { | |
throw new IllegalStateException("No instances!"); | |
} | |
/** | |
* Converts a Flow Publisher into a Reactive Publisher. | |
* @param <T> the value type | |
* @param flowPublisher the source Flow Publisher to convert | |
* @return the equivalent Reactive Publisher | |
*/ | |
@SuppressWarnings("unchecked") | |
public static <T> org.reactivestreams.Publisher<T> toReactive( | |
Flow.Publisher<? extends T> flowPublisher) { | |
Objects.requireNonNull(flowPublisher, "flowPublisher"); | |
if (flowPublisher instanceof org.reactivestreams.Publisher) { | |
return (org.reactivestreams.Publisher<T>)flowPublisher; | |
} | |
if (flowPublisher instanceof FlowPublisherFromReactive) { | |
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactive); | |
} | |
return new ReactivePublisherFromFlow<>(flowPublisher); | |
} | |
/** | |
* Converts a Reactive Publisher into a Flow Publisher. | |
* @param <T> the value type | |
* @param reactivePublisher the source Reactive Publisher to convert | |
* @return the equivalent Flow Publisher | |
*/ | |
@SuppressWarnings("unchecked") | |
public static <T> Flow.Publisher<T> toFlow( | |
org.reactivestreams.Publisher<? extends T> reactivePublisher | |
) { | |
Objects.requireNonNull(reactivePublisher, "reactivePublisher"); | |
if (reactivePublisher instanceof org.reactivestreams.Publisher) { | |
return (Flow.Publisher<T>)reactivePublisher; | |
} | |
if (reactivePublisher instanceof ReactivePublisherFromFlow) { | |
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactivePublisher).flow); | |
} | |
return new FlowPublisherFromReactive<>(reactivePublisher); | |
} | |
/** | |
* Converts a Flow Processor into a Reactive Processor. | |
* @param <T> the input value type | |
* @param <U> the output value type | |
* @param flowProcessor the source Flow Processor to convert | |
* @return the equivalent Reactive Processor | |
*/ | |
@SuppressWarnings("unchecked") | |
public static <T, U> org.reactivestreams.Processor<T, U> toReactive( | |
Flow.Processor<? super T, ? extends U> flowProcessor | |
) { | |
Objects.requireNonNull(flowProcessor, "flowProcessor"); | |
if (flowProcessor instanceof org.reactivestreams.Processor) { | |
return (org.reactivestreams.Processor<T, U>)flowProcessor; | |
} | |
if (flowProcessor instanceof FlowToReactiveProcessor) { | |
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactive); | |
} | |
return new ReactiveToFlowProcessor<>(flowProcessor); | |
} | |
/** | |
* Converts a Reactive Processor into a Flow Processor. | |
* @param <T> the input value type | |
* @param <U> the output value type | |
* @param reactiveProcessor the source Reactive Processor to convert | |
* @return the equivalent Flow Processor | |
*/ | |
@SuppressWarnings("unchecked") | |
public static <T, U> Flow.Processor<T, U> toFlow( | |
org.reactivestreams.Processor<? super T, ? extends U> reactiveProcessor | |
) { | |
Objects.requireNonNull(reactiveProcessor, "reactiveProcessor"); | |
if (reactiveProcessor instanceof org.reactivestreams.Processor) { | |
return (Flow.Processor<T, U>)reactiveProcessor; | |
} | |
if (reactiveProcessor instanceof ReactiveToFlowProcessor) { | |
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveProcessor).flow); | |
} | |
return new FlowToReactiveProcessor<>(reactiveProcessor); | |
} | |
/** | |
* Converts a Reactive Publisher into a Flow Publisher via a function then | |
* applies the returned Flow Publisher to a transformer function that returns a value. | |
* <p> | |
* This allows a more inlined conversion from a Reactive Publisher to a | |
* fluent Flow-based API: | |
* <pre><code> | |
* org.reactivestreams.Publisher<T> source = ... | |
* | |
* ReactiveFlowBridge.to( | |
* source, | |
* ReactiveFlowBridge::toFlow, | |
* FluentFlow::from) | |
* .map(v -> v) | |
* .subscribe(...); | |
* </code></pre> | |
* @param <T> the element type | |
* @param <U> the output type | |
* @param reactivePublisher the source Reactive Publisher | |
* @param toFlow the function that converts from Reactive Publisher into Flow Publisher | |
* @param transformer the transformer that takes a Flow Publisher and returns some transformation of it | |
* @return the transformation result | |
*/ | |
public static <T, U> U to( | |
org.reactivestreams.Publisher<? extends T> reactivePublisher, | |
Function<? super org.reactivestreams.Publisher<? extends T>, ? extends Flow.Publisher<? extends T>> toFlow, | |
Function<Flow.Publisher<? extends T>, U> transformer) { | |
return transformer.apply(toFlow.apply(reactivePublisher)); | |
} | |
/** | |
* Converts a Flow Publisher into a Reactive Publisher via a function then | |
* applies the returned Reactive Publisher to a transformer function that returns a value. | |
* <p> | |
* This allows a more inlined conversion from a Flow Publisher to a | |
* fluent Reactive-based API: | |
* <pre><code> | |
* Flow.Publisher<T> source = ... | |
* | |
* ReactiveFlowBridge.to( | |
* source, | |
* ReactiveFlowBridge::toFlow, | |
* Observable::fromPublisher) | |
* .map(v -> v) | |
* .subscribe(...); | |
* </code></pre> | |
* @param <T> the element type | |
* @param <U> the output type | |
* @param flowPublisher the source Reactive Publisher | |
* @param toReactive the function that converts from Reactive Publisher into Flow Publisher | |
* @param transformer the transformer that takes a Flow Publisher and returns some transformation of it | |
* @return the transformation result | |
*/ | |
public static <T, U> U to( | |
Flow.Publisher<? extends T> flowPublisher, | |
Function<? super Flow.Publisher<? extends T>, ? extends org.reactivestreams.Publisher<? extends T>> toReactive, | |
Function<org.reactivestreams.Publisher<? extends T>, U> transformer) { | |
return transformer.apply(toReactive.apply(flowPublisher)); | |
} | |
/** | |
* Wraps a Reactive Subscription and converts the calls to a Flow Subscription. | |
*/ | |
static final class FlowToReactiveSubscription implements Flow.Subscription { | |
private final org.reactivestreams.Subscription reactive; | |
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { | |
this.reactive = reactive; | |
} | |
@Override | |
public void request(long n) { | |
reactive.request(n); | |
} | |
@Override | |
public void cancel() { | |
reactive.cancel(); | |
} | |
} | |
/** | |
* Wraps a Flow Subscription and converts the calls to a Reactive Subscription. | |
*/ | |
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { | |
private final Flow.Subscription flow; | |
public ReactiveToFlowSubscription(Flow.Subscription flow) { | |
this.flow = flow; | |
} | |
@Override | |
public void request(long n) { | |
flow.request(n); | |
} | |
@Override | |
public void cancel() { | |
flow.cancel(); | |
} | |
} | |
/** | |
* Wraps a Reactive Subscriber and forwards methods of the Flow Subscriber to it. | |
* @param <T> the value type | |
*/ | |
static final class FlowToReactiveSubscriber<T> | |
implements Flow.Subscriber<T> { | |
private final org.reactivestreams.Subscriber<? super T> reactive; | |
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) { | |
this.reactive = reactive; | |
} | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
reactive.onSubscribe(new ReactiveToFlowSubscription(subscription)); | |
} | |
@Override | |
public void onNext(T item) { | |
reactive.onNext(item); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
reactive.onError(throwable); | |
} | |
@Override | |
public void onComplete() { | |
reactive.onComplete(); | |
} | |
} | |
/** | |
* Wraps a Reactive Subscriber and forwards methods of the Flow Subscriber to it. | |
* @param <T> the value type | |
*/ | |
static final class ReactiveToFlowSubscriber<T> | |
implements org.reactivestreams.Subscriber<T> { | |
private final Flow.Subscriber<? super T> flow; | |
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) { | |
this.flow = flow; | |
} | |
@Override | |
public void onSubscribe(org.reactivestreams.Subscription subscription) { | |
flow.onSubscribe(new FlowToReactiveSubscription(subscription)); | |
} | |
@Override | |
public void onNext(T item) { | |
flow.onNext(item); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
flow.onError(throwable); | |
} | |
@Override | |
public void onComplete() { | |
flow.onComplete(); | |
} | |
} | |
/** | |
* Wraps a Flow Processor and forwards methods of the Reactive Processor to it. | |
* @param <T> the input type | |
* @param <U> the output type | |
*/ | |
static final class ReactiveToFlowProcessor<T, U> | |
implements org.reactivestreams.Processor<T, U> { | |
private final Flow.Processor<? super T, ? extends U> flow; | |
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) { | |
this.flow = flow; | |
} | |
@Override | |
public void onSubscribe(org.reactivestreams.Subscription s) { | |
flow.onSubscribe(new FlowToReactiveSubscription(s)); | |
} | |
@Override | |
public void onNext(T t) { | |
flow.onNext(t); | |
} | |
@Override | |
public void onError(Throwable t) { | |
flow.onError(t); | |
} | |
@Override | |
public void onComplete() { | |
flow.onComplete(); | |
} | |
@Override | |
public void subscribe(org.reactivestreams.Subscriber<? super U> s) { | |
flow.subscribe(new FlowToReactiveSubscriber<>(s)); | |
} | |
} | |
/** | |
* Wraps a Reactive Processor and forwards methods of the Flow Processor to it. | |
* @param <T> the input type | |
* @param <U> the output type | |
*/ | |
static final class FlowToReactiveProcessor<T, U> | |
implements Flow.Processor<T, U> { | |
private final org.reactivestreams.Processor<? super T, ? extends U> reactive; | |
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) { | |
this.reactive = reactive; | |
} | |
@Override | |
public void onSubscribe(Flow.Subscription s) { | |
reactive.onSubscribe(new ReactiveToFlowSubscription(s)); | |
} | |
@Override | |
public void onNext(T t) { | |
reactive.onNext(t); | |
} | |
@Override | |
public void onError(Throwable t) { | |
reactive.onError(t); | |
} | |
@Override | |
public void onComplete() { | |
reactive.onComplete(); | |
} | |
@Override | |
public void subscribe(Flow.Subscriber<? super U> s) { | |
reactive.subscribe(new ReactiveToFlowSubscriber<>(s)); | |
} | |
} | |
/** | |
* Reactive Publisher that wraps a Flow Publisher. | |
* @param <T> the value type | |
*/ | |
static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> { | |
final Flow.Publisher<? extends T> flow; | |
public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) { | |
this.flow = flowPublisher; | |
} | |
@Override | |
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) { | |
flow.subscribe(new FlowToReactiveSubscriber<>(reactive)); | |
} | |
} | |
/** | |
* Flow Publisher that wraps a Reactive Publisher. | |
* @param <T> the value type | |
*/ | |
static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> { | |
final org.reactivestreams.Publisher<? extends T> reactive; | |
public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) { | |
this.reactive = reactivePublisher; | |
} | |
@Override | |
public void subscribe(Flow.Subscriber<? super T> flow) { | |
reactive.subscribe(new ReactiveToFlowSubscriber<>(flow)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment