
samklr /
Last active December 20, 2018 14:47
Enabling RDS MySQL for Change Data Capture

Do this before you add any data to your instance.

For RDS: create a parameter group with the following parameters changed:
- Set binlog_format = ROW
- Set binlog_row_image = FULL
- Set binlog_rows_query_log_events = ON (1)
- (Optional) Set max_allowed_packet = Max (Increase ...)

Once the instance is created with the above parameter group, you'll need to change the binlog retention time.
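You can check the parameter-group settings above once the instance is running. The following is a minimal sketch. It assumes the rows of `SHOW VARIABLES LIKE 'binlog%'` have already been read into a name-to-value `Map`. `missingBinlogSettings` is a hypothetical helper, not part of any RDS tooling. For the retention step, RDS provides the `mysql.rds_set_configuration` procedure, e.g. `CALL mysql.rds_set_configuration('binlog retention hours', 24);`, where 24 is only an illustrative value.

```scala
// Required settings from the parameter group above, keyed by MySQL variable name.
val requiredBinlogSettings: Map[String, String] = Map(
  "binlog_format" -> "ROW",
  "binlog_row_image" -> "FULL",
  "binlog_rows_query_log_events" -> "ON"
)

// Hypothetical helper: compares the output of SHOW VARIABLES LIKE 'binlog%'
// (variable name -> value) against the required settings and returns every
// required setting that is missing or has a different value.
// Values are compared case-insensitively.
def missingBinlogSettings(actual: Map[String, String]): Map[String, String] =
  requiredBinlogSettings.filter { case (name, expected) =>
    !actual.get(name).exists(_.equalsIgnoreCase(expected))
  }

// Example input, as it might come back from a misconfigured instance.
val observed: Map[String, String] = Map(
  "binlog_format" -> "MIXED",
  "binlog_row_image" -> "FULL",
  "binlog_rows_query_log_events" -> "OFF"
)
```

An empty result means the instance is ready for change data capture, as far as these three settings are concerned.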

jeqo /
Last active July 11, 2018 00:21
Generating Graphviz from Kafka Streams
import org.apache.kafka.streams.TopologyDescription;
public class KafkaStreamsTopologyGraphvizPrinter {
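Only the first lines of the printer are shown above. The following is a minimal sketch of the Graphviz half of the job. It uses a simplified, hypothetical node model (`TopoNode`: a name and the names of its successors) instead of the real `TopologyDescription` types, and emits one DOT statement per node and one per edge. The node names in the example only imitate the names Kafka Streams generates.

```scala
// Hypothetical, simplified stand-in for a topology node: a name plus the
// names of the nodes it forwards records to.
final case class TopoNode(name: String, successors: Seq[String])

// Quote an identifier for DOT, escaping embedded double quotes.
def dotId(s: String): String = "\"" + s.replace("\"", "\\\"") + "\""

// Render the nodes as a directed Graphviz graph: one line per node,
// then one line per edge, both in input order.
def toDot(graphName: String, nodes: Seq[TopoNode]): String = {
  val nodeLines = nodes.map(n => s"  ${dotId(n.name)};")
  val edgeLines = for {
    n <- nodes
    s <- n.successors
  } yield s"  ${dotId(n.name)} -> ${dotId(s)};"
  (Seq(s"digraph ${dotId(graphName)} {") ++ nodeLines ++ edgeLines ++ Seq("}")).mkString("\n")
}

// Example: source -> filter -> sink.
val example: Seq[TopoNode] = Seq(
  TopoNode("KSTREAM-SOURCE-0000000000", Seq("KSTREAM-FILTER-0000000001")),
  TopoNode("KSTREAM-FILTER-0000000001", Seq("KSTREAM-SINK-0000000002")),
  TopoNode("KSTREAM-SINK-0000000002", Seq.empty)
)
```

You can pipe the resulting string to `dot -Tpng` to draw the graph.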
samklr / OffsetMngHbase.scala
Last active March 31, 2020 16:02
Offset Management on HBase
Save offsets for each batch into HBase
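import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
// OffsetRange comes from the Spark Kafka integration in use (e.g. org.apache.spark.streaming.kafka010)
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
  val hbaseConf = HBaseConfiguration.create()
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  // Row key: <topic>:<group id>:<batch time in ms>
  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)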
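The `saveOffsets` preview builds a row key of the form `topic:group:batchTimeMillis`, so each batch gets its own row. The following is a minimal sketch of that layout. It assumes each partition's `untilOffset` is stored under the partition number as the column qualifier; this is an assumption, because the rest of the gist is not shown. An in-memory `Map` stands in for the HBase table, and a hypothetical `PartitionOffsetRange` case class replaces Spark's `OffsetRange`.

```scala
// Hypothetical stand-in for Spark's OffsetRange, with the same field names.
final case class PartitionOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Row key in the same shape as the preview: TOPIC:GROUP:batchTimeMillis.
def offsetRowKey(topic: String, groupId: String, batchTimeMillis: Long): String =
  topic + ":" + groupId + ":" + batchTimeMillis.toString

// One row: column qualifier (partition number) -> untilOffset, which is
// exclusive and therefore the next offset to read after a restart.
// In HBase this would be one Put with one column per partition (assumption).
def offsetRow(ranges: Seq[PartitionOffsetRange]): Map[String, String] =
  ranges.map(r => r.partition.toString -> r.untilOffset.toString).toMap

// In-memory stand-in for the HBase table: row key -> columns.
val table = scala.collection.mutable.Map.empty[String, Map[String, String]]

def saveOffsetsInMemory(topic: String, groupId: String,
                        ranges: Seq[PartitionOffsetRange], batchTimeMillis: Long): Unit =
  table(offsetRowKey(topic, groupId, batchTimeMillis)) = offsetRow(ranges)
```

Putting the batch time last in the key keeps all rows for one topic and group adjacent. This makes it straightforward to scan for the most recent batch on restart.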
jpallari /
Last active October 17, 2023 13:45
Error handling pitfalls in Scala


There are multiple strategies for error handling in Scala.

Errors can be represented as exceptions, which is a common way of dealing with errors in languages such as Java. However, exceptions are invisible to the type system, which can make them challenging to deal with. It's easy to leave out the necessary error handling, which can result in unfortunate runtime errors.
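The following minimal sketch uses hypothetical names (`parsePortUnsafe`, `parsePort`, `describe`). It writes the same parsing logic twice: once with exceptions and once returning `Either`. Only the second version exposes its failure case in the type.

```scala
import scala.util.Try

// Can throw NumberFormatException or IllegalArgumentException, but the
// signature String => Int says nothing about it, so callers can forget to catch.
def parsePortUnsafe(s: String): Int = {
  val n = s.trim.toInt
  require(n > 0 && n < 65536, s"port out of range: $n")
  n
}

// Same logic, with the failure made visible in the return type.
def parsePort(s: String): Either[String, Int] =
  Try(s.trim.toInt).toEither.left.map(_ => s"not a number: '$s'").flatMap { n =>
    if (n > 0 && n < 65536) Right(n) else Left(s"port out of range: $n")
  }

// Callers must handle both cases before they can use the Int.
def describe(s: String): String = parsePort(s) match {
  case Right(port) => s"port $port"
  case Left(error) => s"invalid: $error"
}
```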

elakito / sc_subtest.go
Created December 13, 2016 16:32
sarama_cluster publisher and subscriber samples for testing partition assignment
package main
import (
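These samples test partition assignment. The following is a minimal sketch of a range-style strategy, the idea behind Kafka's `RangeAssignor`. `assignRange` is a hypothetical helper, not sarama or sarama-cluster API, and the samples may use a different strategy. For a single topic, members are sorted and each member gets `partitions / members` consecutive partitions. The first `partitions % members` members each get one extra partition.

```scala
// Range-style assignment for a single topic: sort members, split the
// sorted partition ids into contiguous chunks, and give the first
// (partitions % members) members one extra partition each.
def assignRange(members: Seq[String], partitions: Seq[Int]): Map[String, Seq[Int]] = {
  val sortedMembers = members.sorted
  val sortedPartitions = partitions.sorted
  if (sortedMembers.isEmpty) Map.empty
  else {
    val perMember = sortedPartitions.size / sortedMembers.size
    val extra = sortedPartitions.size % sortedMembers.size
    sortedMembers.zipWithIndex.map { case (member, i) =>
      // Every member before index `extra` took one extra partition.
      val start = i * perMember + math.min(i, extra)
      val count = perMember + (if (i < extra) 1 else 0)
      member -> sortedPartitions.slice(start, start + count)
    }.toMap
  }
}
```

With more members than partitions, the trailing members receive an empty assignment. A partition-assignment test would typically check for exactly this case.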
jpallari /
Last active November 9, 2022 18:46
Enforcing invariants in Scala datatypes


Scala provides many tools to help us build programs with fewer runtime errors. Instead of relying on nulls, the recommended practice is to use the Option type. Instead of throwing exceptions, the Try and Either types are used to represent potential error scenarios. What these features have in common is that they capture runtime concerns in the type system, lifting the handling of those scenarios to the compilation phase: your program doesn’t compile until you’ve explicitly handled nulls, exceptions, and other runtime cases in your code.
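One common way to push an invariant into a datatype is a private constructor plus a validating factory, often called a "smart constructor". The following is a minimal sketch of that technique. `Percentage`, `from`, and `scale` are hypothetical names, not taken from the post.

```scala
// The constructor is private, so the only way to obtain a Percentage is
// through the companion's `from`, which checks the 0..100 invariant.
final class Percentage private (val value: Int) {
  override def toString: String = s"Percentage($value)"
}

object Percentage {
  def from(n: Int): Either[String, Percentage] =
    if (n >= 0 && n <= 100) Right(new Percentage(n))
    else Left(s"$n is not between 0 and 100")
}

// Code that receives a Percentage can rely on the invariant without re-checking it.
def scale(total: Double, p: Percentage): Double = total * p.value / 100.0
```

A plain class is used instead of a case class here because, depending on the Scala version, a case class's synthetic `apply` and `copy` may stay public and bypass the check.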

In his “Strategic Scala Style” blog post series,

/* Terraform setup to evaluate Kafka performance on various AWS instance types and EBS sizes */
provider "aws" {
  region = "eu-west-1"
}
variable "ssh_key_name" {
  default = "ben@ici"
}
adamw / windowing.scala
Created August 5, 2016 13:30
Windowing data in Akka
package com.softwaremill.akka
import java.time._
import scala.collection.mutable
import scala.concurrent.Await
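Only the imports of the Akka Streams code are shown above. The following is a minimal sketch of the underlying idea. It assumes fixed-size tumbling windows keyed by event time; the gist itself may use sliding windows, watermarks, or a different grouping. `Event`, `windowStart`, and `countPerWindow` are hypothetical names. The sketch uses only `java.time`, one of the imports above.

```scala
import java.time.{Duration, Instant}

// Hypothetical event carrying an event-time timestamp.
final case class Event(id: String, timestamp: Instant)

// Start of the tumbling window containing `ts`. Windows are aligned to the
// epoch and are `size` long, so the window is [start, start + size).
// floorMod keeps this correct for timestamps before the epoch.
def windowStart(ts: Instant, size: Duration): Instant = {
  val sizeMillis = size.toMillis
  val millis = ts.toEpochMilli
  Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis))
}

// Group events by the window they fall into and count them per window.
def countPerWindow(events: Seq[Event], size: Duration): Map[Instant, Int] =
  events.groupBy(e => windowStart(e.timestamp, size)).map { case (start, es) => start -> es.size }
```

In a streaming setting the same `windowStart` function would be applied per element. The difference is deciding when a window is complete and can be emitted, which a batch `groupBy` does not have to address.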
longcao / SparkCopyPostgres.scala
Last active September 11, 2024 18:55
COPY Spark DataFrame rows to PostgreSQL (via JDBC)
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
val jdbcUrl = s"jdbc:postgresql://..." // db credentials elided
val connectionProperties = {
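The preview stops before the copy logic. PostgreSQL's `COPY ... FROM STDIN` in its default text format expects:
- one line per row
- tab-separated columns
- `\N` for NULL
- backslash escapes for backslash, newline, carriage return and tab

The following is a minimal sketch of turning row values (for example from a Spark `Row`'s `toSeq`) into that text. `escapeCopyField`, `toCopyLine` and `toCopyText` are hypothetical helpers, and the gist may format rows differently, for example as CSV. You could then stream the resulting text through the PostgreSQL JDBC driver's `CopyManager.copyIn(sql, reader)`.

```scala
// Escape one field for COPY text format. Backslash is replaced first so the
// backslashes introduced by the later replacements are not doubled.
def escapeCopyField(s: String): String =
  s.replace("\\", "\\\\")
    .replace("\t", "\\t")
    .replace("\n", "\\n")
    .replace("\r", "\\r")

// One row -> one COPY text line (without the trailing newline).
// null and None become \N; every other value uses its toString.
def toCopyLine(values: Seq[Any]): String =
  values.map {
    case null    => "\\N"
    case None    => "\\N"
    case Some(v) => escapeCopyField(v.toString)
    case v       => escapeCopyField(v.toString)
  }.mkString("\t")

// All rows joined into the text that COPY ... FROM STDIN would read.
def toCopyText(rows: Seq[Seq[Any]]): String =
  rows.map(toCopyLine).map(_ + "\n").mkString
```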
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType, StructField, DataType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType, StringType, BinaryType, BooleanType, TimestampType, DateType, ArrayType}
class MinBy(valueType: DataType, minType: DataType) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", valueType) :: StructField("minCol", minType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("value", valueType) :: StructField("minCol", minType) :: Nil)
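The `MinBy` UDAF keeps a two-field buffer (`value`, `minCol`) and is meant to return the `value` whose `minCol` is smallest. The following is a minimal plain-Scala sketch of the same initialize/update/merge/evaluate lifecycle, without Spark. `MinByBuffer` and its methods are hypothetical names. Tie handling (the earlier pair is kept) is an assumption, since the rest of the class is not shown.

```scala
// Buffer mirroring bufferSchema: the current best value and its minCol.
// None means no row has been seen yet (the role an empty buffer plays in Spark).
final case class MinByBuffer[V, K](best: Option[(V, K)]) {
  // update: replace the stored pair only if the incoming key is strictly smaller.
  def update(value: V, key: K)(implicit ord: Ordering[K]): MinByBuffer[V, K] =
    best match {
      case Some((_, k)) if !ord.lt(key, k) => this
      case _                               => MinByBuffer(Some((value, key)))
    }

  // merge: combine two partial buffers, e.g. computed on different partitions.
  def merge(other: MinByBuffer[V, K])(implicit ord: Ordering[K]): MinByBuffer[V, K] =
    other.best match {
      case Some((v, k)) => update(v, k)
      case None         => this
    }

  // evaluate: the value associated with the smallest key, if any row was seen.
  def evaluate: Option[V] = best.map(_._1)
}

object MinByBuffer {
  def initialize[V, K]: MinByBuffer[V, K] = MinByBuffer(None)
}
```

Because `merge` reuses `update`, a merged result matches a single sequential pass whenever keys are distinct, which is the property Spark relies on when it merges partial aggregates.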