@rmoff
Last active July 1, 2019 15:46
Kafka Connect - Single Message Transforms (SMT)
@toritsejuFO

toritsejuFO commented Sep 10, 2018

How do I use the suggested RegexRouter to map a topic name in Confluent Kafka to a different index name in Elasticsearch when sinking? According to the docs, topic.index.map is deprecated, and I don't understand how RegexRouter works.

What I'm trying to do is have the ES sink connector, once loaded, write messages from an existing topic to an Elasticsearch index whose name differs from the topic name. The index doesn't exist yet, so it will be created.
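
For readers with the same question, here is a rough Python emulation of what RegexRouter does to a topic name. It is only a sketch, not the Kafka Connect code: the function name `route_topic` and the topic names in the usage note are made up for illustration. It assumes the transform rewrites the topic only when the regex matches the whole topic name, and that the replacement uses Java-style `$1` group references, which the sketch translates to Python's `\g<1>` form.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Emulate a RegexRouter-style rename (illustrative, not the real SMT)."""
    pattern = re.compile(regex)
    # Assumption: the regex must match the entire topic name,
    # otherwise the topic is passed through unchanged.
    if pattern.fullmatch(topic) is None:
        return topic
    # Java writes group references as $1; Python's re module uses \g<1>.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    # Replace only the first match.
    return pattern.sub(py_replacement, topic, count=1)
```

With this sketch, `route_topic("kafka_orders", "kafka_(.*)", "$1")` yields `orders`, while a topic that does not match the regex, such as `other`, comes back unchanged. In a connector config, the equivalent settings would go under `transforms.<alias>.type`, `transforms.<alias>.regex` and `transforms.<alias>.replacement`, with the alias also listed in the `transforms` property.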

@WilliamShen2019

@toritsejuFO Hello friend, have you solved this problem? How do I use RegexRouter to route a topic to an existing index?

@spati-java

spati-java commented Apr 15, 2019

I am also running into the same problem. I tried the config below, but after I fired a curl command the index name had not changed, and I am getting an exception.

"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"kafka_(.*)",
"transforms.dropPrefix.replacement":"$1",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format":"${timestamp}",
"transforms.routeTS.timestamp.format":"YYYYMM"

This is the exception I get:

{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [kafka_v1_TableName], must be lowercase"
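
A rough sketch of the TimestampRouter step and of the lowercase rule from the exception may help when reading this config. The helper names `timestamp_route` and `is_lowercase_index` are hypothetical, and the `strftime` pattern is a stand-in for the Java SimpleDateFormat pattern the transform takes (note that in SimpleDateFormat `YYYY` means week-year, while `yyyy` is the calendar year). The sketch assumes timestamps are formatted in UTC and that `${topic}` and `${timestamp}` are the only placeholders.

```python
from datetime import datetime, timezone


def timestamp_route(topic: str, timestamp_ms: int,
                    topic_format: str = "${topic}-${timestamp}",
                    strftime_format: str = "%Y%m%d") -> str:
    """Emulate a TimestampRouter-style rename (illustrative only)."""
    # Format the record timestamp (epoch milliseconds) in UTC.
    stamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    formatted = stamp.strftime(strftime_format)
    # Substitute the two placeholders into the topic format.
    return (topic_format
            .replace("${topic}", topic)
            .replace("${timestamp}", formatted))


def is_lowercase_index(name: str) -> bool:
    """Check only the 'must be lowercase' rule quoted in the exception."""
    return name == name.lower()


ts = 1555286400000  # 2019-04-15T00:00:00Z, an arbitrary example timestamp

# A topic format of "${timestamp}" leaves only the date in the name.
only_stamp = timestamp_route("v1_TableName", ts, "${timestamp}", "%Y%m")

# Keeping ${topic} in the format preserves the (still mixed-case) name.
with_topic = timestamp_route("v1_TableName", ts, "${topic}-${timestamp}", "%Y%m")
```

Two things stand out under these assumptions. First, neither transform changes case, so a name such as `v1_TableName-201904` would still fail the lowercase check. Second, the index in the exception still carries the `kafka_` prefix, which suggests the transforms were not applied at all. The fragment above does not show a `transforms` property listing the aliases (for example `dropPrefix,routeTS`), which Kafka Connect needs in order to run them; whether it was present in the full config is not known.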
