Events from a Kafka topic can be consumed by ClickHouse and stored. This is useful for building projections from events.
ClickHouse nodes must be added to a cluster to use ReplicatedMergeTree.
First, create the database my_db; since it has to exist on every node, we will create it ON CLUSTER once the cluster name is known.
Check the configured clusters:
SELECT * FROM system.clusters;
Let's say its name is my_cluster.
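A minimal sketch of creating the database on every node, using the cluster name from above:
-- Create the database on all nodes of the cluster
CREATE DATABASE IF NOT EXISTS my_db ON CLUSTER my_cluster;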
ClickHouse must be configured to use the schema registry.
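For the AvroConfluent format used below, one way is to point ClickHouse at the registry via the format_avro_schema_registry_url setting; the URL here is a placeholder:
-- Per-session override; for the background Kafka consumer this usually
-- belongs in the server's default profile instead
SET format_avro_schema_registry_url = 'http://some-schema-registry-host:8081';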
In this example we stream user balance operations to a Kafka topic; they match the schema userBalanceOperation-value:
{
  "type": "record",
  "name": "userBalanceOperation",
  "namespace": "some-namespace",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "userId", "type": "bytes" },
    { "name": "operationId", "type": "long" },
    { "name": "amount", "type": "long" },
    {
      "name": "createdAt",
      "type": { "type": "long", "logicalType": "timestamp-millis" }
    }
  ]
}
Kafka events are consumed using a special table with ENGINE = Kafka:
CREATE TABLE my_db.ke_user_balance_operations ON CLUSTER my_cluster
(
`id` UInt64,
`operationId` UInt32,
`userId` String,
`createdAt` DateTime,
`amount` Int64
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'some-kafka-broker-host:9093',
kafka_topic_list = 'userBalanceOperation',
kafka_group_name = 'clickhouse-userBalanceOperation',
kafka_format = 'AvroConfluent';
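Note that a direct SELECT from a Kafka engine table consumes the messages, so data is read only through the materialized view defined below. To confirm the table was created, a harmless check:
-- The Kafka engine table should be listed on each node
SELECT database, name, engine
FROM system.tables
WHERE database = 'my_db' AND name = 'ke_user_balance_operations';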
Events will be stored in a ReplicatedMergeTree table:
CREATE TABLE my_db.tb_user_balance_operations ON CLUSTER my_cluster
(
`id` UInt64,
`operationId` UInt32,
`userId` String,
`createdAt` DateTime,
`amount` Int64
)
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/my_db.tb_user_balance_operations/shard_{shard}', '{replica}')
ORDER BY (id, createdAt)
SETTINGS index_granularity = 8192;
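To verify the table was created on all replicas, query system.replicas (a quick sketch):
-- Every replica should report a row for the table
SELECT database, table, is_leader, total_replicas, active_replicas
FROM system.replicas
WHERE database = 'my_db' AND table = 'tb_user_balance_operations';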
Events consumed from my_db.ke_user_balance_operations are stored in the replicated table my_db.tb_user_balance_operations through the materialized view my_db.mv_user_balance_operations:
CREATE MATERIALIZED VIEW my_db.mv_user_balance_operations ON CLUSTER my_cluster TO my_db.tb_user_balance_operations(
`id` UInt64,
`operationId` UInt32,
`userId` String,
`createdAt` DateTime,
`amount` Int64
)
AS SELECT `id`, `operationId`, `userId`, `createdAt`, `amount`
FROM my_db.ke_user_balance_operations;
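Once the view is attached, events should start landing in the replicated table; a quick sanity check:
-- The count should grow as new events arrive in the topic
SELECT count() FROM my_db.tb_user_balance_operations;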
Dropping the tables (the materialized view goes first, so it stops consuming):
DROP TABLE my_db.mv_user_balance_operations ON CLUSTER my_cluster SYNC;
DROP TABLE my_db.tb_user_balance_operations ON CLUSTER my_cluster SYNC;
DROP TABLE my_db.ke_user_balance_operations ON CLUSTER my_cluster SYNC;
If for some reason we need to re-read the topic from the beginning, we have to stop consuming events from Kafka with DETACH TABLE, reset the consumer group offset to the earliest one, and then reattach the table:
DETACH TABLE my_db.ke_user_balance_operations ON CLUSTER my_cluster;
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server some-kafka-broker-host:9093 \
--command-config command.properties \
--group clickhouse-userBalanceOperation \
--topic userBalanceOperation \
--reset-offsets --to-earliest \
--execute
The command.properties file contains the connection config:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="SOME_USER" password="SOME-PASSWORD";
ATTACH TABLE my_db.ke_user_balance_operations ON CLUSTER my_cluster;
After reattaching, the consumer may fail with an error like the following while it replays the old events:
void DB::StorageKafka::threadFunc(size_t): Code: 252, e.displayText() = DB::Exception: Too many partitions for single
INSERT block (more than 100). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number
of partitions is a common misconception. It will lead to severe negative performance impact, including slow server
startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under
1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient
to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).: while write prefix
to view my_db.mv_user_balance_operations (af6e3c05-163a-4b04-af6e-3c05163aeb04), Stack trace (when copying this
message, always include the lines below)
Get the current value of the setting:
SELECT * FROM system.settings WHERE name = 'max_partitions_per_insert_block'\G
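If the replayed data genuinely spans that many partitions, the limit can be raised; the error text itself warns that thousands of partitions hurt performance, so treat this as a temporary measure. A sketch (the value 1000 is an assumption; for the background Kafka consumer the setting likely has to live in the server's default profile rather than a session):
-- Session-level override; the Kafka consumer thread reads it from the default profile
SET max_partitions_per_insert_block = 1000;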
Decimal values are stored in Kafka as bytes.
Field declaration in Avro:
"name": "amount",
"type": [
  "null",
  {
    "type": "bytes",
    "scale": 2,
    "precision": 64,
    "logicalType": "decimal"
  }
]
ClickHouse's consumer then cannot decode this field into a ClickHouse Decimal, so currently there is no way to use Avro decimals when consuming Kafka events into ClickHouse.
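A common workaround (likely why amount is a plain long in the schema above) is to transfer the value as minor units, e.g. cents, and convert it on the ClickHouse side; a sketch assuming two decimal places:
-- amount is an Int64 number of cents; toDecimal64(amount, 2) gives amount.00,
-- dividing by 100 shifts the decimal point into place
SELECT toDecimal64(amount, 2) / 100 AS amount_decimal
FROM my_db.tb_user_balance_operations;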
Useful links:
- https://clickhouse.com/docs/en/interfaces/formats/#data_types-matching
- https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics
- https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#mapping-column-types
- https://github.com/confluentinc/kafka-connect-jdbc/issues/563
- https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#numeric-mapping-property