Created
July 26, 2016 13:34
-
-
Save maciekciolek/d1f1a031be4fb1c7eeea645669550449 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vectos.kafka.akkaimpl.producer | |
import akka.Done | |
import akka.actor._ | |
import akka.pattern.{ask, pipe} | |
import akka.util.Timeout | |
import vectos.kafka.akkaimpl.KafkaConnection | |
import vectos.kafka.types.v0._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.{Failure, Success, Try} | |
object ConnectionCoordinator { | |
def props(servers: Seq[BrokerAddress]) = Props(new ConnectionCoordinator(servers)) | |
} | |
final case class Produce(topic: String, partition: Int, records: Seq[KafkaRecord]) { | |
def toKafkaRequest(timeout: Int): KafkaRequest.Produce = { | |
import vectos.kafka.types.v0._ | |
val messages = records.map(r => MessageSetEntry(0, Message(0, 0, r.key.toVector, r.value.toVector))).toVector | |
val topicRequest = ProduceTopicRequest(Some(topic), Vector(ProduceTopicPartitionRequest(partition, messages))) | |
KafkaRequest.Produce(1, timeout, Vector(topicRequest)) | |
} | |
} | |
class ConnectionCoordinator(bootstrapServers: Seq[BrokerAddress]) extends Actor with ActorLogging with Stash { | |
@SuppressWarnings(Array("org.wartremover.warts.Var")) | |
private var brokers = Map.empty[Int, Broker] | |
@SuppressWarnings(Array("org.wartremover.warts.Var")) | |
private var topics = Map.empty[String, TopicStatus] | |
private implicit val ec: ExecutionContext = context.dispatcher | |
private implicit val timeout: Timeout = Timeout(30.seconds) //TODO: This should be configureable | |
override def preStart(): Unit = { | |
val bootstraps = bootstrapServers.map(initializeBrokerConnection) | |
context.become(bootstraping(bootstraps)) | |
} | |
private def bootstraping(connections: Seq[ActorRef]): Receive = { | |
connections.foreach(_ ! KafkaRequest.Metadata(Vector.empty)) | |
{ | |
case _: Produce => stash() | |
case Success(metadata: KafkaResponse.Metadata) => | |
updateMetadata(metadata) | |
connections.foreach(connection => context.stop(connection)) //TODO: Maybe reuse this connections | |
unstashAll() | |
context.become(receive) | |
} | |
} | |
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) | |
private def forwardToBroker(broker: Broker, produce: Produce): Unit = | |
broker.ref | |
.ask(produce.toKafkaRequest(30)) //TODO: This should be configureable | |
.mapTo[Try[KafkaResponse.Produce]] | |
.flatMap { | |
case Success(response) => | |
val errors = response.topics.flatMap(_.partitions.map(_.errorCode).filter(_ != KafkaError.NoError)) | |
if (errors.isEmpty) { | |
Future.successful(Done) | |
} else { | |
Future.failed(new IllegalStateException(s"Errors occurred $errors.")) | |
} | |
case Failure(ex) => Future.failed(ex) | |
} | |
.pipeTo(sender()) | |
override def receive: Receive = { | |
case p @ Produce(topic, partition, _) if topics.contains(topic) => | |
getBrokerFor(topic, partition) match { | |
case Some(broker) => | |
forwardToBroker(broker, p) | |
case None => | |
sender() ! Status.Failure(new IllegalStateException(s"Broker does not exist for topic $topic, partition $partition.")) | |
} | |
case Produce(topic, _, _) => | |
stash() | |
context.become(fetchingMetadata(topic)) | |
case Terminated(brokerRef) => | |
markBrokerAsUnavailable(brokerRef) | |
} | |
private def markBrokerAsUnavailable(brokerRef: ActorRef): Unit = { | |
log.error(s"Broker unavailable $brokerRef") | |
//TODO: Handle this error | |
} | |
private def fetchingMetadata(topic: String): Receive = { | |
log.info(s"Fetching metadata for $topic.") | |
brokers.foreach { | |
case (_, broker) => | |
broker.ref ! KafkaRequest.Metadata(Vector(Some(topic))) | |
} | |
{ | |
case Success(metadata: KafkaResponse.Metadata) => | |
updateMetadata(metadata) | |
unstashAll() | |
context.become(receive) | |
case Terminated(brokerRef) => | |
markBrokerAsUnavailable(brokerRef) | |
case _ => stash() | |
} | |
} | |
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) | |
private def updateMetadata(metadata: KafkaResponse.Metadata) = { | |
val currentBrokersIds = brokers.keys.toSet | |
val newBrokers = metadata.brokers.filterNot(b => currentBrokersIds.contains(b.nodeId)) | |
brokers = brokers ++ newBrokers.map { | |
case m => | |
//TODO: MetadataBrokerResponse`s host should not be an option | |
val host = m.host.getOrElse(throw new IllegalStateException("Cannot resolve host")) | |
val address = BrokerAddress(host, m.port) | |
val connection = initializeBrokerConnection(address) | |
context.watch(connection) | |
m.nodeId -> Broker(m.nodeId, address, connection) | |
}.toMap | |
topics = topics ++ metadata.topicMetadata | |
.map(t => { | |
//TODO: MetadataTopicMetadataResponse`s topic should not be an option | |
val name = t.name.getOrElse(throw new IllegalStateException("Cannot resolve topic name")) | |
name -> TopicStatus(t.partitionMetaData.map(p => p.id -> p.leader).toMap) | |
}) | |
.filter { case (_, status) => status.partitionToBroker.nonEmpty } | |
.toMap | |
} | |
private def initializeBrokerConnection(address: BrokerAddress) = { | |
val props = KafkaConnection.props(KafkaConnection.Settings(address.host, address.port, 100)) | |
context.actorOf(props) | |
} | |
private def getBrokerFor(topic: String, partition: Int): Option[Broker] = | |
for { | |
t <- topics.get(topic) | |
p <- t.partitionToBroker.get(partition) | |
broker <- brokers.get(p) | |
} yield broker | |
override def unhandled(message: Any): Unit = { | |
super.unhandled(message) | |
log.error(s"Unhandled message $message.") | |
} | |
private case class Broker(id: Int, address: BrokerAddress, ref: ActorRef) | |
private case class TopicStatus(partitionToBroker: Map[Int, Int]) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vectos.kafka.akkaimpl.producer | |
import akka.Done | |
import akka.actor.ActorSystem | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.concurrent.duration._ | |
final case class KafkaRecord(key: Array[Byte], value: Array[Byte]) | |
object KafkaRecord { | |
@SuppressWarnings(Array("org.wartremover.warts.Overloading")) | |
def apply(key: String, value: String): KafkaRecord = | |
KafkaRecord(key.getBytes("UTF-8"), value.getBytes("UTF-8")) | |
} | |
final case class ProducerSettings(bootstrapServer: Seq[BrokerAddress], timeout: Duration) | |
final case class BrokerAddress(host: String, port: Int) | |
class KafkaProducer(settings: ProducerSettings)(implicit system: ActorSystem) { | |
private val coordinator = system.actorOf(ConnectionCoordinator.props(settings.bootstrapServer)) | |
//TODO: Decide if timeout of ask should be the same as request timeout | |
private implicit val timeout: Timeout = Timeout(30.seconds) | |
private implicit val ec: ExecutionContext = system.dispatcher | |
def send(topic: String, partition: Int, record: KafkaRecord): Future[Done] = ??? | |
def sendBatch(topic: String, partition: Int, records: Vector[KafkaRecord]): Future[Done] = { | |
coordinator.ask(Produce(topic, partition, records)).mapTo[Done] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment