Created
October 2, 2019 06:13
-
-
Save antoniomaria/e7e0e690f50ed83d53ff2d317cd9b137 to your computer and use it in GitHub Desktop.
Reactive Stream Tcp Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import akka.{Done, NotUsed}
import org.slf4j.LoggerFactory
import scodec.bits.ByteVector
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * Minimal reactive TCP client built on Akka Streams.
 *
 * Opens an outgoing connection to 127.0.0.1:1234, sends one hex-encoded
 * request and prints every response chunk as an upper-case hex string.
 * The raw connection is wrapped in two bidirectional stages:
 *
 *  - [[logging]]: debug-logs every chunk travelling in either direction;
 *  - [[codec]]: converts between hex strings (application side) and
 *    [[akka.util.ByteString]] (wire side).
 *
 * When the stream finishes, successfully or not, the outcome is logged and
 * the actor system is terminated so the JVM can exit.
 */
object ReactiveStreamTcpClient extends App {

  val log = LoggerFactory.getLogger("ReactiveStreamTcpClient")

  implicit val system: ActorSystem = ActorSystem()
  implicit val logAdapter: LoggingAdapter = Logging(system, this.getClass)
  implicit lazy val mat: ActorMaterializer = ActorMaterializer()

  // Connection to the server
  val connection = Tcp().outgoingConnection("127.0.0.1", 1234)

  // Single request to server in hexadecimal
  val aRequest = "00010001000100346032a1090607608574050801018b0760857405080201ac0a80083735383030303030be10040e01000000065f1f0400001e3d0000"

  val sink: Sink[String, Future[Done]] =
    Sink.foreach[String](response => println(s"Response from server $response"))

  // Source.maybe never completes on its own, so the outbound side stays open
  // after the request has been sent and the response can still arrive.
  private val requests = Source.single(aRequest).concat(Source.maybe[String])

  // runWith yields (source materialised value, sink materialised value);
  // only the sink's completion future is of interest here.
  val completion: Future[Done] =
    connection
      .join(logging) // Print everything coming, and leaving from/to the server
      .join(codec)   // Akka stream protocol stack simple
      .runWith(requests, sink)
      ._2

  // Report the outcome instead of dropping the future, and release the actor
  // system in both cases so the process does not linger after the stream ends.
  completion.onComplete { outcome =>
    outcome match {
      case Success(_)     => log.info("Stream completed, shutting down")
      case Failure(cause) => log.error("Stream failed, shutting down", cause)
    }
    system.terminate()
  }(system.dispatcher)

  /**
   * Protocol codec stage.
   *
   * Inbound bytes are rendered with [[toHex]]; outbound strings are decoded
   * with `ByteVector.fromValidHex` and wrapped in a `ByteString`.
   *
   * @return a bidirectional flow: `ByteString => String` inbound and
   *         `String => ByteString` outbound
   */
  def codec: BidiFlow[ByteString, String, String, ByteString, NotUsed] = {
    val incoming = Flow[ByteString].map(toHex)
    val outgoing = Flow[String].map { hexString =>
      ByteString(ByteVector.fromValidHex(hexString).toByteBuffer)
    }
    BidiFlow.fromFlows(incoming, outgoing)
  }

  /**
   * Pass-through stage that debug-logs each chunk in both directions.
   *
   * Chunks are forwarded unchanged. The hex rendering is only computed when
   * debug logging is enabled on `logger`.
   *
   * @param logger adapter that receives the debug messages
   * @return a bidirectional flow tagging traffic as "from server" / "to server"
   */
  def logging(implicit logger: LoggingAdapter): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
    def loggingFlow(tag: String): Flow[ByteString, ByteString, NotUsed] =
      Flow[ByteString].map { buf =>
        if (logger.isDebugEnabled) {
          logger.debug("{}: {}", tag, toHex(buf))
        }
        buf
      }

    BidiFlow.fromFlows(
      loggingFlow("from server"),
      loggingFlow("to server")
    )
  }

  /**
   * Renders a byte string as upper-case hexadecimal, two digits per byte.
   *
   * @param buf bytes to render
   * @return the concatenated hex digits; empty for an empty `buf`
   */
  def toHex(buf: ByteString): String =
    buf.map(b => "%02X".format(b)).mkString
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Reactive TCP client using akka stream tcp-io with a sample protocol stack codec.