Created
April 21, 2020 14:59
-
-
Save filipelenfers/9e239699bad7408bc84aef1d49cdf867 to your computer and use it in GitHub Desktop.
Remove all partitions from a Glue table and add them again (sometimes solves the HIVE_PARTITION_SCHEMA_MISMATCH error in Athena)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# !!! THIS SCRIPT ASSUMES HIVE-COMPATIBLE PARTITIONS !!!
#
# When you get the following error:
# HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced.
# it means that some column type changed in the table. Partitions keep their own version of the schema,
# and the version cached by the partitions is no longer compatible with the table's current version in Glue.
# We need to:
# 1. remove the partitions
# 2. add the partitions again
#
# !!! THIS SCRIPT ASSUMES HIVE-COMPATIBLE PARTITIONS !!!
import boto3 | |
import time | |
# ---------------------------------------------------------------------------
# Configuration: edit these values before running the script.
# ---------------------------------------------------------------------------
# Name of the AWS credentials profile handed to boto3.Session.
PROFILE = 'dev'
# Glue database and table whose partitions are removed and re-added.
DATABASE_NAME = 'default'
TABLE_NAME = 'my_table'
# Number of partitions sent in each batch_delete_partition request.
BATCH = 25
# Athena workgroup and S3 result location used for the MSCK REPAIR query.
ATHENA_WORKGROUP = 'primary'
ATHENA_OUTPUT_LOCATION = 's3://my-result-bucket'

# One shared session; both service clients are created from it.
session = boto3.Session(profile_name=PROFILE)
glue_client = session.client(service_name="glue")
athena_client = session.client(service_name="athena")
# 1. Collect the key values of every partition registered for the table.
#
# Each entry of `to_delete` is the partition's complete "Values" list (one
# value per partition key). Glue identifies a partition by ALL of its key
# values, so keeping only the first value would make the delete step miss
# every partition of a table partitioned by more than one column
# (e.g. year/month/day).
to_delete = []
paginator = glue_client.get_paginator("get_partitions")
# The paginator follows NextToken for us until the listing is exhausted.
for page in paginator.paginate(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME):
    to_delete.extend(partition["Values"] for partition in page["Partitions"])
print(f"Found {len(to_delete)} partition(s) to delete.")

# 2. Delete the partitions, BATCH partitions per request.
for start in range(0, len(to_delete), BATCH):
    batch_to_delete = [
        {"Values": values} for values in to_delete[start:start + BATCH]
    ]
    print(batch_to_delete)
    response = glue_client.batch_delete_partition(
        DatabaseName=DATABASE_NAME,
        TableName=TABLE_NAME,
        PartitionsToDelete=batch_to_delete,
    )
    print()
    print(response)
    print()
    # batch_delete_partition does not raise for individual partitions it
    # could not delete; it lists them under "Errors" in the response.
    errors = response.get("Errors")
    if errors:
        print(f"WARNING: {len(errors)} partition(s) could not be deleted: {errors}")
# 3. Add the partitions again by running MSCK REPAIR TABLE through Athena.
# !!! THIS STEP ASSUMES HIVE-COMPATIBLE PARTITIONS !!!
response = athena_client.start_query_execution(
    QueryString=f'MSCK REPAIR TABLE {DATABASE_NAME}.{TABLE_NAME};',
    QueryExecutionContext={
        'Database': DATABASE_NAME,
    },
    ResultConfiguration={
        'OutputLocation': ATHENA_OUTPUT_LOCATION,
    },
    WorkGroup=ATHENA_WORKGROUP,
)
query_execution_id = response['QueryExecutionId']

# Poll every 5 seconds until the query leaves the QUEUED/RUNNING states.
while True:
    response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    status = response['QueryExecution']['Status']
    if status['State'] not in ('QUEUED', 'RUNNING'):
        break
    print("Waiting MSCK REPAIR to complete." )
    time.sleep(5)

# Always show the final response, whatever the outcome.
print()
print(response)
print()

# The partitions were already deleted above, so a repair that did not succeed
# leaves the table without partitions. Fail loudly instead of exiting as if
# everything went fine.
if status['State'] != 'SUCCEEDED':
    reason = status.get('StateChangeReason', 'no reason reported')
    raise RuntimeError(
        f"MSCK REPAIR TABLE {DATABASE_NAME}.{TABLE_NAME} finished in state "
        f"{status['State']}: {reason}"
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment