Skip to content

Instantly share code, notes, and snippets.

@dbist
Last active September 10, 2024 14:13
Show Gist options
  • Save dbist/3991dd83144eb20d99c6faf815a0ca94 to your computer and use it in GitHub Desktop.
Save dbist/3991dd83144eb20d99c6faf815a0ca94 to your computer and use it in GitHub Desktop.

Exploring Real-Time Data Ingestion into Snowflake Using CockroachDB, Redpanda, and Kafka Connect


This article serves as a follow-up to previous discussions on data ingestion into Snowflake, specifically focusing on the complexities and efforts involved in integrating Change Data Capture (CDC) data with Snowflake in real time.


Previous Articles on Snowflake

Previous Articles on CockroachDB CDC


Motivation

This article builds upon the previous discussion in Tour of Snowflake ingestion using CockroachDB and Redpanda Connect, where we investigated the process of streaming changefeeds from CockroachDB to Snowflake using Redpanda Connect and Snowpipe in batch mode. Here, we will shift our focus to Kafka Connect and demonstrate how both batch and streaming modes can be utilized for data ingestion into Snowflake.

Overview

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Snowflake
  • Deploy Kafka Connect
  • Verify
  • Conclusion

Detailed Instructions

Deploy a CockroachDB cluster with Enterprise Changefeeds

Start by either launching a CockroachDB instance or utilizing a managed service.

To enable CDC, execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;

Verify that changefeeds are enabled:

SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, update it to true.

Create a source table:

CREATE TABLE cockroachdb (
     id INT PRIMARY KEY,
     value STRING DEFAULT md5(random()::text),
     created_at TIMESTAMPTZ DEFAULT now(),
     updated_at TIMESTAMPTZ DEFAULT NULL);

Insert random data:

INSERT INTO cockroachdb SELECT
   (generate_series(1, 10000));

Update a row:

UPDATE cockroachdb SET value = 'UPDATED', updated_at = now() WHERE id = 1;

Create a changefeed job pointing to a local instance of Redpanda:

CREATE CHANGEFEED FOR TABLE cockroachdb INTO 'kafka://redpanda:29092';

Inspect the data:

SELECT * FROM cockroachdb LIMIT 5;
  id |              value               |          created_at           |          updated_at
-----+----------------------------------+-------------------------------+--------------------------------
   1 | UPDATED                          | 2024-09-09 13:17:57.837984+00 | 2024-09-09 13:17:57.917108+00
   2 | 27a41183599c44251506e2971ba78426 | 2024-09-09 13:17:57.837984+00 | NULL
   3 | 3bf8bc26a750a15691ec4d7ddbb7f5e5 | 2024-09-09 13:17:57.837984+00 | NULL
   4 | b8c5786e8651ddfb3a68eabeadb52f2e | 2024-09-09 13:17:57.837984+00 | NULL
   5 | 3a24df165773639ce89d0d877e7103b7 | 2024-09-09 13:17:57.837984+00 | NULL
(5 rows)

The next step is to setup the Snowflake Kafka connector

Deploy Snowflake

Create a database and schema for outputting changefeed data:

USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE KAFKADB;
CREATE OR REPLACE SCHEMA kafka_schema;

Follow the Snowflake documentation to configure the Kafka connector.

Create the necessary tables:

create or replace table kafkatb_batch(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

create or replace table kafkatb_streaming(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

Set up roles and permissions:

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE OR REPLACE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafkadb TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_batch TO ROLE kafka_connector_role_1;
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_streaming TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER username;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER username SET DEFAULT_ROLE = kafka_connector_role_1;

Ensure you follow the documentation for setting up key pair authentication for the Snowflake Kafka connector.

Deploy Kafka Connect

Run Redpanda using Docker Compose.

docker compose -f compose-redpandadata.yaml up -d

Once up, navigate to the Redpanda Console.

topics

Click into the cockroachdb topic:

inspect a topic

Install the Snowflake Kafka connector:

confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:latest

Use the following configuration for Kafka Connect in distributed mode, saved as connect-distributed.properties:

bootstrap.servers=172.18.0.3:29092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components,plugin.path=/usr/local/share/kafka/plugins,/usr/share/filestream-connectors

Deploy Kafka Connect in distributed mode:

./kafka-connect/bin/connect-distributed.sh connect-distributed.properties

Register the Snowflake connector with the following configuration, saved as snowflake-sink-batch.json:

{
    "name":"snowflake-sink-batch",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_batch",
      "buffer.count.records":"10000",
      "buffer.flush.time":"60",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"account-name:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
    }
  }

Publish the connector configuration:

curl -d @"snowflake-sink-batch.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors

Verify the connector in the Kafka Connect UI and in the Kafka Connect section of the Redpanda Console.

snowflake sink

If you click on the snowflake-sink-batch sink, you can see additional information

snowflake sink detailed view

The comprehensive steps needed to set this up are thoroughly outlined in the tutorial.

Data will now flow into Snowflake in batch mode, with updates occurring every 60 seconds as determined by the buffer.flush.time parameter.

You can now query the data in Snowflake:

select * from kafkatb_batch limit 5;

If everything is configured correctly, the data from CockroachDB should be available in Snowflake in real-time or in batches, depending on your configuration.

record_metadata:

{
  "CreateTime": 1725887877966,
  "key": "[3]",
  "offset": 30007,
  "partition": 0,
  "topic": "cockroachdb"
}

record_content:

{
  "after": {
    "created_at": "2024-09-09T13:17:57.837984Z",
    "id": 1,
    "updated_at": "2024-09-09T13:17:57.917108Z",
    "value": "UPDATED"
  }
}

The next step is to configure the connector in streaming mode. First, stop the current connector with the following command:

curl -X DELETE http://localhost:8083/connectors/snowflake-sink-batch

The updated connector configuration will appear as follows:

{
    "name":"snowflake-sink-streaming",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"<snowflake-account>:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false"

    }
  }

Take note of the snowflake.ingestion.method parameter. This feature removes the need to wait 60 seconds to push data to Snowflake, allowing us to reduce the buffer.flush.time to 10 seconds.

To deploy the connector, use the following command:

curl -d @"snowflake-sink-streaming.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors

Shortly after deployment, the data will be available in the Snowflake table.

The previous examples demonstrated how data was ingested into predefined Snowflake tables. The following method will automatically infer the schema from the Kafka messages:

  {
    "name":"snowflake-sink-streaming-schematized",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming_schematized",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"<snowflake-account>:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false",
      "snowflake.enable.schematization": "TRUE"
    }
  }

Save this as snowflake-sink-streaming-schematized.json and deploy it using:

curl -d @"snowflake-sink-streaming-schematized.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors

Upon deployment, a new table will be created in Snowflake with the following schema:

create or replace TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED (
	RECORD_METADATA VARIANT COMMENT 'created by automatic table creation from Snowflake Kafka Connector',
	AFTER VARIANT COMMENT 'column created by schema evolution from Snowflake Kafka Connector'
);

To inspect the table, use the following query:

SELECT after AS record FROM kafkatb_streaming_schematized LIMIT 5;

Sample result:

{
  "created_at": "2024-09-09T16:39:34.993226Z",
  "id": 18712,
  "updated_at": null,
  "value": "0d6bd8a4a790aab95c97a084d17bd820"
}

Verify

We can flatten the data for easier manipulation using the following query:

USE ROLE securityadmin;
GRANT CREATE VIEW ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

USE ROLE kafka_connector_role_1;
USE DATABASE KAFKADB;
USE SCHEMA KAFKA_SCHEMA;
CREATE VIEW v_kafkatb_batch_flattened AS
    SELECT PARSE_JSON(record_content:after):id AS ID,
        PARSE_JSON(record_content:after):value AS VALUE,
        PARSE_JSON(record_content:after):created_at AS CREATED_AT,
        PARSE_JSON(record_content:after):updated_at AS UPDATED_AT
    FROM kafkatb_batch;

SELECT * FROM v_kafkatb_batch_flattened limit 1;
ID	VALUE	    CREATED_AT	                  UPDATED_AT
1   "UPDATED"	"2024-09-09T13:17:57.837984Z"	"2024-09-09T13:17:57.917108Z"

Alternatively, for the schematized table, the view creation statement would be:

CREATE VIEW v_kafkatb_streaming_schematized_flattened AS
    SELECT PARSE_JSON(after):id AS ID,
        PARSE_JSON(after):value AS VALUE,
        PARSE_JSON(after):created_at AS CREATED_AT,
        PARSE_JSON(after):updated_at AS UPDATED_AT
    FROM kafkatb_streaming_schematized;

To verify the data flow, make an update in CockroachDB and check for the changes in Snowflake:

UPDATE cockroachdb 
  SET value = 'UPDATED', updated_at = now() 
WHERE  
  id = 20000; 

In Snowflake, execute the following query to confirm the update:

SELECT * FROM v_kafkatb_streaming_schematized_flattened where VALUE = 'UPDATED';

Sample result:

ID	VALUE		CREATED_AT	                  UPDATED_AT
20000	"UPDATED"	"2024-09-09T18:15:13.460078Z"	"2024-09-09T18:16:56.550778Z"
19999	"UPDATED"	"2024-09-09T18:15:13.460078Z"	"2024-09-09T18:15:27.365272Z"

The architectural diagram is below:

architecture

Conclusion

In this process, we explored Kafka Connect as a solution to stream changefeeds into Snowflake. This approach provides greater control over how messages are delivered to Snowflake, leveraging the Snowflake Kafka Connector with Snowpipe Streaming for real-time, reliable data ingestion.


References

How to Integrate Existing Systems with Kafka Connect
Streaming data to Snowflake with Kafka Connect and Redpanda
Red Panda Compose

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment