This is a fun experiment that uses DuckDB to parse CockroachDB Change Data Capture output and to query CockroachDB directly.
CockroachDB has native support for change data capture. It supports object storage sinks across all major cloud providers. At the time of writing, there are a couple of supported formats available, such as Avro and newline-delimited JSON. Up until now I've been avoiding newline-delimited JSON because I don't find it easy to use. Today, I'd like to look at DuckDB as a viable tool to parse the CDC-generated output in newline-delimited JSON format.
- Start a CockroachDB cluster
- Parse CockroachDB newline-delimited changefeed output using DuckDB
- Query CockroachDB tables using DuckDB
- Conclusion
I am using a serverless instance of CockroachDB. It has enterprise change feeds enabled by default. You can sign up for a free instance here.
We're going to follow the example to send sample data to an S3 bucket. DuckDB supports reading from S3 directly but today I'm going to download files to my machine and parse them locally.
I'm using the tpcc workload to generate changefeed data but you can use the example in the doc above.
Initialize
cockroach workload init tpcc \
--warehouses 100 $DATABASE_URL
Execute the workload
cockroach workload run tpcc \
--duration=120m \
--concurrency=3 \
--max-rate=1000 \
--tolerate-errors \
--warehouses=10 \
--conns 60 \
--ramp=1m \
--workers=100 \
$DATABASE_URL
Create a changefeed job
CREATE CHANGEFEED FOR TABLE history INTO 's3://artemawsbucket/tpcc/history?AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>&AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>' with updated;
Then, navigate to your S3 bucket and find the files there.
Copy data from S3 to your filesystem
aws s3 cp s3://artemawsbucket/tpcc/history . --recursive
Install duckdb
brew install duckdb
Finally, navigate to the directory with the json files and start duckdb.
duckdb
Looking at the available json functions, the standard read_json_objects function works
SELECT * FROM read_json_objects('202305161404194891609990000000000-fb5d1ff7b5a47331-2-15-00000000-history-a.ndjson');
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2404, "h_c_w_id": 1, "h_d_id": 8, "h_data": "9v3L5bOacQHehuVoJHJ2vp… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2431, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ljve8BmeEvbQ5dJWLgvcp"… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2382, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ve8BmeEvbQ5dJWLgvcp", … │
Similarly, there's a newline-delimited function, read_ndjson_objects. This time we're going to use globbing instead of an individual file. We're also going to limit the output, as my entire dataset is 3 million rows.
SELECT * FROM read_ndjson_objects('*.ndjson') LIMIT 5;
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ json │
│ json │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"… │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
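For readers who want to see what the glob plus newline-delimited read amounts to, here is a minimal sketch in plain Python. It is not how DuckDB implements read_ndjson_objects; it only illustrates the idea that every non-empty line of every matching file is one standalone JSON document. The function name iter_changefeed_rows and the sample records are hypothetical, and the records are abridged to a few of the fields visible in the output above.

```python
import json
import tempfile
from pathlib import Path


def iter_changefeed_rows(directory, pattern="*.ndjson"):
    """Yield one parsed dict per non-empty line of every matching file."""
    for path in sorted(Path(directory).glob(pattern)):
        with path.open(encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:  # tolerate blank lines between records
                    yield json.loads(line)


if __name__ == "__main__":
    # Hypothetical, abridged records that mimic the shape of the changefeed output.
    sample = [
        {"after": {"h_amount": 10.00, "h_c_id": 1166, "h_c_w_id": 25}},
        {"after": {"h_amount": 10.00, "h_c_id": 1181, "h_c_w_id": 25}},
    ]
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "sample-history.ndjson").write_text(
            "\n".join(json.dumps(record) for record in sample) + "\n",
            encoding="utf-8",
        )
        for row in iter_changefeed_rows(tmp):
            print(row["after"]["h_c_id"])
```

Because the generator yields rows lazily, it never holds the whole dataset in memory, which matters with a few million rows.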
We can create a DuckDB table out of the json files
CREATE TABLE history AS SELECT * FROM read_ndjson_objects('*.ndjson');
show tables;
┌─────────┐
│ name │
│ varchar │
├─────────┤
│ history │
└─────────┘
select json as col from history limit 5;
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ col │
│ json │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"… │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
We can query the individual columns
select json->'after'->'h_amount' from history limit 1;
┌─────────────────────────────────┐
│ "json" -> 'after' -> 'h_amount' │
│ json │
├─────────────────────────────────┤
│ 10.0 │
└─────────────────────────────────┘
We can cast too
select json->'after'->'h_data', cast (json->'after'->'h_c_id' as integer) as c_id from history where c_id > 2000 limit 5;
┌───────────────────────────────┬───────┐
│ "json" -> 'after' -> 'h_data' │ c_id │
│ json │ int32 │
├───────────────────────────────┼───────┤
│ "7xrljve8BmeEvbQ5dJW" │ 2002 │
│ "AzZ5x9v3L5bOac" │ 2001 │
│ "x9v3L5bOacQHehuVoJ" │ 2024 │
│ "2vp7xrljve8Bme" │ 2006 │
│ "UtEdpJzCGyo91sT" │ 2029 │
└───────────────────────────────┴───────┘
We can use the ->> notation to output values as varchar instead of json
SELECT distinct(cast (json->>'after'->>'h_amount' as float)) FROM history LIMIT 5;
┌──────────────────────────────────────────────────────┐
│ CAST((("json" ->> 'after') ->> 'h_amount') AS FLOAT) │
│ float │
├──────────────────────────────────────────────────────┤
│ 10.0 │
│ 2612.12 │
│ 3986.51 │
│ 2836.18 │
│ 359.5 │
└──────────────────────────────────────────────────────┘
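The difference between the two operators is easy to miss, so here is a rough Python analogy. It assumes that -> keeps the child value encoded as JSON (which is why strings showed up wrapped in double quotes earlier) while ->> hands back plain text. The helper names arrow and arrow_text are made up for this sketch, and the sample row is an abridged, illustrative record.

```python
import json


def arrow(json_text, key):
    """Mimic `->`: return the child value still encoded as JSON text."""
    return json.dumps(json.loads(json_text)[key])


def arrow_text(json_text, key):
    """Mimic `->>`: return the child as plain text; strings lose their quotes."""
    value = json.loads(json_text)[key]
    return value if isinstance(value, str) else json.dumps(value)


if __name__ == "__main__":
    # Abridged, illustrative changefeed row.
    row = '{"after": {"h_data": "AzZ5x9v3L5bOac", "h_c_id": 2001}}'

    print(arrow(arrow(row, "after"), "h_data"))            # JSON string, quotes kept
    print(arrow_text(arrow_text(row, "after"), "h_data"))  # plain text, no quotes
    print(int(arrow_text(arrow_text(row, "after"), "h_c_id")) > 2000)
```

Either form can be cast afterwards; the text form is simply more convenient when the value is going to be compared or displayed as a string.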
Another useful json function is read_json_auto. It infers column types automatically.
SELECT * FROM read_json_auto('*.ndjson');
┌──────────────────────────────────────────────┬──────────────────────────────────────────────┬────────────────────────────────┐
│ after │ key │ updated │
│ struct(h_amount double, h_c_d_id ubigint, … │ json[] │ varchar │
├──────────────────────────────────────────────┼──────────────────────────────────────────────┼────────────────────────────────┤
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id… │ [25, "42674618-a16f-4000-8000-0000000bdfb5"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id… │ [25, "426799fb-7793-4c00-8000-0000000bdfc4"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id… │ [25, "4267620e-e8d1-4000-8000-0000000bdfba"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id… │ [25, "4267c121-0eb5-4800-8000-0000000bdfcb"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id… │ [25, "4267aac2-6f34-4400-8000-0000000bdfc7"] │ 1684245859489160999.0000000000 │
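Note that the updated column comes back as varchar. Assuming it is CockroachDB's usual decimal rendering of a hybrid logical clock timestamp (wall-clock nanoseconds before the dot, a logical counter after it), a small sketch like the following turns it into something readable. The function name parse_updated is hypothetical, and the conversion truncates nanoseconds to microseconds because that is the finest resolution Python's datetime supports.

```python
from datetime import datetime, timezone


def parse_updated(updated):
    """Split an 'updated' value into a UTC datetime and its logical counter."""
    wall_ns_text, _, logical_text = updated.partition(".")
    seconds, nanos = divmod(int(wall_ns_text), 1_000_000_000)
    # datetime only stores microseconds, so the last three digits are dropped.
    wall_time = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1000
    )
    return wall_time, int(logical_text or 0)


if __name__ == "__main__":
    wall_time, logical = parse_updated("1684245859489160999.0000000000")
    print(wall_time, logical)  # -> 2023-05-16 14:04:19.489160+00:00 0
```

The result lines up with the timestamp prefix of the changefeed file name used earlier (20230516140419...), which is a handy sanity check.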
We can drill down to the individual array index level
SELECT CAST (key->0 AS INTEGER) AS hkey FROM read_json_auto('*.ndjson') WHERE hkey = 25 LIMIT 5;
┌───────┐
│ hkey │
│ int32 │
├───────┤
│ 25 │
│ 25 │
│ 25 │
│ 25 │
│ 25 │
└───────┘
This has a lot of promise, and I will be watching closely as DuckDB grows in popularity. It will definitely help in analyzing the CDC output.
DuckDB supports querying PostgreSQL directly using the PostgreSQL extension and today I'd like to see if we can do the same accessing CockroachDB.
duckdb
INSTALL postgres_scanner;
LOAD postgres_scanner;
CREATE SCHEMA abc;
CALL postgres_attach('dbname=defaultdb user=artem host=hostname port=26257 password=password sslmode=verify-full sslrootcert=certlocation', source_schema='public', sink_schema='abc');
┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ 0 rows │
└─────────┘
SELECT table_schema,table_name,table_type FROM information_schema.tables;
┌──────────────┬──────────────────┬────────────┐
│ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │
├──────────────┼──────────────────┼────────────┤
│ abc │ pgbench_tellers │ VIEW │
│ abc │ pgbench_history │ VIEW │
│ abc │ pgbench_branches │ VIEW │
│ abc │ pgbench_accounts │ VIEW │
│ abc │ example │ VIEW │
└──────────────┴──────────────────┴────────────┘
PRAGMA show_tables;
┌──────────────────┐
│ name │
│ varchar │
├──────────────────┤
│ example │
│ pgbench_accounts │
│ pgbench_branches │
│ pgbench_history │
│ pgbench_tellers │
└──────────────────┘
Query the tables directly, making sure to specify the abc schema
SELECT * FROM abc.pgbench_history LIMIT 5;
Error: Invalid Error: IO Error: Unable to query Postgres: ERROR: at or near "(": syntax error
DETAIL: source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)
^
ERROR: at or near "(": syntax error
DETAIL: source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)
This is where it starts to break. The problem stems from DuckDB issuing a COPY ... TO STDOUT query with FORMAT binary to fetch the result. In CockroachDB 23.1, the COPY command works with the text and csv formats only. I've filed issues 1, 2, 3 to add support for binary, json and parquet.
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT csv);
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT text);
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT binary);
ERROR: unimplemented: binary format for COPY TO not implemented
SQLSTATE: 0A000
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/97180/v23.1
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json);
invalid syntax: statement ignored: at or near "json": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json)
^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet);
invalid syntax: statement ignored: at or near "parquet": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet)
^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1
Unfortunately, the postgres_scanner does not work with text or csv, or at least I haven't found a way.
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT csv);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
SSL SYSCALL error: EOF detected
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT text);
Error: Catalog Error: Copy Function with name text does not exist!
Did you mean "parquet"?
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT parquet);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
SSL SYSCALL error: EOF detected
Your mileage may vary. This was a fun experiment, and I will be paying close attention as this project matures.