Last active
April 11, 2023 07:57
-
-
Save stefanobaghino/853c45f6355ad3a21b1f to your computer and use it in GitHub Desktop.
Pipe a Kafka consumer to a WebSocket on Play! Framework.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Consumer settings; every entry under kafka.consumer is copied verbatim into
# the Properties object used to build the Kafka ConsumerConfig.
kafka.consumer {
  zookeeper.connect = "localhost:2181"
  group.id = "default_consumer_group"
  consumer.timeout.ms = "-1"
  auto.offset.reset = "smallest"
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import java.util.Properties | |
import com.typesafe.config.ConfigFactory | |
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, Whitelist} | |
import kafka.serializer.StringDecoder | |
import play.api.libs.iteratee.{Enumerator, Iteratee} | |
import play.api.mvc.{Controller, WebSocket} | |
import scala.collection.JavaConversions._ | |
/**
 * Play controller that pipes the messages of a Kafka topic to WebSocket clients.
 *
 * Consumer settings come from the `kafka.consumer` section of the application
 * configuration. Each WebSocket connection opens its own consumer connector and
 * shuts it down when the client side of the socket is closed.
 */
object KafkaFeed extends Controller {

  /** Kafka consumer settings, copied entry by entry from the `kafka.consumer` config section. */
  private val kafkaConsumerConfig: ConsumerConfig = {
    val props = new Properties
    ConfigFactory.load().getConfig("kafka.consumer").entrySet().foreach { entry =>
      props.setProperty(entry.getKey, entry.getValue.unwrapped().toString)
    }
    new ConsumerConfig(props)
  }

  /** Opens a new consumer connector for the given settings. */
  private def connect(config: ConsumerConfig): ConsumerConnector = Consumer.create(config)

  /**
   * Subscribes to `topic` with a single stream and returns that stream's message
   * iterator, or `None` when the connector yields no stream.
   *
   * The iterator is returned as-is rather than converted to a `Stream`: building a
   * `Stream` evaluates `hasNext` eagerly, which would block the caller until the
   * first message is available.
   */
  private def consume(topic: String, connection: ConsumerConnector) =
    connection
      .createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder, new StringDecoder)
      .headOption
      .map(_.iterator())

  /**
   * WebSocket endpoint that pushes every message consumed from `topic` to the client.
   *
   * Incoming frames are only printed to standard output. When the incoming side
   * completes, the consumer connector is shut down and the outgoing enumerator ends.
   *
   * @param topic topic name (used as a whitelist filter) to consume from
   */
  def listenTo(topic: String) = WebSocket.using[String] { _ =>
    val connection = connect(kafkaConsumerConfig)
    // Written when the incoming iteratee completes, read from the enumerator callback;
    // an atomic makes the update visible across threads.
    val connected = new java.util.concurrent.atomic.AtomicBoolean(true)

    val endOnDisconnection = Iteratee.foreach[String](println).map { _ =>
      connected.set(false)
      // Shutting the connector down also releases a `hasNext` call that is
      // currently waiting for the next message.
      connection.shutdown()
    }

    val pipeFromKafka = consume(topic, connection)
      .map { messages =>
        Enumerator.unfold(messages) { it =>
          // `hasNext` waits for the next message and turns false once the stream has
          // ended, so the enumerator finishes cleanly instead of failing on an empty
          // stream, and each message is emitted as soon as it has been read.
          if (connected.get && it.hasNext) Some((it, it.next().message()))
          else None
        }
      }
      .getOrElse(Enumerator.eof[String])

    endOnDisconnection -> pipeFromKafka
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment