Skip to content

Instantly share code, notes, and snippets.

@serenamm
Last active September 9, 2019 18:41
Show Gist options
  • Save serenamm/c88852815e64154f1df3d066e8bffce5 to your computer and use it in GitHub Desktop.
Save serenamm/c88852815e64154f1df3d066e8bffce5 to your computer and use it in GitHub Desktop.
import pyspark
from pyspark.sql import SparkSession
# Spark SQL template: for every item_id_1, keep at most {num_items} item_id_2
# rows, choosing those with the highest similarity_score.
#
# Placeholders (filled with str.format):
#   {similarity_table} - table with item_id_1, item_id_2 and similarity_score
#                        columns; aliased as ``s`` so {same_category_q} can
#                        refer to it.
#   {same_category_q}  - optional SQL fragment (JOINs and/or a WHERE clause)
#                        spliced in after the FROM clause; may be ''.
#   {num_items}        - maximum number of rows kept per item_id_1.
#
# item_id_2 is a secondary sort key so ties on similarity_score are broken
# deterministically; with only the score, ROW_NUMBER() picks an arbitrary
# winner among ties and the top-N output can change between runs.
# The derived table is aliased ("ranked") for portability: Hive and most SQL
# dialects require an alias on a subquery in FROM.
create_table_query = '''
SELECT
    item_id_1,
    item_id_2
FROM (
    SELECT
        item_id_1,
        item_id_2,
        ROW_NUMBER() OVER (
            PARTITION BY item_id_1
            ORDER BY similarity_score DESC, item_id_2
        ) AS row_num
    FROM {similarity_table} s
    {same_category_q}
) ranked
WHERE row_num <= {num_items}
'''
def create_new_table(spark, table_paths, params, from_date, to_date, same_category_q):
    """Run the top-N similar-items query and write the result as ORC.

    Args:
        spark: SparkSession used to execute the SQL.
        table_paths: nested mapping; reads ``["product_similarity"]["table"]``
            (the similarity table name) and ``["created_table"]["path"]``
            (where the result is written).
        params: mapping; reads ``["num_items"]``, the per-item row cap.
        from_date: accepted but not used by this function.
        to_date: accepted but not used by this function.
        same_category_q: SQL fragment spliced into ``create_table_query``
            after the source table; may be the empty string.

    Returns:
        None. Any existing data at the output path is overwritten.
    """
    sql_text = create_table_query.format(
        similarity_table=table_paths["product_similarity"]["table"],
        same_category_q=same_category_q,
        num_items=params["num_items"],
    )
    top_items = spark.sql(sql_text)

    # coalesce(1) collapses the result to a single partition, so the write
    # produces one output file.
    writer = top_items.coalesce(1).write
    writer.save(table_paths["created_table"]["path"], format="orc", mode="Overwrite")
def make_query(same_category, table_paths):
    """Return the SQL fragment that limits similar-item pairs to one category.

    The fragment is meant to be spliced into ``create_table_query`` right
    after ``FROM {similarity_table} s``.

    Args:
        same_category: when truthy, build the same-category filter; otherwise
            no filtering is applied.
        table_paths: nested mapping; ``["product_table"]["table"]`` names the
            product table, which must have ``item_id`` and ``category_id``
            columns. Only read when ``same_category`` is truthy.

    Returns:
        str: the JOIN/WHERE fragment, or ``''`` when ``same_category`` is falsy.
    """
    # PEP 8: test the truth value directly rather than comparing ``is True``.
    if not same_category:
        # NOTE(review): unlike the branch below, this path does not exclude
        # item_id_1 == item_id_2 self-pairs — confirm that is intended.
        return ''

    product_table = table_paths["product_table"]["table"]
    # The product table is joined twice: ``p`` resolves item_id_1's category
    # and ``q`` resolves item_id_2's, so the two can be compared. Self-pairs
    # are dropped as well.
    return '''
    INNER JOIN {product_table} p
        ON s.item_id_1 = p.item_id
    INNER JOIN {product_table} q
        ON s.item_id_2 = q.item_id
    WHERE item_id_1 != item_id_2
        AND p.category_id = q.category_id
    '''.format(product_table=product_table)
if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .appName("testing_tutorial")
             .enableHiveSupport()
             .getOrCreate())
    try:
        same_category = True  # or False
        # Placeholders: replace foo/bar with your own config (e.g. loaded
        # from a JSON file). table_paths needs "product_similarity",
        # "product_table" and "created_table" entries; params needs "num_items".
        table_paths = foo  # Assume paths are in some JSON
        params = bar
        # create_new_table takes a date range it does not currently use;
        # pass explicit values so same_category_q lands in the right slot.
        from_date = None
        to_date = None
        # make_query returns a single SQL fragment string, not a tuple.
        same_category_q = make_query(same_category, table_paths)
        create_new_table(spark, table_paths, params, from_date, to_date, same_category_q)
    finally:
        # Release the Spark session even if the job fails.
        spark.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment