Created
May 21, 2023 15:24
-
-
Save erikvanoosten/5e9f34d8ff43de32b583c021c858e309 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zio.kafka.example | |
import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig } | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import zio._ | |
import zio.kafka.consumer.Consumer.AutoOffsetStrategy | |
import zio.kafka.consumer.{ Consumer, ConsumerSettings, OffsetBatch, Subscription } | |
import zio.kafka.producer.{ Producer, ProducerSettings } | |
import zio.kafka.serde.Serde | |
import zio.logging.backend.SLF4J | |
/**
 * Handle to a Kafka broker that the rest of the application connects to.
 */
trait Kafka {

  /** Broker addresses, each a `host:port` string as produced by the implementations in this file. */
  def bootstrapServers: List[String]

  /** Effect that stops the broker; declared as `UIO`, so it has no typed error channel. */
  def stop(): UIO[Unit]
}
object Kafka {

  /**
   * [[Kafka]] implementation backed by an already started embedded-kafka instance.
   *
   * @param embeddedK the running embedded broker this service wraps
   */
  final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Kafka {

    /** A single `localhost` address using the port from the embedded broker's configuration. */
    override def bootstrapServers: List[String] = {
      val port = embeddedK.config.kafkaPort
      s"localhost:$port" :: Nil
    }

    /**
     * Stops the embedded broker.
     *
     * NOTE(review): the `true` argument presumably asks embedded-kafka to clear its log
     * directories on shutdown; confirm against `EmbeddedK.stop`.
     */
    override def stop(): UIO[Unit] =
      ZIO.succeed {
        embeddedK.stop(true)
      }
  }

  /**
   * Scoped layer that starts an embedded Kafka broker when the layer is built and stops it
   * when the enclosing scope closes. A failure while starting surfaces as a `Throwable`.
   */
  val embedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped {
    // Broker property overrides handed to embedded-kafka as-is.
    val brokerOverrides = Map(
      "group.min.session.timeout.ms"     -> "500",
      "group.initial.rebalance.delay.ms" -> "0",
      "authorizer.class.name"            -> "kafka.security.authorizer.AclAuthorizer",
      "super.users"                      -> "User:ANONYMOUS"
    )

    // Must be implicit: `EmbeddedKafka.start()` picks the configuration up from implicit scope.
    implicit val embeddedKafkaConfig: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(customBrokerProperties = brokerOverrides)

    ZIO.acquireRelease(
      ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start()))
    )(service => service.stop())
  }
}
/**
 * Example application: starts an embedded Kafka broker, produces 1000 records to a topic,
 * then consumes the first 100 of them and commits their offsets in small batches.
 */
object Main extends ZIOAppDefault {

  /**
   * Removes ZIO's default loggers and installs the SLF4J backend instead.
   *
   * See `zio-logging` documentation: https://zio.github.io/zio-logging/docs/overview/overview_slf4j
   */
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    zio.Runtime.removeDefaultLoggers >>> SLF4J.slf4j

  /** Topic used for both producing and consuming. */
  private val topic = "test-topic"

  /** Consumer in group `group1`, reading from the earliest offset, pointed at the provided [[Kafka]]. */
  private val consumerLayer: ZLayer[Kafka, Throwable, Consumer] =
    ZLayer.scoped {
      for {
        kafka <- ZIO.service[Kafka]
        settings = ConsumerSettings(kafka.bootstrapServers)
                     .withGroupId("group1")
                     .withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
        consumer <- Consumer.make(settings)
      } yield consumer
    }

  /** Producer with default settings, pointed at the provided [[Kafka]]. */
  private val producerLayer: ZLayer[Kafka, Throwable, Producer] =
    ZLayer.scoped {
      for {
        kafka    <- ZIO.service[Kafka]
        producer <- Producer.make(ProducerSettings(kafka.bootstrapServers))
      } yield producer
    }

  /**
   * Program entry point.
   *
   * Registers a finalizer that logs "Stopping app", then produces, consumes and commits
   * against the embedded broker. The consuming stream runs in parallel with a
   * `Fiber.dumpAll` that is delayed by 20 seconds, so this step completes only after both
   * the stream has drained and the delayed dump has run.
   */
  override def run: ZIO[ZIOAppArgs with Scope, Any, Any] = {
    // Records keyed by 1..1000, each value being the key rendered as a string.
    val records =
      Chunk.fromIterable(1 to 1000).map(n => new ProducerRecord(topic, n, n.toString))

    val produceAll =
      Producer.produceChunk(records, Serde.int, Serde.string)

    // Take 100 records, group them (up to 10 per group, or whatever arrived within 100 ms),
    // print each group's size, then merge each group's offsets into a batch and commit it.
    val consumeAndCommit =
      Consumer
        .plainStream(Subscription.topics(topic), Serde.int, Serde.string)
        .take(100)
        .groupedWithin(10, 100.millis)
        .mapZIOPar(2)(group => ZIO.debug(group.size).as(group.map(_.offset)))
        .map(OffsetBatch.apply)
        .debug("Offset")
        .mapZIO(_.commit)
        .debug("Commit")
        .runDrain

    val delayedFiberDump = Fiber.dumpAll.delay(20.seconds)

    val program =
      ZIO.logInfo("Starting app") *>
        produceAll *>
        consumeAndCommit.zipPar(delayedFiberDump) *>
        ZIO.logInfo("Ready!").unit

    ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *>
      program.provide(
        Kafka.embedded,
        consumerLayer,
        producerLayer
      )
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment