-
-
Save PedroMartinSteenstrup/6f40a32e6d1aa8851011b58a528c8119 to your computer and use it in GitHub Desktop.
emr stuff
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
from analytics_utils.config import config_2 | |
class EMR:
    """Thin wrapper that builds a boto3 EMR client from project configuration.

    Configuration is resolved through the project's ``config_2`` module
    (``EMRUtils``); the AWS region is read from the resulting config dict.
    """

    def __init__(self, **kwargs):
        """Resolve EMR configuration and create the underlying boto3 client.

        All keyword arguments are forwarded to ``config_2.EMRUtils``.
        """
        self.config_object = config_2.EMRUtils(**kwargs)
        self.config = self.config_object.config
        # Populated later by callers; kept for interface compatibility.
        self.cluster_config = None
        self._set_client()

    def _set_client(self):
        """(Re)build the boto3 EMR client for the configured AWS region."""
        region = self.config['aws_region']
        self.client = boto3.client('emr', region_name=region)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import inspect
import json
import time

import boto3
import requests
# Header sent with every Livy REST request.
JSON_HEADER = {"Content-Type": "application/json"}
class Cluster:
    """On-demand EMR cluster (Hadoop/Spark/Hive/Livy/Zeppelin) with Delta Lake.

    Launching blocks until the cluster is running and its bootstrap steps have
    completed. Usable as a context manager: the cluster is terminated on exit.

    NOTE(review): the S3 paths, subnet id and key name below are redacted
    placeholders from the original gist — fill in real values before use.
    """

    def __init__(self, instance_count=3, instance_type='m4.large',
                 instance_type_master=None, instance_type_worker=None,
                 region_name='eu-central-1'):
        """Launch the cluster and wait for it to become usable.

        Args:
            instance_count: total number of nodes (1 master + ``instance_count - 1``
                core nodes). Must be >= 2 so at least one core node exists.
            instance_type: default EC2 instance type for all node groups.
            instance_type_master: optional override for the master node.
            instance_type_worker: optional override for the core nodes.
            region_name: AWS region to launch in. New parameter; defaults to
                the previously hard-coded ``'eu-central-1'``.

        Raises:
            ValueError: if ``instance_count`` < 2 (would submit an invalid
                core instance group with ``InstanceCount=0``).
        """
        # Fail fast with a clear message instead of letting EMR reject an
        # InstanceCount=0 core group with an opaque API error.
        if instance_count < 2:
            raise ValueError(
                'instance_count must be >= 2 (1 master + at least 1 core node)')
        self.client = boto3.client('emr', region_name=region_name)
        print('Waiting for the EMR cluster to boot...', flush=True)
        spin_up_response = self.client.run_job_flow(
            Name="Spark On-Demand",
            LogUri='s3://xxxxx-xx-xxx/emr/logs',
            ReleaseLabel='emr-5.24.1',
            Instances={
                'InstanceGroups': [
                    {
                        'Name': "Master nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'MASTER',
                        'InstanceType': instance_type_master or instance_type,
                        'InstanceCount': 1,
                        'Configurations': [
                            {
                                # Force PySpark onto Python 3 on the cluster.
                                'Classification': 'spark-env',
                                'Configurations': [
                                    {
                                        'Classification': 'export',
                                        'Properties': {
                                            'PYSPARK_PYTHON': '/usr/bin/python3'
                                        }
                                    }
                                ]
                            },
                            {
                                # Delta Lake log store suitable for S3.
                                'Classification': 'spark-defaults',
                                'Properties':
                                    {
                                        'spark.delta.logStore.class': "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore"
                                    }
                            }
                        ]
                    },
                    {
                        'Name': "Slave nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'CORE',
                        'InstanceType': instance_type_worker or instance_type,
                        'InstanceCount': instance_count - 1
                    }
                ],
                # Keep the cluster alive after bootstrap steps so Livy
                # sessions can be attached interactively.
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': False,
                'Ec2SubnetId': 'subnet-xxxxxxxxxx',
                'Ec2KeyName': 'xxxxxxxxxxxxxxx'
            },
            Applications=[
                {'Name': 'hadoop'},
                {'Name': 'spark'},
                {'Name': 'hive'},
                {'Name': 'livy'},
                {'Name': 'zeppelin'}
            ],
            Tags=[
                {
                    'Key': 'Name',
                    'Value': 'spark on-demand'
                },
                {
                    'Key': 'hostgroup',
                    'Value': 'analytics'
                }
            ],
            BootstrapActions=[
                {
                    'Name': 'Prepare Python environment',
                    'ScriptBootstrapAction': {
                        'Path': 's3://htg-dw-c1/emr/bootstrap.sh'
                    }
                }
            ],
            Steps=[
                {
                    # Make the Delta Lake jar visible to Livy's REPL.
                    'Name': 'Delta Lake - Copy Jar',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['sudo', 'aws', 's3', 'cp', 's3://xxxxxxxxxx.jar',
                                 '/usr/lib/livy/repl_2.11-jars']
                    }
                },
                {
                    'Name': 'Apache Livy - Copy Setup',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['aws', 's3', 'cp', 's3://xxxxxxxxxx.sh', '/home/hadoop/']
                    }
                },
                {
                    'Name': 'Apache Livy - Run Setup',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['bash', '/home/hadoop/step-livy.sh']
                    }
                }
            ],
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole'
        )
        self.id = spin_up_response['JobFlowId']
        self.client.get_waiter('cluster_running').wait(ClusterId=self.id)
        # ListSteps returns steps in reverse submission order, so index 0 is
        # the last step added; steps run sequentially, so waiting on it also
        # implies the earlier steps finished.
        final_step_id = self.client.list_steps(ClusterId=self.id)['Steps'][0]['Id']
        self.client.get_waiter('step_complete').wait(ClusterId=self.id, StepId=final_step_id)
        print(f'...cluster ({self.id}) is now running.', flush=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always tear the cluster down on context exit, even on error.
        print(f'Terminating the cluster ({self.id}).', flush=True)
        self.terminate()

    @property
    def dns(self):
        """Public DNS name of the master node (Livy endpoint host)."""
        description_response = self.client.describe_cluster(ClusterId=self.id)
        return description_response['Cluster']['MasterPublicDnsName']

    def terminate(self):
        """Terminate the EMR job flow backing this cluster."""
        self.client.terminate_job_flows(JobFlowIds=[self.id])
class Session:
    """Interactive PySpark session on an EMR master, driven via Livy's REST API.

    Usable as a context manager: the session is deleted on exit.
    """

    # Livy session states that mean the session will never become usable.
    _FAILED_STATES = frozenset({'error', 'dead', 'killed', 'shutting_down'})

    def __init__(self, cluster_dns):
        """Create a PySpark session and block until it is idle.

        Args:
            cluster_dns: public DNS name of the EMR master node.

        Raises:
            RuntimeError: if the session reaches a terminal failure state
                (previously this poll loop would spin forever).
        """
        self.dns = cluster_dns
        # id must exist (empty) so the first `url` points at the /sessions
        # collection endpoint for the creating POST.
        self.id = ''
        response = requests.post(self.url, data=json.dumps({'kind': 'pyspark'}), headers=JSON_HEADER)
        self.id = response.json()['id']
        print('Establishing a Spark session...', flush=True)
        status = None
        while status != 'idle':
            time.sleep(3)
            status_response = requests.get(self.url, headers=JSON_HEADER)
            status = status_response.json()['state']
            if status in self._FAILED_STATES:
                raise RuntimeError(
                    f'Livy session {self.id} failed to start (state={status})')
        print(f'...session ({self.id}) is ready.', flush=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

    @property
    def url(self):
        """Livy endpoint for this session (collection URL while id is '')."""
        return f'http://{self.dns}:8998/sessions/{self.id}'

    def terminate(self):
        """Delete the Livy session."""
        print(f'Terminating the session ({self.id}).', flush=True)
        requests.delete(self.url, headers=JSON_HEADER)

    def submit(self, code, follow=True):
        """POST ``code`` as a Livy statement; optionally poll to completion.

        Args:
            code: Python source to execute remotely.
            follow: when True, poll until the statement reaches a terminal
                state and print its output / traceback.

        Returns:
            The ``requests.Response`` of the initial statement POST.
        """
        endpoint = f'{self.url}/statements'
        response = requests.post(endpoint, data=json.dumps({'code': code}), headers=JSON_HEADER)
        endpoint = f"{endpoint}/{response.json()['id']}"
        if follow:
            state = None
            # 'available' is Livy's terminal state for a finished statement;
            # also stop on 'cancelled' so this loop cannot spin forever.
            while state not in ('available', 'cancelled'):
                time.sleep(3)
                response_json = requests.get(endpoint, headers=JSON_HEADER).json()
                state = response_json['state']
            # A cancelled statement may carry no output block at all.
            output_block = response_json.get('output') or {}
            if output_block.get('status') == 'error':
                print(f"Statement exception: {output_block['evalue']}")
                for trace in output_block['traceback']:
                    print(trace)
            # Errored statements have no 'data' key, so chain .get here
            # (the original raised KeyError in that case).
            output = output_block.get('data', {}).get('text/plain')
            if output is not None:
                print(output, flush=True)
        return response
def emr_cluster(**kwargs_cluster):
    """Decorator factory: run the decorated function on a fresh EMR cluster.

    The decorated function's source is shipped to a Livy PySpark session and
    executed remotely; the local call returns None. All keyword arguments are
    forwarded to ``Cluster(...)``.

    NOTE(review): source extraction drops exactly one leading line, so this
    assumes the decorator occupies a single line above the ``def`` — confirm
    at call sites.
    """
    def decorator(func):
        @functools.wraps(func)  # preserve __name__/__doc__ for introspection
        def wrapper(*args, **kwargs):
            with Cluster(**kwargs_cluster) as cluster:
                with Session(cluster.dns) as session:
                    # Strip the decorator line; keep the def and body.
                    func_code = inspect.getsource(func).split('\n')[1:]
                    arguments_raw = (
                        [('', arg) for arg in args]
                        + [(f'{key}=', value) for key, value in kwargs.items()]
                    )
                    # repr() quotes strings safely (the original wrapped them
                    # in bare single quotes, breaking on embedded quotes).
                    arguments = ', '.join(
                        f'{k}{v!r}' if isinstance(v, str) else f'{k}{v}'
                        for k, v in arguments_raw
                    )
                    func_call = f'{func.__name__}({arguments})'
                    session.submit('\n'.join(func_code + [func_call]))
        return wrapper
    return decorator
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment