Last active
January 7, 2021 23:29
-
-
Save rnbtechnology/c04904dd73cb9e88ac31a8708f9e33c4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkContext | |
from pyspark.sql.column import Column, _to_java_column | |
from pyspark.sql.functions import col | |
# Instantiates a Scala Map containing the configuration for communicating with the Schema Registry API
def get_schema_registry_conf_map(spark, schema_registry_url, topic_name):
    """Build a JVM-side Scala immutable Map of Schema Registry settings for ABRiS.

    The map is assembled inside the JVM through the Py4J gateway so that it can
    be handed directly to ABRiS' Scala functions (see ``from_avro`` in this file).

    Args:
        spark: The active ``SparkSession``.
        schema_registry_url: Value stored under ``"schema.registry.url"``.
        topic_name: Value stored under ``"schema.registry.topic"``.

    Returns:
        A Py4J proxy for a ``scala.collection.immutable.Map`` containing the
        four string entries defined below.
    """
    # A SparkSession exposes its SparkContext as ``sparkContext`` (lower-case
    # "s"); the previous ``spark.SparkContext`` raised AttributeError.
    jvm_gateway = spark.sparkContext._gateway.jvm

    schema_registry_config_dict = {
        "schema.registry.url": schema_registry_url,
        "schema.registry.topic": topic_name,
        # Ask for the newest registered value schema rather than a fixed id.
        "value.schema.id": "latest",
        # NOTE(review): presumably ABRiS' topic-name subject strategy
        # (subject "<topic>-value"); confirm against the ABRiS version in use.
        "value.schema.naming.strategy": "topic.name",
    }

    # Scala `object`s compile to a class named `<Name>$` whose singleton lives
    # in the static `MODULE$` field; this fetches the empty immutable Map.
    conf_map = getattr(
        getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$"
    )
    for key, value in schema_registry_config_dict.items():
        # Scala's `+` operator is compiled to a method named `$plus`. On an
        # immutable Map it returns a new map, hence the reassignment.
        conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(key, value))
    return conf_map
# returns deserialized column (using Schema Registry) | |
def from_avro(col, conf_map):
    """Decode an Avro-encoded column through ABRiS and Schema Registry.

    Delegates to the JVM function
    ``za.co.absa.abris.avro.functions.from_confluent_avro``.

    Args:
        col: The column holding the encoded payload (typically a ``Column``).
            Note that this parameter shadows the module-level ``col`` import
            inside this function.
        conf_map: Scala Map of Schema Registry settings, e.g. the value
            returned by ``get_schema_registry_conf_map``.

    Returns:
        A PySpark ``Column`` wrapping the JVM column produced by ABRiS.
    """
    active_context = SparkContext._active_spark_context
    abris_functions = active_context._gateway.jvm.za.co.absa.abris.avro.functions
    java_column = _to_java_column(col)
    decoded = abris_functions.from_confluent_avro(java_column, conf_map)
    return Column(decoded)
# Kafka topic whose values are deserialized below (placeholder name).
TOPIC_NAME = "topic_name"
# Schema Registry endpoint(s), comma-separated (placeholder hosts/ports).
# NOTE(review): some Schema Registry clients expect full URLs including the
# scheme (e.g. "http://..."); confirm the format ABRiS expects here.
SCHEMA_REGISTRY_URL = "host1:port1,host2:port2"

# NOTE(review): `spark` (a SparkSession) and `df` (presumably a DataFrame read
# from Kafka) are not defined in this snippet and must already exist.

# Build the Scala Map used to talk to the Schema Registry API.
conf_map = get_schema_registry_conf_map(
    spark,
    SCHEMA_REGISTRY_URL,
    TOPIC_NAME,
)

# Keep the record metadata columns as-is, cast the key to a string, and
# replace the raw `value` column with its Schema Registry-deserialized form.
deserialized_df = df.select(
    col("key").cast("string"),
    *(col(name) for name in ("partition", "offset", "timestamp", "timestampType")),
    from_avro(df.value, conf_map).alias("value"),
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment