Last active
February 27, 2018 07:57
-
-
Save IgorBerman/188614485e192d7d62df700f23dddf91 to your computer and use it in GitHub Desktop.
akka-http akka-streams java8
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import java.util.concurrent.CompletionStage; | |
import java.util.concurrent.TimeUnit; | |
import scala.runtime.BoxedUnit; | |
import scala.util.Try; | |
import akka.Done; | |
import akka.NotUsed; | |
import akka.actor.ActorSystem; | |
import akka.http.javadsl.ConnectHttp; | |
import akka.http.javadsl.HostConnectionPool; | |
import akka.http.javadsl.Http; | |
import akka.http.javadsl.marshallers.jackson.Jackson; | |
import akka.http.javadsl.model.HttpRequest; | |
import akka.http.javadsl.model.HttpResponse; | |
import akka.japi.Pair; | |
import akka.stream.ActorMaterializer; | |
import akka.stream.Materializer; | |
import akka.stream.javadsl.Flow; | |
import akka.stream.javadsl.Keep; | |
import akka.stream.javadsl.RunnableGraph; | |
import akka.stream.javadsl.Sink; | |
import akka.stream.javadsl.Source; | |
public class StreamWithHttpPoolExample { | |
//see https://jsonplaceholder.typicode.com/posts/1 | |
@SuppressWarnings("unused") | |
private static class Post { | |
private int userId; | |
private int id; | |
private String title; | |
private String body; | |
public int getUserId() { | |
return userId; | |
} | |
public void setUserId(int userId) { | |
this.userId = userId; | |
} | |
public int getId() { | |
return id; | |
} | |
public void setId(int id) { | |
this.id = id; | |
} | |
public String getTitle() { | |
return title; | |
} | |
public void setTitle(String title) { | |
this.title = title; | |
} | |
public String getBody() { | |
return body; | |
} | |
public void setBody(String body) { | |
this.body = body; | |
} | |
@Override | |
public String toString() { | |
return "Post [userId=" + userId + ", id=" + id + ", title=" + title + "]"; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
final ActorSystem system = ActorSystem.create(); | |
final Materializer mat = ActorMaterializer.create(system); | |
//take some source | |
Source<Integer,NotUsed> source = Source.range(1, 10).named("source"); | |
final String externalResource = "http://jsonplaceholder.typicode.com/"; | |
//map source to requests to external resource | |
Source<Pair<HttpRequest, Integer>, NotUsed> requestsStream = source.map(y -> { | |
system.log().info("Creating reqest for " + y); | |
HttpRequest req = HttpRequest.create(externalResource + "posts/"+y); | |
return Pair.create(req, y);//we create Pair here due to API of cached host connection pool, see below! | |
}).named("requests"); | |
//create http pool as flow | |
final Http http = Http.get(system); | |
final ConnectHttp connectHttp = ConnectHttp.toHost(externalResource); | |
Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> cachedHostConnectionPool | |
= http.cachedHostConnectionPool(connectHttp, mat); | |
//map requests to maybe responses | |
Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream = requestsStream.via(cachedHostConnectionPool).named("mayberesps"); | |
//extract from response body of successfull requests body with json and map it to Post object | |
Source<Post, NotUsed> postsStream = maybeResponsesStream.mapAsync(4, x -> { | |
system.log().info("MapAsync" + x); | |
Try<HttpResponse> maybeResponse = x.first(); | |
if (maybeResponse.isFailure()) { | |
throw new RuntimeException(x.first().failed().get()); | |
} else { | |
HttpResponse rsp = x.first().get(); | |
return Jackson.unmarshaller(Post.class).unmarshal(rsp.entity(), system.dispatcher(), mat); | |
} | |
}).named("posts"); | |
Sink<Post, CompletionStage<Done>> consoleSink = Sink.foreach(x -> system.log().info("Out:" + x)); | |
RunnableGraph<Pair<NotUsed,CompletionStage<Done>>> graph = postsStream.toMat(consoleSink, Keep.both()); | |
Pair<NotUsed, CompletionStage<Done>> notUsedWithCompletionHook = graph.run(mat); | |
notUsedWithCompletionHook.second().whenComplete((x,t) -> { | |
try { | |
system.log().warning("Stream completed" + (t != null ? " with exception " + t + ", " + t.getCause() : "")); | |
try { | |
CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools(); | |
shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}finally { | |
system.terminate(); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment