Skip to content

Instantly share code, notes, and snippets.

@antoniomaria
Created October 2, 2019 06:13
Show Gist options
  • Save antoniomaria/e7e0e690f50ed83d53ff2d317cd9b137 to your computer and use it in GitHub Desktop.
Save antoniomaria/e7e0e690f50ed83d53ff2d317cd9b137 to your computer and use it in GitHub Desktop.
Reactive Stream Tcp Client
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import akka.{Done, NotUsed}
import org.slf4j.LoggerFactory
import scodec.bits.ByteVector
import scala.concurrent.Future
object ReactiveStreamTcpClient extends App {
val log = LoggerFactory.getLogger("ReactiveStreamTcpClient")
implicit val system: ActorSystem = ActorSystem()
implicit val logAdapter = Logging(system, this.getClass)
implicit lazy val mat = ActorMaterializer()
// Connection to the server
val connection = Tcp().outgoingConnection("127.0.0.1", 1234)
// Single request to server in hexadecimal
val aRequest = "00010001000100346032a1090607608574050801018b0760857405080201ac0a80083735383030303030be10040e01000000065f1f0400001e3d0000"
val sink: Sink[String, Future[Done]] = Sink.foreach[String](response =>
println("Response from server " + response))
connection.
join(logging). // Print everything coming, and leaving from/to the server
join(codec). // Akka stream protocol stack simple
runWith(Source.single(aaRequest).
concat(Source.maybe) // Not to close the stream before getting the response from server
, sink)
def codec: BidiFlow[ByteString, String, String, ByteString, NotUsed] = {
val incoming = Flow[ByteString].map(toHex)
val outcoming = Flow[String].map { hexString => ByteString(ByteVector.fromValidHex(hexString).toByteBuffer) }
BidiFlow.fromFlows(incoming, outcoming)
}
def logging(implicit logger: LoggingAdapter): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
def loggingFlow[M](tag: String, logger: LoggingAdapter): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].map { buf =>
if (logger.isDebugEnabled) {
logger.debug("{}: {}", tag, toHex(buf))
}
buf
}
BidiFlow.fromFlows(
loggingFlow("from server", logger),
loggingFlow("to server", logger)
)
}
def toHex(buf: ByteString): String = {
buf.map(x => "%02X" format x).mkString
}
}
@antoniomaria
Copy link
Author

Reactive TCP client using akka stream tcp-io with a sample protocol stack codec.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment