
Alpakka Kafka connector (akka-stream-kafka) example

A simple example showing how to use the Alpakka Kafka connector to produce and consume Kafka messages.

I assume that you have two Scala apps, a producer and a consumer.

Producer

Add the following dependencies:

  • "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

Create an application.conf:

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter for how many sends can run in parallel.
  # parallelism = 100

  # How long to wait for `KafkaProducer.close`
  # close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  # use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "quickstart.cloudera:9092"
  }
}
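
If you prefer, the same `kafka-clients` values can be set programmatically via the settings API instead of in configuration; a minimal sketch using the same broker address:

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("quickstart.cloudera:9092") // overrides the config file value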

Create a producer.scala:

package fakeProducer

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system: ActorSystem = ActorSystem("producer-sample")
    implicit val materializer: Materializer = ActorMaterializer()

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)

    val done: Future[Done] =
      Source(1 to 100)
        .map(value => new ProducerRecord[String, String]("test-topic", "msg " + value))
        .runWith(Producer.plainSink(producerSettings))

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done.onComplete {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }
}
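
A small aside: `ProducerRecord` also has a constructor that takes a key, which is useful when messages with the same key should land on the same partition. A sketch (the keying scheme here is made up for illustration):

// hypothetical keying: records sharing a key go to the same partition
.map(value => new ProducerRecord[String, String]("test-topic", "key-" + (value % 10), "msg " + value))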

Consumer

Add the following dependencies:

  • "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

Create an application.conf file:

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  # poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked.
  # poll-timeout = 50ms

  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  # stop-timeout = 30s

  # How long to wait for `KafkaConsumer.close`
  # close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `CommitTimeoutException`.
  # commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  # commit-time-warning = 1s

  # If for any reason `KafkaConsumer.poll` blocks for longer than the configured
  # poll-timeout then it is forcefully woken up with `KafkaConsumer.wakeup`.
  # The KafkaConsumerActor will throw
  # `org.apache.kafka.common.errors.WakeupException` which will be ignored
  # until `max-wakeups` limit gets exceeded.
  # wakeup-timeout = 60s

  # After exceeding the maximum number of wakeups the consumer will stop and the stage will fail.
  # Setting it to 0 will let it ignore the wakeups and try to get the polling done forever.
  # max-wakeups = 10

  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  # commit-refresh-interval = infinite

  # If enabled, log stack traces before waking up the KafkaConsumer to give
  # some indication why the KafkaConsumer is not honouring the `poll-timeout`
  # wakeup-debug = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  # use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Auto-commit is disabled by the connector by default; here we enable it.
    # Setting enable.auto.commit = true means that offsets are committed
    # automatically with a frequency controlled by auto.commit.interval.ms.
    enable.auto.commit = true

    bootstrap.servers = "quickstart.cloudera:9092"
    group.id = "test-group1"

    auto.offset.reset = "earliest"
  }

  # Time to wait for pending requests when a partition is closed
  # wait-close-partition = 500ms
}
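
As with the producer, these `kafka-clients` values can also be set programmatically; a minimal sketch:

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("quickstart.cloudera:9092")
    .withGroupId("test-group1")
    .withProperty("auto.offset.reset", "earliest")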

Create a consumer.scala file:

package fakeConsumer

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from hBaseWriter")

    implicit val system: ActorSystem = ActorSystem("consumer-sample")
    implicit val materializer: Materializer = ActorMaterializer()

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)

    val done = Consumer
      .plainSource(consumerSettings, Subscriptions.topics("test-topic"))
      .runWith(Sink.foreach(println)) // just print each message for debugging

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done.onComplete {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }
}
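
The example above uses `Consumer.plainSource` together with auto-commit. If you need at-least-once semantics, the connector also provides `Consumer.committableSource`, where you commit each offset explicitly after processing. A minimal sketch (API names as in the 0.21 connector; disable `enable.auto.commit` if you go this way):

val done = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("test-topic"))
  .mapAsync(1) { msg =>
    println(msg.record.value)               // process the message ...
    msg.committableOffset.commitScaladsl()  // ... then commit its offset
  }
  .runWith(Sink.ignore)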

To correctly handle Ctrl+C, or to shut down gracefully when your app is hosted inside a container, you can use the following code:

// retrieve the control object
// (requires Keep from akka.stream.scaladsl and
//  Consumer.DrainingControl from akka.kafka.scaladsl)
val control = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("test-topic"))
  .toMat(Sink.foreach(println))(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()

// Correctly handle Ctrl+C and docker container stop
sys.addShutdownHook({
  println("Shutdown requested...")

  val done = control.shutdown()

  implicit val ec: ExecutionContextExecutor = system.dispatcher
  done
    .onComplete {
      case Success(_) => println("Exiting ...")
      case Failure(err) => println("Error: " + err.toString)
    }
})

@maxstreese

Thanks for the example! I do have one problem though: if the producer fails to construct (because of a bogus bootstrap server configuration, for example), the call to runWith blocks. Any ideas?

@maxstreese

FYI: I have created an issue regarding this in the Alpakka Kafka repository: akka/alpakka-kafka#770. In case I/we are missing something obvious do let me know! :)

@davideicardi
Author

@maxstreese Sorry for the late response ... Recently I have used the following code:

  // requires: import scala.concurrent.Await
  // requires: import scala.concurrent.duration.Duration
  def main(args: Array[String]): Unit = {
    log.info(s"Starting up $APP_NAME...")

    try {
      // start akka-stream
      val streamFuture = streamGraph.runStream() // here I run the stream ...
      log.info("Listening for incoming messages...")

      // This will block until the stream fails
      Await.result(streamFuture, Duration.Inf)
    } catch {
      case e: Exception => log.error(s"Failed to run input message stream: ${e.getMessage}", e)
    }

    log.info(s"Exiting $APP_NAME ...")

    log.info(s"Closing resources ...")

    try {
      disposeResources()

      materializer.shutdown()
      actorSystem.terminate()
    } finally {
      System.exit(1)
    }
  }

Not sure if this is the right solution but it seems to work correctly.

@vinyoliver

Thanks for sharing! By any chance do you have a code example of how to write a functional test for this? I'm struggling to create a functional test for my consumer =/

@davideicardi
Author

davideicardi commented Apr 9, 2020

@vinyoliver

Thanks for sharing! By any chance do you have a code example of how to write a functional test for this? I'm struggling to create a functional test for my consumer =/

No, sorry...

@mkargal

mkargal commented Jun 22, 2020

Hi,
Do you have any samples of error handling implemented for this? For example, if the Kafka broker shuts down for some reason, the producer fails and terminates the stream, but the elements it was producing are also lost. I don't see any way of getting those elements back and recreating them for a retry. Any thoughts?
