Skip to content

Instantly share code, notes, and snippets.

@alon-hypr
Last active September 16, 2018 12:48
Show Gist options
  • Save alon-hypr/0c673a857b372d0cac71b06872f3f18c to your computer and use it in GitHub Desktop.
Save alon-hypr/0c673a857b372d0cac71b06872f3f18c to your computer and use it in GitHub Desktop.
Update running ECS services when a new image is deployed
import logging
import boto3
import argparse
# Module-level logger used by the functions in this script. It has no handler
# of its own here; configure_logger() attaches one to the same named logger.
logger = logging.getLogger('ecs-deploy')
def configure_logger(log_info):
    """Attach a formatted console handler to the 'ecs-deploy' logger.

    The logger itself is opened up to DEBUG so that the console handler's
    level alone decides which records are printed.

    Calling this function again reuses the console handler installed by the
    first call (only its level is updated) instead of stacking a second
    handler, which would print every record more than once.

    Args:
        log_info: Threshold for the console handler; a level name such as
            'INFO' or a numeric ``logging`` level.

    Raises:
        ValueError: If ``log_info`` is an unknown level name.
    """
    handler_name = 'ecs-deploy-console'
    logger = logging.getLogger('ecs-deploy')
    logger.setLevel(logging.DEBUG)

    # Look for the handler a previous call may already have installed.
    console = next(
        (handler for handler in logger.handlers if handler.get_name() == handler_name),
        None,
    )
    if console is not None:
        console.setLevel(log_info)
        return

    console = logging.StreamHandler()
    console.set_name(handler_name)
    # Validate/apply the level before attaching, so a bad level leaves the
    # logger without a half-configured handler.
    console.setLevel(log_info)
    console.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(console)
# ECS client shared by every function below; created when the module is loaded.
client = boto3.client('ecs')
# Not referenced anywhere else in the visible code.
updated_task_definitions = []
# Family names appended by update_task_definitions_by_image(); read by
# update_services() and cleanup_task_definitions().
updated_families = []
def latest_revision_of_family(family):
    """Describe the task definition addressed by a bare family name.

    Args:
        family: Task definition family name; passed to ECS unchanged as the
            ``taskDefinition`` argument (no revision is appended).

    Returns:
        The ``taskDefinition`` entry of the ``describe_task_definition``
        response, or ``None`` if the response has no such entry.
    """
    description = client.describe_task_definition(taskDefinition=family)
    return description.get('taskDefinition')
def get_active_families():
    """
    Collect the names of all ACTIVE task definition families.

    Follows ``nextToken`` until ECS stops returning one, so the result spans
    every page of ``list_task_definition_families``.

    Returns:
        A list with the ``families`` entries of every page, in page order.
    """
    collected = []
    request = {'status': 'ACTIVE'}
    while True:
        page = client.list_task_definition_families(**request)
        collected += page.get('families')
        token = page.get('nextToken')
        if not token:
            return collected
        # Ask for the page that follows the one just consumed.
        request = {'status': 'ACTIVE', 'nextToken': token}
def update_task_definitions_by_image(image, task_definition_descriptions):
    """Register a new revision of every task definition that uses ``image``.

    A description qualifies when at least one of its container definitions
    has an ``image`` equal to ``image`` (exact string comparison, tag
    included). The new revision is registered with the same container
    definitions and with the task-level settings of the described revision,
    so roles, network mode, volumes, CPU/memory and launch-type requirements
    are not lost.

    Side effect: the family of each re-registered definition is appended to
    the module-level ``updated_families`` list.

    Args:
        image: Image reference to look for, e.g. 'repo/app:latest'.
        task_definition_descriptions: Iterable of ``taskDefinition`` dicts as
            returned by ``describe_task_definition``.

    Returns:
        List of the ``taskDefinitionArn`` values of the revisions created.
    """
    # Task-level settings that describe_task_definition returns and that
    # register_task_definition accepts back. Output-only fields (status,
    # revision, requiresAttributes, ...) are deliberately not listed.
    carried_over = (
        'taskRoleArn',
        'executionRoleArn',
        'networkMode',
        'volumes',
        'placementConstraints',
        'requiresCompatibilities',
        'cpu',
        'memory',
        'pidMode',
        'ipcMode',
        'proxyConfiguration',
        'inferenceAccelerators',
        'ephemeralStorage',
        'runtimePlatform',
    )

    created_task_defs = []
    for description in task_definition_descriptions:
        # Tolerate a description without container definitions.
        containers = description.get('containerDefinitions') or []
        if not any(container.get('image') == image for container in containers):
            continue

        # register a new revision of the task definition
        logger.info('Updating task definition {0}'.format(description.get('taskDefinitionArn')))
        request = {
            key: description[key]
            for key in carried_over
            if description.get(key) is not None
        }
        request['family'] = description.get('family')
        request['containerDefinitions'] = containers
        new_task_def = client.register_task_definition(**request).get('taskDefinition')

        created_task_defs.append(new_task_def.get('taskDefinitionArn'))
        updated_families.append(description.get('family'))
    return created_task_defs
def update_services():
    """Re-point services at task-definition families that were updated.

    Walks every cluster and every service in it. A service is updated when
    the family of its current task definition appears in the module-level
    ``updated_families`` list; ``update_service`` is then called with the
    bare family name as ``taskDefinition``.
    """
    # list all clusters, one page at a time
    cluster_arns = []
    paging = {}
    while True:
        page = client.list_clusters(**paging)
        cluster_arns += page.get('clusterArns')
        if not page.get('nextToken'):
            break
        paging = {'nextToken': page.get('nextToken')}
    logger.info('Num of clusters found: {0}'.format(len(cluster_arns)))

    for cluster_arn in cluster_arns:
        # list all services of this cluster, one page at a time
        page = client.list_services(cluster=cluster_arn)
        service_arns = page.get('serviceArns')
        token = page.get('nextToken')
        while token:
            page = client.list_services(cluster=cluster_arn, nextToken=token)
            service_arns += page.get('serviceArns')
            token = page.get('nextToken')
        logger.info('Num of services found in cluster {0}: {1}'.format(cluster_arn, len(service_arns)))

        # describe_services accepts up to 10 services in each call,
        # so the ARNs are described in batches of that size
        batch_size = 10
        batches = []
        for start in range(0, len(service_arns), batch_size):
            batches.append(service_arns[start:start + batch_size])
        descriptions = []
        for batch in batches:
            described = client.describe_services(services=batch, cluster=cluster_arn)
            descriptions += described.get('services')

        for service in descriptions:
            current_task_def = service.get('taskDefinition')
            logger.debug('checking if {0} was updated...'.format(current_task_def))
            # the family is obtained by describing the service's task definition
            family = client.describe_task_definition(
                taskDefinition=current_task_def
            ).get('taskDefinition').get('family')
            if family not in updated_families:
                continue
            service_arn = service.get('serviceArn')
            logger.info('Updating service {0} | task definition: {1}'.format(service_arn, family))
            # the service is updated with the family name only (no revision)
            client.update_service(cluster=cluster_arn, service=service_arn, taskDefinition=family)
def cleanup_task_definitions(new_task_defs):
    """Deregister superseded revisions of every updated family.

    For each family in the module-level ``updated_families`` list, all task
    definition ARNs returned by ``list_task_definitions`` are collected
    (following ``nextToken`` across pages) and every ARN that is not in
    ``new_task_defs`` is deregistered.

    Args:
        new_task_defs: ARNs of the revisions that were just registered and
            must be kept.
    """
    keep = set(new_task_defs)
    for family in updated_families:
        request = {'familyPrefix': family, 'maxResults': 100}
        definitions = []
        while True:
            # Pagination must go through list_task_definitions with the same
            # filter; the earlier code called list_services with an undefined
            # `cluster` here and failed on any family with more than one page.
            response = client.list_task_definitions(**request)
            definitions += response.get('taskDefinitionArns') or []
            token = response.get('nextToken')
            if not token:
                break
            request['nextToken'] = token

        for definition in definitions:
            if definition in keep:
                continue
            logger.info('Deregistering task definition {0}'.format(definition))
            client.deregister_task_definition(taskDefinition=definition)
def normalize_image_tag(image):
    """Return ``image`` with ':latest' appended when it has no tag or digest.

    Only the part after the last '/' is inspected, so the port of a registry
    host such as 'localhost:5000/app' is not mistaken for a tag.

    Args:
        image: Image reference as given on the command line.

    Returns:
        ``image`` unchanged if its last path segment contains ':', otherwise
        ``image + ':latest'``.
    """
    last_segment = image.rsplit('/', 1)[-1]
    if ':' in last_segment:
        return image
    return image + ':latest'


if __name__ == '__main__':
    # Script flow: find task definitions using the image, register a new
    # revision of each, update the services running them, then deregister
    # the superseded revisions.
    parser = argparse.ArgumentParser(
        description='Re-register ECS task definitions that use an image and '
                    'update the services running them.'
    )
    parser.add_argument('--image', type=str, required=True, dest='image', help='Name of the image to update')
    # '--l' is kept for existing callers; '-l' and '--log-level' are aliases.
    parser.add_argument('--l', '-l', '--log-level', type=str, dest='log_level', default='INFO',
                        choices=['DEBUG', 'INFO', 'WARN', 'ERROR'], help='log level')
    args = parser.parse_args()

    # make sure image has tag
    args.image = normalize_image_tag(args.image)

    configure_logger(args.log_level)

    active_families = get_active_families()
    latest_revisions = [latest_revision_of_family(family) for family in active_families]

    logger.info('task definitions update STARTED')
    new_task_defs = update_task_definitions_by_image(args.image, latest_revisions)
    logger.info('task definitions update DONE')
    logger.info('updated_families: {0}'.format(updated_families))

    logger.info('services update STARTED')
    update_services()
    logger.info('services update DONE')

    logger.info('task definition cleanup STARTED')
    cleanup_task_definitions(new_task_defs)
    logger.info('task definition cleanup DONE')
@alon-hypr
Copy link
Author

This script only works with Python 3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment