Skip to content

Instantly share code, notes, and snippets.

@rssanders3
Last active November 19, 2022 03:29
Show Gist options
  • Save rssanders3/4bcb1f36e5a6a2dc007e6dcd98e165af to your computer and use it in GitHub Desktop.
Save rssanders3/4bcb1f36e5a6a2dc007e6dcd98e165af to your computer and use it in GitHub Desktop.
# Setup Test Data
# Load each CSV fixture (first row is the header) and expose it as a
# session-scoped temp view, so the SQL statements below can reference it by
# name. Without `inferSchema`, Spark's CSV reader types every column as string.
# createOrReplaceTempView replaces DataFrame.registerTempTable, which has been
# deprecated since Spark 2.0; both replace an existing view of the same name.
# As before, `df` is left bound to the last frame loaded (input_data_updates).
for view_name in ("input_data", "input_data_updates"):
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .load(f"s3://{AWS_BUCKET}/data/{view_name}.csv")
    )
    df.createOrReplaceTempView(view_name)
## CREATE TABLE
# Two alternative ways to create an Iceberg (format v2) table in the Glue
# catalog. Both are executed, in order, and each statement is echoed before it
# runs, like every other statement in this script.
#
# 1. CTAS: (re)creates iceberg_data_1 and populates it from the `input_data`
#    temp view. CREATE OR REPLACE discards any existing table of that name.
sql_stmnt = f"""
CREATE OR REPLACE TABLE iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_1
USING iceberg
TBLPROPERTIES ('table_type'='ICEBERG', 'format-version'='2')
LOCATION 's3://{AWS_BUCKET}/data/{GLUE_DB_NAME}/iceberg_data_1'
AS SELECT * FROM input_data
"""
print(f"Executing SparkSQL:\n`{sql_stmnt}`")
spark.sql(sql_stmnt).show()
# OR
# 2. Explicit schema: creates an empty iceberg_data_2 with (id, name) string
#    columns. This is a no-op if the table already exists.
sql_stmnt = f"""
CREATE TABLE IF NOT EXISTS iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_2 (
id string,
name string
)
USING iceberg
TBLPROPERTIES ('table_type'='ICEBERG', 'format-version'='2')
LOCATION 's3://{AWS_BUCKET}/data/{GLUE_DB_NAME}/iceberg_data_2'
"""
print(f"Executing SparkSQL:\n`{sql_stmnt}`")
spark.sql(sql_stmnt).show()
## INSERT
# Two alternative ways of loading rows into iceberg_data_2. Both are executed,
# in order, and each statement is echoed before it runs:
#   1. INSERT OVERWRITE replaces the table's contents with every row of the
#      `input_data` temp view. Spark's INSERT maps columns by position, so
#      SELECT * assumes input_data's columns line up with (id, name)
#      -- TODO confirm against input_data.csv.
#   2. INSERT INTO appends one literal row.
for sql_stmnt in (
    f"""
INSERT OVERWRITE TABLE iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_2 SELECT * FROM input_data
""",
    # OR
    f"""
INSERT INTO iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_2 VALUES ('5', 'Ram')
""",
):
    print(f"Executing SparkSQL:\n`{sql_stmnt}`")
    spark.sql(sql_stmnt).show()
## UPSERT
# MERGE the `input_data_updates` temp view into iceberg_data_2, keyed on id:
# rows whose id already exists are overwritten column-for-column
# (UPDATE SET *), and new ids are inserted (INSERT *).
# The `*` forms assume input_data_updates has the same columns as the target
# (id, name) -- TODO confirm against input_data_updates.csv. Iceberg rejects
# the MERGE if one target row matches more than one source row, so ids in the
# updates file are presumably unique.
sql_stmnt = f"""
MERGE INTO iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_2 AS data
USING input_data_updates AS updates
ON data.id = updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
"""
print(f"Executing SparkSQL:\n`{sql_stmnt}`")
# Fixed: the closing parenthesis was missing here, which made the whole
# script a SyntaxError.
spark.sql(sql_stmnt).show()
## UPDATE
# Row-level update: set name to 'who?' on the row(s) with id '1'.
# The literal uses single quotes. Double-quoted text is parsed as an identifier
# when spark.sql.ansi.doubleQuotedIdentifiers is enabled, and single quotes
# match the literals used elsewhere in this script.
sql_stmnt = f"""
UPDATE iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_2 SET name = 'who?' WHERE id = '1'
"""
print(f"Executing SparkSQL:\n`{sql_stmnt}`")
spark.sql(sql_stmnt).show()
## DELETE
# Row-level delete: remove the row(s) with id '1' from iceberg_data_2.
# The id column is declared as string, so the key is compared as a
# single-quoted string literal. A double-quoted "1" would be read as a column
# identifier when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
sql_stmnt = f"""
DELETE FROM iceberg_catalog.{GLUE_DB_NAME}.iceberg_data_2 WHERE id = '1'
"""
print(f"Executing SparkSQL:\n`{sql_stmnt}`")
spark.sql(sql_stmnt).show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment