Last active
March 16, 2023 19:37
-
-
Save asdaraujo/8366763b4022fe346e989dcfa691abff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import avro.schema | |
import io | |
import os | |
import requests | |
from avro.io import DatumReader, BinaryDecoder | |
from cachetools import TTLCache | |
from kafka import KafkaConsumer | |
# ---------------------------------------------------------------------------
# Connection settings
# ---------------------------------------------------------------------------
# Bootstrap broker addresses handed to the Kafka client.
BROKERS = ["cdp.52.33.201.179.nip.io:9092"]
# Topics this script subscribes to.
TOPICS = ["machine-data-nifi"]
# Base URL of the Schema Registry REST API (REST paths are appended to it).
SCHEMA_REGISTRY_URL = "http://cdp.52.33.201.179.nip.io:7788"

# ---------------------------------------------------------------------------
# Decoding settings
# ---------------------------------------------------------------------------
# True when messages were written by a Schema Registry Avro serializer, i.e.
# each value is prefixed with a protocol byte and schema id header.
PRODUCER_USES_SCHEMA_REGISTRY = True
# Local Avro schema file. When the producer uses Schema Registry this file is
# optional (the schema can be fetched from the registry instead); otherwise it
# is required. If the file exists it takes precedence over the registry.
READER_SCHEMA_FILE = "machine.avsc"

# Build the consumer, then attach it to the configured topics.
consumer = KafkaConsumer(bootstrap_servers=BROKERS)
consumer.subscribe(topics=TOPICS)
class SchemaRegistryClient(object):
    """Minimal client for the Schema Registry REST API.

    Resolves schema names from schema metadata ids, and parsed Avro schemas
    from either (metadata id, version number) or a schema version id. Both
    lookups are memoized in TTL caches so repeated message headers do not
    trigger repeated HTTP requests.
    """

    # REST paths, relative to the registry base URL.
    SCHEMA_METADATA = "/api/v1/schemaregistry/schemasById/{}"
    SCHEMA_VERSION = "/api/v1/schemaregistry/schemas/{}/versions/{}"
    SCHEMA_VERSION_BY_ID = "/api/v1/schemaregistry/schemas/versionsById/{}"

    def __init__(self, url, cache_size=100, ttl_secs=3600, timeout=30):
        """Create a client.

        :param url: registry base URL; REST paths are appended to it by plain
            string concatenation, so it should not end with a slash.
        :param cache_size: maximum number of entries in each cache.
        :param ttl_secs: seconds a cached entry stays valid.
        :param timeout: seconds to wait on each HTTP request (passed to
            ``requests.get``); ``None`` waits indefinitely.
        """
        self.base_url = url
        self.timeout = timeout
        self.schema_name_cache = TTLCache(maxsize=cache_size, ttl=ttl_secs)
        self.schema_cache = TTLCache(maxsize=cache_size, ttl=ttl_secs)

    def _get_json(self, path, what):
        """GET ``base_url + path`` and return the decoded JSON body.

        :param what: resource description used in the error message.
        :raises RuntimeError: if the response status is not 200 OK.
        """
        r = requests.get(self.base_url + path, timeout=self.timeout)
        if r.status_code != requests.codes.ok:
            raise RuntimeError("Failed to retrieve {} (Error: {}, Message: {})".format(what, r.status_code, r.text))
        return r.json()

    def get_schema_name(self, metadata_id):
        """Return the name of the schema whose metadata id is ``metadata_id``.

        :raises RuntimeError: if the registry request fails.
        """
        # EAFP: a single cache read avoids a check-then-read on an expiring cache.
        try:
            return self.schema_name_cache[metadata_id]
        except KeyError:
            pass
        body = self._get_json(self.SCHEMA_METADATA.format(metadata_id), "schema name")
        name = body['schemaMetadata']['name']
        self.schema_name_cache[metadata_id] = name
        return name

    def get_schema(self, metadata_id=None, version_id=None):
        """Return the parsed Avro schema identified by the given ids.

        With ``metadata_id``: fetch version ``version_id`` (a version number,
        or ``'latest'`` when ``version_id`` is None) of that schema.
        Without ``metadata_id``: fetch the schema version whose id is
        ``version_id``.

        :raises ValueError: if neither id is given.
        :raises RuntimeError: if a registry request fails.
        """
        if metadata_id is None and version_id is None:
            raise ValueError("Either metadata_id or version_id must be provided")
        if version_id is None:
            version_id = 'latest'
        identifier = (metadata_id, version_id)
        try:
            return self.schema_cache[identifier]
        except KeyError:
            pass
        # Build the path only on a cache miss: it may require a name lookup.
        if metadata_id is not None:
            path = self.SCHEMA_VERSION.format(self.get_schema_name(metadata_id), version_id)
        else:
            path = self.SCHEMA_VERSION_BY_ID.format(version_id)
        j = self._get_json(path, "schema")
        schema = avro.schema.parse(j['schemaText'])
        self.schema_cache[identifier] = schema
        print("Fetched version {} of schema {} from Schema Registry".format(j['version'], j['name']))
        return schema
def _parse_registry_header(value):
    """Split a Schema Registry-serialized message value into header ids and payload.

    The first byte selects the serdes protocol, which fixes the layout of the
    big-endian id fields preceding the Avro payload
    (see https://github.com/hortonworks/registry/blob/master/docs/serdes.rst).

    :param value: raw Kafka message value (bytes).
    :return: ``(metadata_id, version_id, payload)``; ids the protocol does not
        carry are ``None``. For protocol 1 ``version_id`` is a schema version
        number; for protocols 2 and 3 it is a schema version id.
    :raises RuntimeError: if the value is empty, shorter than its header, or
        uses an unknown protocol.
    """
    def uint(start, end):
        # Header ids are unsigned big-endian integers.
        return int.from_bytes(value[start:end], byteorder='big', signed=False)

    if not value:
        raise RuntimeError("Empty message value; expected a Schema Registry protocol header")
    protocol_id = value[0]
    metadata_id = None
    version_id = None
    if protocol_id == 0:  # Confluent protocol: 4-byte id
        # NOTE(review): read as a schema metadata id, as the original script
        # did; confirm against the producer's serializer.
        metadata_id = uint(1, 5)
        header_len = 5
    elif protocol_id == 1:  # Schema metadata id (8 bytes) and version number (4 bytes)
        metadata_id = uint(1, 9)
        version_id = uint(9, 13)
        header_len = 13
    elif protocol_id == 2:  # Schema version id as long protocol (8 bytes)
        # A version id, not a metadata id: it must be looked up by version id.
        version_id = uint(1, 9)
        header_len = 9
    elif protocol_id == 3:  # Schema version id as int protocol (4 bytes)
        # TODO: Handle the case where schema id is larger than max integer value
        version_id = uint(1, 5)
        header_len = 5
    else:
        raise RuntimeError("Unimplemented protocol %s. See https://github.com/hortonworks/registry/blob/master/docs/serdes.rst" % (protocol_id,))
    if len(value) < header_len:
        raise RuntimeError("Message value is {} bytes, shorter than the {}-byte header of protocol {}".format(len(value), header_len, protocol_id))
    return metadata_id, version_id, value[header_len:]


# A local schema file, when present, overrides the registry.
if os.path.exists(READER_SCHEMA_FILE):
    print("Using schema from local file {}".format(READER_SCHEMA_FILE))
    with open(READER_SCHEMA_FILE, "rb") as schema_file:
        reader_schema = avro.schema.parse(schema_file.read())
else:
    reader_schema = None

# Without registry headers there is no other way to obtain a schema: fail at
# startup instead of on the first message.
if reader_schema is None and not PRODUCER_USES_SCHEMA_REGISTRY:
    raise RuntimeError("Reader schema file {} is required when the producer does not use Schema Registry".format(READER_SCHEMA_FILE))

sr = SchemaRegistryClient(SCHEMA_REGISTRY_URL)

for message in consumer:
    if message.value is None:
        # Tombstone (null-value) records carry no Avro payload to decode.
        print("Skipping null-value message at {}:{}:{}".format(message.topic, message.partition, message.offset))
        continue
    schema = reader_schema
    if PRODUCER_USES_SCHEMA_REGISTRY:
        metadata_id, version_id, payload = _parse_registry_header(message.value)
        if reader_schema is None:
            schema = sr.get_schema(metadata_id, version_id)
    else:
        payload = message.value
    if schema is None:
        raise RuntimeError("Schema must not be None")
    # NOTE(review): the chosen schema is used as the *writer* schema. If a
    # local file differs from the producer's schema, decoding will be wrong;
    # Avro schema resolution would need DatumReader(writer_schema, reader_schema).
    reader = DatumReader(schema)
    decoder = BinaryDecoder(io.BytesIO(payload))
    print(reader.read(decoder))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment