As of this writing, CockroachDB does not officially support protobuf payloads in change data capture. Today, I am going to demonstrate a feature that may or may not land in CockroachDB proper; the product team is evaluating its viability before we can officially support it. Here are my previous articles on CockroachDB changefeeds:
- Using CockroachDB CDC with Apache Pulsar
- Using CockroachDB CDC with Azure Event Hubs
- Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
- SaaS Galore: Integrating CockroachDB with Confluent Kafka, FiveTran and Snowflake
- CockroachDB CDC using Minio as cloud storage sink
- CockroachDB CDC using Hadoop Ozone S3 Gateway as cloud storage sink
Protocol Buffers are a language-neutral, platform-neutral, extensible mechanism for serializing structured data. They are a common choice for platforms that need to pass messages between systems. CockroachDB is a distributed SQL database built on a transactional and strongly-consistent key-value store. It scales horizontally; survives disk, machine, rack, and even datacenter failures with minimal latency disruption and no manual intervention; supports strongly-consistent ACID transactions; and provides a familiar SQL API for structuring, manipulating, and querying data. There is no official support for Protocol Buffers in CockroachDB changefeeds, even though we use Protocol Buffers extensively in our code. A recent customer conversation led to this experiment, where I'm going to use several recent features to demonstrate the ability to serialize CockroachDB rows to proto and emit them via CDC queries. This is also our first look at CDC queries, a new, flexible way to express CockroachDB streams.
This tutorial assumes you have an enterprise license. Because the features in this tutorial are not shipped as part of the product, you have to follow the steps exactly as described to pull the right source code to make it work. These features are not available in any of Cockroach Labs' current offerings.
- Build CockroachDB with the Protocol Buffers function
- Deploy a CockroachDB cluster with enterprise changefeeds
- Deploy a Kafka Consumer
- Verify
- Conclusion
Before I show you how to get this working, I'd like to express my gratitude to Yevgeniy Miretskiy, who works on the CDC team, for building this capability and for his mentorship in getting it working. The source code for the feature is available in the following commit. For brevity, I will skip the steps to set up a build environment.
Check out the pull request
gh pr checkout 89955
Run the preliminary steps to build Cockroach from source.
./dev doctor
Finally build the code
bazel build pkg/cmd/cockroach-short
Navigate to the directory with the built package
cd _bazel/bin/pkg/cmd/cockroach-short/cockroach-short_
Start an instance of CockroachDB using the built package
./cockroach-short start-single-node --insecure --background
To enable CDC, we need to execute the following commands:
SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
Generate sample data
CREATE TABLE office_dogs (
id INT PRIMARY KEY,
name STRING);
INSERT INTO office_dogs VALUES
(1, 'Petee'),
(2, 'Carl');
UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
SELECT * FROM office_dogs;
id | name
-----+----------
1 | Petee H
2 | Carl
The function we are going to use to convert rows to Protocol Buffers is crdb_internal.row_to_proto(). With the given pull request, this function is readily available for querying.
SELECT crdb_internal.row_to_proto(office_dogs) FROM office_dogs;
crdb_internal.row_to_proto
------------------------------------------------------------------------------
\x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f
\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040
It takes the row and serializes it as a proto. We can decode a row back to human-readable form using the following query: skip the \x prefix and copy the rest of the output into the function:
SELECT crdb_internal.pb_to_json('google.protobuf.Struct',
decode('0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f', 'hex')) AS proto;
proto
--------------------------------
{"id": 1, "name": "Petee H"}
SELECT crdb_internal.pb_to_json('google.protobuf.Struct',
decode('0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040', 'hex')) AS proto;
proto
-----------------------------
{"id": 2, "name": "Carl"}
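You can also decode these payloads outside the database. The google.protobuf.Struct wire format is simple enough to parse by hand; the following pure-Python sketch is my own illustration (it is not part of the pull request) and handles only the string and number value types that appear in this table:

```python
import struct

def read_varint(buf, i):
    """Decode a base-128 varint starting at index i; return (value, next_index)."""
    shift = val = 0
    while True:
        b = buf[i]; i += 1
        val |= (b & 0x7F) << shift
        if not b & 0x80:
            return val, i
        shift += 7

def parse_fields(buf):
    """Yield (field_number, wire_type, payload) triples from a protobuf message."""
    i = 0
    while i < len(buf):
        tag, i = read_varint(buf, i)
        field, wire = tag >> 3, tag & 7
        if wire == 0:                                  # varint
            payload, i = read_varint(buf, i)
        elif wire == 1:                                # 64-bit: doubles in Struct values
            payload = struct.unpack('<d', buf[i:i + 8])[0]; i += 8
        elif wire == 2:                                # length-delimited: strings, messages
            ln, i = read_varint(buf, i)
            payload = buf[i:i + ln]; i += ln
        else:
            raise ValueError(f"unsupported wire type {wire}")
        yield field, wire, payload

def decode_struct(buf):
    """Decode a google.protobuf.Struct into a dict (string and number values only)."""
    out = {}
    for field, _, entry in parse_fields(buf):
        if field != 1:                                 # Struct.fields map entries
            continue
        key = value = None
        for f, _, p in parse_fields(entry):
            if f == 1:                                 # map key: string
                key = p.decode('utf-8')
            elif f == 2:                               # map value: google.protobuf.Value
                for vf, _, vp in parse_fields(p):
                    if vf == 2:                        # Value.number_value (double)
                        value = vp
                    elif vf == 3:                      # Value.string_value
                        value = vp.decode('utf-8')
        out[key] = value
    return out

row = bytes.fromhex('0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f')
print(decode_struct(row))   # {'name': 'Petee H', 'id': 1.0}
```

Note that Struct carries numbers as doubles, so the INT primary key comes back as 1.0 rather than 1.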
We can use this function in the CDC query, but first, let's set up a webhook sink for a quick demonstration of changefeed queries.
git clone https://github.com/cockroachlabs/cdc-webhook-sink-test-server.git
cd cdc-webhook-sink-test-server
cd go-https-server
chmod +x server.sh
./server.sh
......+.........+.....+....+...+.....+...+.+.....+++++++++++++++++++++++++++++++++++++++++++++*.+...........+.+......+.....+...+....+...+......+...........+...+......+++++++++++++++++++++++++++++++++++++++++++++*......+.....+.............+...+..+.+.....................+......+..+.+++++
-----
2023/08/30 09:39:05 starting server on port 3000
With all of the basics in place, we can create a changefeed
CREATE CHANGEFEED INTO 'webhook-https://localhost:3000?insecure_tls_skip_verify=true' WITH updated AS SELECT crdb_internal.row_to_proto(office_dogs) AS proto FROM office_dogs;
job_id
----------------------
895654238351589377
Looking at the terminal where the webhook sink is running, we can see the emitted messages
2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],"length":1}
2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f"}],"length":1}
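A real consumer has to turn these webhook payloads back into proto bytes before it can deserialize them. Here is a minimal sketch, assuming the JSON shape shown above; the proto field is hex with CockroachDB's "\x" bytea prefix, and the sample line is hard-coded for illustration:

```python
import json

# One message as delivered to the webhook sink; the "proto" field is the
# hex-encoded google.protobuf.Struct with a leading "\x" bytea marker.
line = ('{"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, '
        '"proto": "\\\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],'
        '"length":1}')

msg = json.loads(line)
for row in msg["payload"]:
    hex_payload = row["proto"].removeprefix("\\x")  # strip the "\x" marker
    proto_bytes = bytes.fromhex(hex_payload)        # raw serialized Struct
    print(len(proto_bytes), "bytes at", row["__crdb__"]["updated"])
```

From here, proto_bytes can be handed to any protobuf runtime that knows the google.protobuf.Struct well-known type.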
Let's update a record in the office_dogs table
UPDATE office_dogs SET name = 'Tarzan' WHERE id = 1;
2023/08/30 11:12:58 {"payload":[{"__crdb__": {"updated": "1693408377084928000.0000000000"}, "proto": "\\x0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f"}],"length":1}
If we use the decode function to inspect the payload
SELECT crdb_internal.pb_to_json('google.protobuf.Struct', decode('0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f', 'hex')) AS proto;
proto
-------------------------------
{"id": 1, "name": "Tarzan"}
I should mention that the emitted messages are dynamically typed, not strongly typed. If your use case requires strongly typed messages, that's a conversation for another time.
And this is how you can leverage CockroachDB CDC queries with built-in functions. This function is not available today, but it can be, given sufficient demand. Hopefully you've found this article useful. Please reach out to our team if you need this capability, and we will consider it in the future.