Skip to content

Instantly share code, notes, and snippets.

@jpzk
Created July 11, 2019 06:26
Show Gist options
  • Save jpzk/a02a21ae7f80c5fdaae9ea2e5fe5b85a to your computer and use it in GitHub Desktop.
Save jpzk/a02a21ae7f80c5fdaae9ea2e5fe5b85a to your computer and use it in GitHub Desktop.
package com.github.simplesteph.udemy.kafka.streams
import java.lang
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
object FavouriteColourAppScala {

  /** Colours kept by the sanitization step; every other colour is dropped. */
  private val AllowedColours: Set[String] = Set("green", "blue", "red")

  /**
   * Builds and starts a Kafka Streams topology that counts users' favourite colours.
   *
   * The stages are:
   *   1. Read "user,colour" lines from `favourite-colour-input`.
   *   2. Re-key each line by user and keep only allowed colours.
   *   3. Write the result to an intermediary topic.
   *   4. Read the intermediary topic back as a KTable, so each user's latest colour replaces the previous one.
   *   5. Count users per colour and write the counts to `favourite-colour-output-scala`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val config: Properties = new Properties
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-colour-scala")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")

    val builder: KStreamBuilder = new KStreamBuilder

    // Step 1: We create the topic of users keys to colours
    val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")

    val usersAndColours: KStream[String, String] = textLines
      // 1 - keep only non-null values that contain a comma, since we split on it
      //     (a null value would otherwise throw a NullPointerException here)
      .filter((key: String, value: String) => Option(value).exists(_.contains(",")))
      // 2 - we select a key that will be the user id (lowercase for safety).
      //     limit = -1 keeps empty fields, so "," yields "" instead of an empty array.
      .selectKey[String]((key: String, value: String) =>
        value.split(",", -1)(0).toLowerCase(java.util.Locale.ROOT))
      // 3 - we get the colour from the value (lowercase for safety).
      //     limit = -1 guarantees a second field: "user," yields "" rather than throwing
      //     ArrayIndexOutOfBoundsException, and "" is then dropped by step 4.
      .mapValues[String]((value: String) =>
        value.split(",", -1)(1).toLowerCase(java.util.Locale.ROOT))
      // 4 - we filter undesired colours (could be a data sanitization step)
      .filter((user: String, colour: String) => AllowedColours.contains(colour))

    val intermediaryTopic = "user-keys-and-colours-scala"
    usersAndColours.to(intermediaryTopic)

    // step 2 - we read that topic as a KTable so that updates are read correctly
    val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)

    // step 3 - we count the occurrences of colours
    val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
      // 5 - we group by colour within the KTable
      .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
      .count("CountsByColours")

    // 6 - we output the results to a Kafka Topic - don't forget the serializers
    favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")

    val streams: KafkaStreams = new KafkaStreams(builder, config)
    // wipes this application's local state before starting - handy for demos, not for prod
    streams.cleanUp()
    streams.start()

    // print the topology
    println(streams.toString)

    // shutdown hook to correctly close the streams application
    sys.addShutdownHook {
      streams.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment