Skip to content

Instantly share code, notes, and snippets.

@ilyasahsan123
Last active January 28, 2019 15:11
Show Gist options
  • Save ilyasahsan123/20245c68f75363f1c0d896be8ea948a9 to your computer and use it in GitHub Desktop.
Save ilyasahsan123/20245c68f75363f1c0d896be8ea948a9 to your computer and use it in GitHub Desktop.
import json
import logging

from google.cloud import bigquery
from kafka import KafkaConsumer
def extract_user_row(raw_value):
    """Turn one change-event message value into a BigQuery row tuple.

    Args:
        raw_value: The Kafka message value: a JSON document (``bytes`` or
            ``str``) with a ``payload.after`` object, or ``None`` for a
            tombstone record.

    Returns:
        ``(id, firstname, lastname)`` read from ``payload.after``, or
        ``None`` when the event carries no row to insert (tombstone
        record, or ``after`` is null as on a delete event).

    Raises:
        ValueError: If ``raw_value`` is not valid JSON.
        KeyError: If ``payload``, ``after`` or one of the user fields is
            missing from the event.
    """
    # Tombstone records have no value at all; json.loads(None) would raise.
    if raw_value is None:
        return None

    event = json.loads(raw_value)
    user = event['payload']['after']

    # Delete events have no "after" state, so there is nothing to insert.
    if user is None:
        return None

    return (user['id'], user['firstname'], user['lastname'])


if __name__ == "__main__":
    # BigQuery configuration: resolve the destination table once up front.
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('DATASET_NAME')
    table_ref = dataset_ref.table('TABLE_NAME')
    table = bigquery_client.get_table(table_ref)

    # Subscribe to the Kafka topic that carries the change events.
    consumer = KafkaConsumer('debezium.school.users')

    # Transform each change event and stream it into BigQuery.
    for message in consumer:
        row = extract_user_row(message.value)
        if row is None:
            continue

        # insert_rows reports per-row failures through its return value
        # instead of raising, so the result must be inspected explicitly.
        errors = bigquery_client.insert_rows(table, [row])
        if errors:
            logging.error("BigQuery rejected row %r: %s", row, errors)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment