Created
August 2, 2016 09:20
-
-
Save lemonxah/8e8e75558036d8419fb9458e55d46689 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.fullfacing.queue | |
import java.io.{ByteArrayOutputStream, ByteArrayInputStream} | |
import java.util.Properties | |
import java.util.concurrent.ConcurrentHashMap | |
import java.util.zip._ | |
import akka.actor.ActorRef | |
import com.fullfacing.framework.Security | |
import com.fullfacing.message.{Response, Request, Retry} | |
import com.rabbitmq.client.AMQP.BasicProperties | |
import com.rabbitmq.client._ | |
import org.slf4j.LoggerFactory | |
import scala.Console._ | |
import scala.Predef | |
import scala.annotation.tailrec | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.duration._ | |
import scala.language._ | |
import scala.util.Try | |
import scala.util.{Success ⇒ S} | |
import scala.util.{Failure ⇒ F} | |
import scalaz._, Scalaz._ | |
/** | |
* Project: common | |
* Package: com.fullfacing.queue | |
* Created on 20/8/15. | |
* lemonxah aka lemonxah - | |
* https://github.com/lemonxah | |
* http://stackoverflow.com/users/2919672/lemon-xah | |
*/ | |
/**
 * Compression scheme of a message body, as derived from a content-encoding value.
 */
sealed trait Compression

object Compression {
  /**
   * Maps a content-encoding string onto a [[Compression]].
   *
   * Matching ignores surrounding whitespace and letter case. Any value other than
   * "gzip" or "deflate" yields [[NoCompression]].
   *
   * @param s the content-encoding value (must not be null)
   * @return the matching compression scheme
   */
  def apply(s: String): Compression =
    // Locale.ROOT keeps the case folding locale-independent: with a Turkish default
    // locale "GZIP".toLowerCase yields "gzıp" (dotless i) and would fall through
    // to NoCompression.
    s.trim.toLowerCase(java.util.Locale.ROOT) match {
      case "gzip" ⇒ Gzip
      case "deflate" ⇒ Deflate
      case _ ⇒ NoCompression
    }
}

/** The body is used exactly as received. */
case object NoCompression extends Compression

/** The body is a gzip stream. */
case object Gzip extends Compression

/** The body is a zlib/deflate stream. */
case object Deflate extends Compression
object RabbitMQ { | |
/** Logger shared by everything in this object. */
val logger: org.slf4j.Logger = LoggerFactory.getLogger("com.fullfacing.queue.RabbitMQ")

/** A subscription step: given a consumer context, performs the subscription as a side effect. */
type SubscribeReader = Reader[Consumer[Connection], Unit]

/** A publish step: given a producer context, performs the publish as a side effect. */
type PublishReader = Reader[Producer[Channel, Connection], Unit]
/**
 * Returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
 *
 * The value is computed iteratively in constant stack space. Results wrap around
 * silently once they no longer fit in a Long (n > 92).
 *
 * @param n zero-based index into the Fibonacci sequence
 * @return the n-th Fibonacci number
 * @throws IllegalArgumentException if n is negative
 */
def fib(n : Long) : Long = {
  // A negative index never reaches the 0 base case and would spin for ~2^63 iterations.
  require(n >= 0, s"fib is undefined for negative input: $n")
  @tailrec def loop(remaining: Long, current: Long, next: Long): Long =
    if (remaining == 0) current
    else loop(remaining - 1, next, current + next)
  loop(n, 0L, 1L)
}
/** Back-off strategy: attempt i waits fib(min(9, i)) seconds, i.e. at most fib(9) = 34 seconds. */
private implicit val backOffstrat: Long ⇒ Duration = attempt ⇒ fib(Math.min(9L, attempt)).seconds

/** Message handlers keyed by topic; each handler receives the body and the routing key. */
private val cache = new ConcurrentHashMap[String, (Array[Byte], String) ⇒ Throwable \/ _]()

/** Connections keyed by the string produced by getKey; None marks "no connection yet". */
private val connections = new ConcurrentHashMap[String, Option[Connection]]()

/** Channels keyed by the string produced by getKey; None marks "no channel yet". */
private val chans = new ConcurrentHashMap[String, Option[Channel]]()
/**
 * Opens a new RabbitMQ connection configured from the "host.*" entries of `conf`.
 *
 * @param conf connection settings; missing entries fall back to the defaults below
 * @return the new connection on the right, or any non-fatal exception raised while
 *         reading the settings or connecting on the left
 */
private def connect(conf: Properties): Throwable \/ Connection = \/.fromTryCatchNonFatal {
  // Reads one setting with its fallback value.
  def setting(key: String, fallback: String): String = conf.getProperty(key, fallback)

  logger.debug(s"${Console.GREEN}Connecting to :${Console.BLUE} ${getKey(conf)}${Console.RESET}")

  val factory = new ConnectionFactory
  factory.setRequestedHeartbeat(setting("host.heartBeat", "60").toInt)
  factory.setVirtualHost(setting("host.virtualHost", "/"))
  factory.setHost(setting("host.ip", ""))
  factory.setPort(setting("host.port", "5672").toInt)
  // Let the client library re-establish dropped connections on its own.
  factory.setAutomaticRecoveryEnabled(true)
  factory.setNetworkRecoveryInterval(10000)
  factory.setConnectionTimeout(60000)

  // Credentials are applied only when explicitly configured.
  if (conf.containsKey("host.username")) {
    factory.setUsername(conf.getProperty("host.username"))
  }
  if (conf.containsKey("host.password")) {
    factory.setPassword(conf.getProperty("host.password"))
  }

  val sslEnabled = setting("host.useSSL", "false").toBoolean
  logger.info(s"${Console.GREEN}Using RabbitMQ SSL${Console.WHITE}: ${Console.BLUE}$sslEnabled${Console.RESET}")
  if (sslEnabled) {
    factory.useSslProtocol(setting("host.sslProtocol", "TLSv1.2"))
  }

  factory.newConnection()
}
/**
 * Builds the cache key for a configuration in the form "host.ip:host.port".
 * An entry that is not set renders as the text "null".
 */
private def getKey(conf: Properties): String = {
  val host = conf.getProperty("host.ip")
  val port = conf.getProperty("host.port")
  s"$host:$port"
}
/**
 * Looks up the cached connection for `conf`.
 * Returns None both when nothing is registered under the key and when the
 * registered entry is itself None.
 */
private def getCon(conf: Properties): Option[Connection] =
  // ConcurrentHashMap.get yields null for an absent key; Option(...) folds that into None.
  Option(connections.get(getKey(conf))).getOrElse(None)
/**
 * Looks up the cached channel for `conf`.
 * Returns None both when nothing is registered under the key and when the
 * registered entry is itself None.
 */
private def getChan(conf: Properties): Option[Channel] =
  // ConcurrentHashMap.get yields null for an absent key; Option(...) folds that into None.
  Option(chans.get(getKey(conf))).getOrElse(None)
/**
 * Returns the cached connection for `conf`, connecting first if none is cached.
 *
 * A failed connection attempt is logged, followed by `retries.backOff` and another
 * attempt with an incremented retry counter; this repeats until a connection succeeds.
 * The retry is a genuine tail call (enforced by `@tailrec`), so an arbitrarily long
 * outage cannot exhaust the stack.
 *
 * @param conf    connection settings; also determines the cache key
 * @param retries retry counter of the current attempt (callers in this file start with Retry(0))
 * @return the cached or newly opened connection
 */
@tailrec
implicit def confToConnection(conf: Properties, retries: Retry): Connection = {
  if (retries.count > 0) logger.trace(s"${Console.GREEN}Trying to connect(${getKey(conf)}), tries: ${retries.count}${Console.RESET}")
  getCon(conf) match {
    case Some(existing) ⇒
      existing
    case None ⇒
      // Reset both cache entries before (re)connecting: a channel cached for this key
      // would belong to a connection that no longer exists.
      chans.put(getKey(conf), None)
      connections.put(getKey(conf), None)
      connect(conf) match {
        case -\/(ex) ⇒
          logger.error(s"${Console.RED}Connection Error: (${getKey(conf)}), Retry no: ${retries.count}, message: ${ex.getMessage}${Console.RESET}")
          // presumably waits according to the implicit back-off strategy; Retry is defined elsewhere
          retries.backOff
          confToConnection(conf, retries.incr)
        case \/-(connection) ⇒
          connections.put(getKey(conf), Some(connection))
          // Re-read through the cache so the returned instance is always the cached one.
          confToConnection(conf, Retry(0))
      }
  }
}
/**
 * Returns the cached channel for `conf`, creating the connection and/or the channel
 * first when they are not cached yet.
 *
 * @param conf connection settings; also determines the cache key
 * @return the cached or newly created channel
 */
implicit def confToChannel(conf: Properties): Channel =
  getChan(conf) match {
    case Some(cachedChannel) ⇒
      cachedChannel
    case None ⇒
      getCon(conf) match {
        case None ⇒
          // No connection yet: establish one, then start over.
          confToConnection(conf, Retry(0))
          confToChannel(conf)
        case Some(connection) ⇒
          // Connection available: open a channel on it, cache it, then start over.
          val opened = connection.createChannel()
          chans.put(getKey(conf), Some(opened))
          confToChannel(conf)
      }
  }
/**
 * Publisher instance that sends a message to the exchange named by "exchange.name".
 *
 * When "topics.compressed" is true and the body is longer than 300 bytes, the body is
 * gzip-compressed and the content encoding is set to "gzip"; otherwise it is sent as is.
 * The content type is always "application/json; charset=utf-8".
 */
implicit val publisher: Publisher[Channel, Connection] = new Publisher[Channel, Connection] {
  override def publish(a: Publish): Reader[Producer[Channel, Connection], Unit] = new PublishReader({ p ⇒
    val compressionEnabled = p.props.getProperty("topics.compressed", "false").toBoolean
    // Decide once so the content-encoding header and the payload can never disagree.
    val shouldGzip = compressionEnabled && a.message.length > 300

    val props = new BasicProperties.Builder()
    if (shouldGzip) props.contentEncoding("gzip")
    props.contentType("application/json; charset=utf-8")
    props.appId(p.props.getProperty("app.name", "no app name in config!"))

    val payload = if (shouldGzip) gzip(a.message) else a.message
    p.producer.basicPublish(p.props.getProperty("exchange.name"), a.topic, props.build(), payload)

    // Decoding the whole body is only worth it when trace output is actually enabled.
    // The bytes are decoded as UTF-8 to match the declared content type rather than the
    // platform default charset.
    if (logger.isTraceEnabled) {
      logger.trace(s"${BLUE}Published message on topic($YELLOW${a.topic}$BLUE)$WHITE:$GREEN${new String(a.message, java.nio.charset.StandardCharsets.UTF_8)}$RESET")
    }
  })
}
/**
 * Builds the consumer that dispatches deliveries on `chan` to the handlers in `cache`.
 *
 * Every delivery is settled exactly once:
 *  - handler succeeded: ack
 *  - handler returned a failure: nack with requeue
 *  - no handler registered for the routing key (nor for "#"): ack, message dropped
 *  - body could not be decompressed: ack, message dropped
 *  - any other exception during delivery: ack, message dropped
 *
 * @param chan channel the consumer is attached to and settles deliveries on
 */
private def consumer(chan: Channel) = new DefaultConsumer(chan) {
  override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
    logger.error(s"${Console.RED}Rabbit consumer shutdown tag($consumerTag)${Console.WHITE}: ${Console.RED} ${Console.RESET}", sig)
  }

  /** Acknowledges a single delivery. */
  def ack(deliveryTag: Long): Unit = {
    this.getChannel.basicAck(deliveryTag, false)
  }

  /** Rejects a single delivery and asks the broker to requeue it. */
  def nack(deliveryTag: Long): Unit = {
    this.getChannel.basicNack(deliveryTag, false, true)
  }

  /** Runs `handle` on the (already decompressed) body and settles the delivery by its outcome. */
  def handleMessage(b: Array[Byte], envelope: Envelope)(implicit handle: (Array[Byte], String) ⇒ Throwable \/ _): Unit = {
    handle(b, envelope.getRoutingKey).fold({ t ⇒
      logger.error(s"${RED}Queue data not in expected format$RESET", t)
      nack(envelope.getDeliveryTag)
    }, { d ⇒
      logger.trace(s"${BLUE}Got message on topic($YELLOW${envelope.getRoutingKey}$BLUE)$WHITE:$GREEN $d$RESET")
      ack(envelope.getDeliveryTag)
    })
  }

  override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
    try {
      val contentEncoding = Option(properties.getContentEncoding)
      val contentType = Option(properties.getContentType)
      val compression: Compression = contentEncoding.map(Compression.apply).getOrElse(NoCompression)
      logger.trace(s"${BLUE}Content encoding for message($YELLOW${envelope.getDeliveryTag}$BLUE)$WHITE:$GREEN $contentEncoding${Console.RESET}")
      logger.trace(s"${BLUE}Content type for message($YELLOW${envelope.getDeliveryTag}$BLUE)$WHITE:$GREEN $contentType${Console.RESET}")
      logger.trace(s"${BLUE}Compression for message($YELLOW${envelope.getDeliveryTag}$BLUE)$WHITE:$GREEN $compression${Console.RESET}")

      def failedDecompress(t: Throwable): Unit = {
        logger.error(s"${Console.RED}Decompression failed: $compression($YELLOW${envelope.getDeliveryTag}$RED)${Console.RESET}", t)
        // A body that cannot be decompressed will not decompress on redelivery either.
        // Without settling it here the delivery would stay unacknowledged on the channel,
        // so drop it the same way the catch-all below does.
        ack(envelope.getDeliveryTag)
      }

      val routingKey = envelope.getRoutingKey
      // Prefer the handler registered for the exact routing key, then the "#" catch-all.
      Option(cache.get(routingKey)).orElse(Option(cache.get("#"))) match {
        case Some(handle) ⇒
          uzip(body, compression).fold(failedDecompress, handleMessage(_, envelope)(handle))
        case None ⇒
          logger.trace(s"${Console.RED}Dropping message on topic not listening directry for: ${Console.BLUE}${envelope.getRoutingKey}${Console.RESET}")
          ack(envelope.getDeliveryTag)
      }
    } catch {
      case e: Exception ⇒
        logger.error(s"${Console.RED}MessageDelivery error: ${e.getMessage}${Console.RESET}", e)
        ack(envelope.getDeliveryTag)
        // deadletter publishing to exchange FF topic Dead Letter
        // drop dead letters for now else we would duplicate the dead letter messages by the number of
        // queues that are subscribed to that routing key.
    }
  }
}
/**
 * Registers the handlers of `a`, binds `qName` to the exchange for the resulting
 * binding keys and starts consuming from the queue with manual acknowledgement.
 *
 * Topics with exactly four dot-separated segments that start with "fullfacing" are
 * grouped by their second segment and collapsed per group:
 *  - a single topic is bound as is
 *  - topics that all end in "request" (or all in "response") become one
 *    "org.group.*.request" ("org.group.*.response") binding
 *  - otherwise the group is split by third segment; a third segment occurring more than
 *    once is bound as "org.group.third.#", a unique one keeps its own fourth segment
 * All other topics are bound unchanged.
 *
 * @param a     subscriptions whose topics and handlers are registered
 * @param s     consumer context providing the "exchange.name" setting
 * @param chan  channel used for binding and consuming
 * @param qName name of the (already declared) queue
 */
private def subscr[M[_]](a: List[Subscribe[_, _, M]], s: Consumer[Connection], chan: Channel, qName: String): Unit = {
  val exchange = s.props.getProperty("exchange.name")

  // The first handler registered for a topic wins; later ones are ignored.
  a.foreach(subscription ⇒ cache.putIfAbsent(subscription.topic, subscription.handle))

  val (fullfacing, other) = a.map(_.topic).partition { topic ⇒
    topic.split('.').length == 4 && topic.startsWith("fullfacing")
  }

  val collapsed = fullfacing
    .map(_.split('.').toList)
    .groupBy(_(1))
    .flatMap { case (group, segments) ⇒
      val org = segments.head.head
      def allEndIn(kind: String): Boolean = segments.forall(_(3) == kind)

      if (segments.size < 2) {
        val only = segments.head
        List(s"$org.$group.${only(2)}.${only(3)}")
      } else if (allEndIn("request")) {
        List(s"$org.$group.*.request")
      } else if (allEndIn("response")) {
        List(s"$org.$group.*.response")
      } else {
        segments.groupBy(_(2)).map { case (third, sameThird) ⇒
          if (sameThird.size > 1) s"$org.$group.$third.#"
          else s"$org.$group.$third.${sameThird.head(3)}"
        }
      }
    }

  (collapsed ++ other).foreach { topic ⇒
    logger.info(s"${Console.GREEN}Binding queue('${Console.RED}$qName${Console.GREEN}') to ${Console.WHITE}: ${Console.BLUE}$topic${Console.RESET}")
    chan.queueBind(qName, exchange, topic)
  }

  // autoAck = false: the consumer acks/nacks every delivery itself.
  chan.basicConsume(qName, false, consumer(chan))
}
/**
 * Reverses the given compression on a message body.
 *
 * @param body        the bytes as received
 * @param compression the scheme the body was compressed with
 * @return the decompressed bytes on the right (the body itself for NoCompression),
 *         or any non-fatal exception raised while decompressing on the left
 */
def uzip(body: Array[Byte], compression: Compression): Throwable \/ Array[Byte] = {
  // Pick the decoder first; it only runs inside fromTryCatchNonFatal below.
  val decode: Array[Byte] ⇒ Array[Byte] = compression match {
    case Gzip ⇒ bytes ⇒ ungzip(bytes)
    case Deflate ⇒ bytes ⇒ decompress(bytes)
    case NoCompression ⇒ bytes ⇒ bytes
  }
  \/.fromTryCatchNonFatal(decode(body))
}
/**
 * Compresses `inData` into a complete zlib/deflate stream.
 *
 * The output is drained in fixed-size chunks until the deflater reports it is finished,
 * so the result is a valid stream for every input size, including empty and very small
 * inputs whose compressed form is larger than twice the input.
 *
 * @param inData the bytes to compress
 * @return the complete compressed stream
 */
private[RabbitMQ] def compress(inData: Array[Byte]): Array[Byte] = {
  val deflater = new Deflater()
  try {
    deflater.setInput(inData)
    deflater.finish()
    val out = new ByteArrayOutputStream()
    val chunk = new Array[Byte](1024 * 4)
    while (!deflater.finished()) {
      val produced = deflater.deflate(chunk)
      out.write(chunk, 0, produced)
    }
    out.toByteArray
  } finally {
    // Release the deflater's native buffers right away instead of waiting for finalization.
    deflater.end()
  }
}
/**
 * Inflates a zlib/deflate stream.
 *
 * Output is collected until the inflater produces no more bytes; a stream that ends
 * early therefore yields the bytes decoded so far rather than an error.
 *
 * @param inData the compressed bytes
 * @return the decompressed bytes
 * @throws java.util.zip.DataFormatException if `inData` is not valid deflate data
 */
private[RabbitMQ] def decompress(inData: Array[Byte]): Array[Byte] = {
  val inflater = new Inflater()
  try {
    inflater.setInput(inData)
    // Growing stream instead of repeated array concatenation keeps this linear in the output size.
    val out = new ByteArrayOutputStream()
    val chunk = new Array[Byte](1024 * 4)
    @tailrec def drain(): Unit = {
      val produced = inflater.inflate(chunk)
      if (produced > 0) {
        out.write(chunk, 0, produced)
        drain()
      }
    }
    drain()
    out.toByteArray
  } finally {
    // Release the inflater's native buffers right away instead of waiting for finalization.
    inflater.end()
  }
}
/**
 * Compresses `bytes` into a complete gzip stream.
 *
 * @param bytes the bytes to compress
 * @return the gzip-compressed bytes
 */
private[RabbitMQ] def gzip(bytes: Array[Byte]): Array[Byte] = {
  val chunk = new Array[Byte](1024 * 4)
  val source = new ByteArrayInputStream(bytes)
  val sink = new ByteArrayOutputStream()
  val gzipOut = new GZIPOutputStream(sink)
  // Pump the input through in 4 KiB chunks; read returns -1 once the input is exhausted.
  Iterator
    .continually(source.read(chunk))
    .takeWhile(_ != -1)
    .foreach(read ⇒ gzipOut.write(chunk, 0, read))
  // Closing writes the gzip trailer; the stream is incomplete without it.
  gzipOut.close()
  sink.toByteArray
}
/**
 * Decompresses a gzip stream.
 *
 * @param bytes the gzip-compressed bytes
 * @return the decompressed bytes
 * @throws java.io.IOException if `bytes` is not a valid gzip stream
 */
private[RabbitMQ] def ungzip(bytes: Array[Byte]): Array[Byte] = {
  val gzipIn = new GZIPInputStream(new ByteArrayInputStream(bytes))
  try {
    val buffer = new Array[Byte](1024 * 4)
    val byteOut = new ByteArrayOutputStream()
    @tailrec def loop(n: Int): Unit = {
      if (n != -1) {
        byteOut.write(buffer, 0, n)
        loop(gzipIn.read(buffer))
      }
    }
    loop(gzipIn.read(buffer))
    byteOut.toByteArray
  } finally {
    // Closing releases the stream's inflater instead of leaving it to finalization.
    gzipIn.close()
  }
}
/**
 * Builds the subscription step for a queue named after the application.
 *
 * The queue name is taken from "app.name" (default "NoAppName"); the queue is declared
 * on a fresh channel and then bound and consumed via `subscr`.
 *
 * @param a the subscriptions to register on that queue
 * @return a reader that performs the subscription when run with a consumer context
 */
def flatReader[M[_]](a: List[Subscribe[_, _, M]]): SubscribeReader = new SubscribeReader({ consumerCtx ⇒
  val channel = consumerCtx.connection.createChannel
  val queueName = consumerCtx.props.getProperty("app.name", "NoAppName")
  // Arguments after the name: durable = true, exclusive = false, autoDelete = false, no extra arguments.
  channel.queueDeclare(queueName, true, false, false, null)
  subscr(a, consumerCtx, channel, queueName)
})
/** Subscriber for request subscriptions: they share the application-named queue built by `flatReader`. */
implicit val subscirberRequests: Subscriber[SubscribeRequest[_, _], Connection] =
  new Subscriber[SubscribeRequest[_, _], Connection] {
    override def subscribe(a: List[SubscribeRequest[_, _]]): Reader[Consumer[Connection], Unit] = {
      flatReader(a)
    }
  }
/** Subscriber for flat subscriptions: they share the application-named queue built by `flatReader`. */
implicit val subscirberFlat: Subscriber[SubscribeFlat[_, _], Connection] =
  new Subscriber[SubscribeFlat[_, _], Connection] {
    override def subscribe(a: List[SubscribeFlat[_, _]]): Reader[Consumer[Connection], Unit] = {
      flatReader(a)
    }
  }
/**
 * Subscriber for broadcast subscriptions.
 * Each call declares its own queue with a name chosen by `queueDeclare` (no arguments)
 * on a fresh channel, then binds and consumes it via `subscr`.
 */
implicit val subscirberBroadcasts: Subscriber[SubscribeBroadcast[_, _], Connection] =
  new Subscriber[SubscribeBroadcast[_, _], Connection] {
    override def subscribe(a: List[SubscribeBroadcast[_, _]]): Reader[Consumer[Connection], Unit] =
      new SubscribeReader({ consumerCtx ⇒
        val channel = consumerCtx.connection.createChannel
        // The declare result carries the name that was assigned to the new queue.
        val generatedQueue = channel.queueDeclare.getQueue
        subscr(a, consumerCtx, channel, generatedQueue)
      })
  }
/**
 * Subscriber for response subscriptions.
 * Each call declares its own queue with a name chosen by `queueDeclare` (no arguments)
 * on a fresh channel, then binds and consumes it via `subscr`.
 */
implicit val subscirberResponses: Subscriber[SubscribeResponse[_, _], Connection] =
  new Subscriber[SubscribeResponse[_, _], Connection] {
    override def subscribe(a: List[SubscribeResponse[_, _]]): Reader[Consumer[Connection], Unit] =
      new SubscribeReader({ consumerCtx ⇒
        val channel = consumerCtx.connection.createChannel
        // The declare result carries the name that was assigned to the new queue.
        val generatedQueue = channel.queueDeclare.getQueue
        subscr(a, consumerCtx, channel, generatedQueue)
      })
  }
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment