Last active
April 2, 2020 21:12
-
-
Save RajatGoyal/ca835861de42354de0eb7f776ea312ce to your computer and use it in GitHub Desktop.
debezium cdc memsql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
import struct
import sys
import json

# The transform reads length-prefixed records from stdin and writes
# newline-terminated records to stdout, all as raw bytes. On Python 2 the
# standard file objects already carry bytes; on Python 3 the byte layer is
# exposed through each stream's ``.buffer`` attribute.
if sys.version_info < (3, 0):
    binary_stdin, binary_stderr, binary_stdout = sys.stdin, sys.stderr, sys.stdout
else:
    binary_stdin, binary_stderr, binary_stdout = (
        sys.stdin.buffer,
        sys.stderr.buffer,
        sys.stdout.buffer,
    )
def input_stream(stream=None):
    """
    Consume STDIN and yield each record that is received from MemSQL.

    Each record is framed as an 8-byte little-endian unsigned length followed
    by that many bytes of payload. Iteration stops cleanly when the input ends
    exactly on a record boundary.

    :param stream: binary file-like object to read from; defaults to
        ``binary_stdin``. Exposed so the framing can be exercised directly.
    :yields: the raw payload bytes of each record.
    :raises ValueError: if the input ends part-way through a length header
        or part-way through a payload.
    """
    if stream is None:
        stream = binary_stdin
    while True:
        header = stream.read(8)
        if not header:
            # Clean end of input: nothing left over after the last record.
            return
        if len(header) != 8:
            raise ValueError("truncated record header: %r" % (header,))
        # "<Q" pins both the width (8 bytes) and the byte order; the native
        # "L" format is only 8 bytes on some platforms (4 on Windows and on
        # 32-bit builds), which would misparse the framing.
        (byte_len,) = struct.unpack("<Q", header)
        payload = stream.read(byte_len)
        if len(payload) != byte_len:
            raise ValueError(
                "truncated record: expected %d bytes, got %d"
                % (byte_len, len(payload))
            )
        yield payload
def log(message, stream=None):
    """
    Log an informational message to stderr which will show up in MemSQL in
    the event of transform failure.

    :param message: the text to log, as ``bytes`` or ``str`` (``str`` is
        encoded as UTF-8 before writing).
    :param stream: binary stream to write to; defaults to ``binary_stderr``.
    """
    if stream is None:
        stream = binary_stderr
    if not isinstance(message, bytes):
        message = message.encode("utf-8")
    stream.write(message + b"\n")
    # The binary stderr layer is block-buffered, so flush so the message is
    # not lost if the process is terminated before a normal exit.
    stream.flush()
def emit(message, stream=None):
    """
    Emit a record back to MemSQL by writing it to STDOUT, one record per
    line. The record should be formatted as JSON, Avro, or CSV as it will
    be parsed by LOAD DATA.

    :param message: the record to write. ``bytes`` are written unchanged,
        ``str`` is encoded as UTF-8, and any other object (such as the
        ``{'q': sql}`` dicts built by this script) is serialized with
        ``json.dumps`` first.
    :param stream: binary stream to write to; defaults to ``binary_stdout``.
    """
    if stream is None:
        stream = binary_stdout
    if isinstance(message, bytes):
        data = message
    elif isinstance(message, type(u"")):
        data = message.encode("utf-8")
    else:
        # Callers hand over dicts; concatenating those with b"\n" would raise
        # TypeError, so render them as a JSON line instead.
        data = json.dumps(message).encode("utf-8")
    stream.write(data + b"\n")
log(b"Begin transform")


def send_insert_query(table_name, columns, values_arr):
    """
    Emit one multi-row ``INSERT`` statement as a ``{"q": <sql>}`` JSON line.

    :param table_name: bare, unquoted table name (not ``db.table``); it is
        wrapped in backticks here.
    :param columns: pre-rendered, comma-separated column list such as
        ``"`id`, `name`"``; inserted into the statement verbatim.
    :param values_arr: pre-rendered row tuples such as ``"( '1', 'a' )"``;
        joined with commas and inserted verbatim. When it is empty nothing is
        emitted, because ``VALUES ;`` is not valid SQL.
    """
    if not values_arr:
        return
    # Quote the identifier (doubling embedded backticks) so an unusual table
    # name cannot break the statement.
    quoted_table = "`" + str(table_name).replace("`", "``") + "`"
    sql = 'INSERT INTO %s ( %s ) VALUES %s;' % (quoted_table, columns, ','.join(values_arr))
    # emit() writes bytes, so serialize the record to a JSON line here rather
    # than passing the dict through.
    emit(json.dumps({'q': sql}).encode('utf-8'))
# We start the transform here by reading from the input_stream() iterator.
#
# NOTE(review): each record is assumed to hold one Debezium change event, or
# a JSON list of them. Each event is assumed to carry ``op`` and
# ``source.table``, plus ``before`` for deletes and ``after`` for every other
# operation. Confirm this against the connector/converter configuration
# (e.g. whether schemas are enabled, which would nest events under
# ``payload``).


def _sql_literal(value):
    """Render a JSON-decoded value as a SQL literal for generated statements.

    ``None`` becomes ``NULL`` and booleans become ``1``/``0``. Nested objects
    and arrays are serialized as JSON text, and anything else goes through
    ``str()``. Non-NULL, non-boolean values are single-quoted, with embedded
    backslashes and quotes escaped so the data cannot end the literal early.
    NOTE(review): backslash escaping assumes the server is not running with
    ``NO_BACKSLASH_ESCAPES``.
    """
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        # str(True) would render as 'True', which numeric columns reject.
        return "1" if value else "0"
    if isinstance(value, (dict, list)):
        text = json.dumps(value)
    else:
        text = str(value)
    return "'" + text.replace("\\", "\\\\").replace("'", "''") + "'"


def _quote_identifier(name):
    """Wrap *name* in backticks, doubling any backticks it contains."""
    return "`" + str(name).replace("`", "``") + "`"


def _emit_query(sql):
    """Emit *sql* to MemSQL as a ``{"q": <sql>}`` JSON line."""
    emit(json.dumps({'q': sql}).encode('utf-8'))


for data in input_stream():
    events = json.loads(data) if data.strip() else None
    if events is None:
        # Empty or ``null`` records (presumably tombstones) carry no change.
        continue
    if isinstance(events, dict):
        # A single event rather than a list; iterating it would yield keys.
        events = [events]

    # Create/update rows are buffered and flushed as one multi-row INSERT. A
    # batch only ever holds rows for a single table and column list.
    pending_table = None
    pending_columns = None
    pending_rows = []

    for event in events:
        table = event['source']['table']

        if event['op'] == 'd':
            # Flush buffered inserts first so statements keep event order.
            if pending_rows:
                send_insert_query(pending_table, pending_columns, pending_rows)
                pending_rows = []
            quoted = _quote_identifier(table)
            # NOTE(review): assumes every table is keyed by an ``id`` column.
            sql = 'delete from %s where %s.id = %s;' % (
                quoted, quoted, _sql_literal(event['before']['id']))
            _emit_query(sql)
            continue

        # NOTE(review): updates ('u') are also written as plain INSERTs, which
        # will collide with the existing row's key; consider REPLACE or
        # INSERT ... ON DUPLICATE KEY UPDATE.
        row = event['after']
        columns = ', '.join(_quote_identifier(key) for key in row)
        if pending_rows and (table, columns) != (pending_table, pending_columns):
            send_insert_query(pending_table, pending_columns, pending_rows)
            pending_rows = []
        pending_table, pending_columns = table, columns
        values = ', '.join(_sql_literal(value) for value in row.values())
        pending_rows.append("( %s )" % values)

    if pending_rows:
        send_insert_query(pending_table, pending_columns, pending_rows)

log(b"End transform")
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.