|
#!/usr/bin/env python3 |
|
|
|
#%% |
|
import argparse |
|
import datetime as dt |
|
import json |
|
import os |
|
import urllib.parse |
|
import urllib.request |
|
|
|
#%% |
|
# Root endpoint; request paths are appended to it after a single slash.
API_BASE_URL = "https://api.github.com"
# Sent as the "Accept" header of every request.
REQUEST_ACCEPT_VERSION = "application/vnd.github.v3+json"
# Sent as the "User-Agent" header of every request.
REQUEST_USER_AGENT = "magnetikonline/remove-workflow-run"
|
|
|
|
|
#%% |
|
def github_request(
    auth_token, path, method=None, parameter_collection=None, parse_response=True
):
    """Send one request to the GitHub REST API and return the decoded reply.

    Args:
        auth_token: Token placed in the ``Authorization: token ...`` header.
        path: Path appended to ``API_BASE_URL`` (without a leading slash).
        method: HTTP verb to use; ``None`` performs a GET.
        parameter_collection: Mapping URL-encoded into the query string.
            Only applied to GET requests (``method is None``); it is ignored
            when an explicit method is given.
        parse_response: When true the response body is decoded as JSON,
            otherwise the body is left unread.

    Returns:
        The decoded JSON document, or an empty dict when ``parse_response``
        is false.

    Raises:
        urllib.error.URLError: If the request fails (``HTTPError`` for
            non-success status codes).
        json.JSONDecodeError: If the body is not valid JSON while
            ``parse_response`` is true.
    """
    # build base request URL/headers
    request_url = f"{API_BASE_URL}/{path}"
    header_collection = {
        "Accept": REQUEST_ACCEPT_VERSION,
        "Authorization": f"token {auth_token}",
        "User-Agent": REQUEST_USER_AGENT,
    }

    # query parameters are only attached to GET requests
    if method is None and parameter_collection is not None:
        request_url = f"{request_url}?{urllib.parse.urlencode(parameter_collection)}"

    # method=None lets urllib pick GET, so one constructor call covers both cases
    request = urllib.request.Request(
        request_url, headers=header_collection, method=method
    )

    # the context manager closes the connection even if JSON decoding raises
    with urllib.request.urlopen(request) as response:
        if parse_response:
            return json.load(response)

    return {}
|
|
|
|
|
def get_workflow_runs(auth_token, owner_repo_name):
    """Yield a simplified dict for every workflow run of a repository.

    Pages through ``repos/<owner_repo_name>/actions/runs`` until an empty
    page is returned. Each yielded dict keeps only the keys listed in
    ``filter_keys`` (keys absent from a run are simply omitted).

    Args:
        auth_token: GitHub token forwarded to ``github_request``.
        owner_repo_name: Repository in ``owner/name`` form.

    Yields:
        One dict per workflow run, in the order the API returns them.
    """
    filter_keys = {
        "id",
        "name",
        "path",
        "head_branch",
        "status",
        "conclusion",
        "created_at",
    }
    # ask for the largest page the API allows to keep the request count low
    page_size = 100

    request_page = 1
    while True:
        data = github_request(
            auth_token,
            f"repos/{owner_repo_name}/actions/runs",
            parameter_collection={"page": request_page, "per_page": page_size},
        )

        run_list = data["workflow_runs"]
        if not run_list:
            # no more items
            break

        for run in run_list:
            yield {k: v for k, v in run.items() if k in filter_keys}

        # move to next page
        request_page += 1
|
|
|
|
|
def workflow_run_delete(auth_token, owner_repo_name, run_id):
    """Send a DELETE request for one workflow run of the given repository."""
    run_path = f"repos/{owner_repo_name}/actions/runs/{run_id}"
    # the response body is not parsed and the result is discarded
    github_request(auth_token, run_path, method="DELETE", parse_response=False)
|
|
|
|
|
def filter_run_olderthandays(runs, age_days=90):
    """Yield the IDs of runs created more than ``age_days`` days ago.

    Args:
        runs: Iterable of run dicts carrying ``'id'`` and an ISO-8601
            ``'created_at'`` timestamp (a trailing ``Z`` is accepted).
        age_days: Minimum age in days; a run qualifies as soon as it is
            older than exactly this many days.

    Yields:
        The ``'id'`` of each qualifying run, in input order.
    """
    # Compare against an exact cutoff instead of timedelta.days, which
    # truncates to whole days and would skip e.g. a run 90 days 5 hours old.
    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=age_days)

    for run in runs:
        # fromisoformat() on older Pythons rejects the 'Z' suffix, so spell out UTC
        run_time = dt.datetime.fromisoformat(run["created_at"].replace("Z", "+00:00"))
        if run_time < cutoff:
            yield run["id"]
|
|
|
|
|
def filter_run_name(runs, workflow_name):
    """Yield the ID of every run whose 'name' equals ``workflow_name``."""
    yield from (
        entry['id'] for entry in runs if entry['name'] == workflow_name
    )
|
|
|
|
|
def filter_run_file(runs, workflow_filepath):
    """Yield the ID of every run whose 'path' equals ``workflow_filepath``.

    The comparison is an exact string match against the run's 'path' value
    (e.g. ``.github/workflows/<filename>``).
    """
    yield from (
        entry['id'] for entry in runs if entry['path'] == workflow_filepath
    )
|
|
|
|
|
def filter_run_conclusion(runs, conclusion):
    """Yield the ID of every run whose 'conclusion' equals ``conclusion``.

    Expected values are 'cancelled', 'failure', 'skipped', 'startup_failure'
    or 'success'; the value is compared as-is and not validated here.
    """
    yield from (
        entry['id'] for entry in runs if entry['conclusion'] == conclusion
    )
|
|
|
|
|
class _InvalidConclusionError(argparse.ArgumentTypeError, ValueError):
    """Rejected conclusion value.

    Derives from ``ArgumentTypeError`` so argparse shows the message text
    when this is raised from a ``type=`` callable, and from ``ValueError`` so
    code that already catches ``ValueError`` keeps working.
    """


def valid_conclusion(conclusion):
    """Return ``conclusion`` unchanged if it is an accepted value.

    Intended for use as an argparse ``type=`` callable.

    Args:
        conclusion: Candidate conclusion string.

    Returns:
        The same string when it is one of 'cancelled', 'failure', 'skipped',
        'startup_failure' or 'success'.

    Raises:
        ValueError: If the value is not accepted. The raised exception is
            also an ``argparse.ArgumentTypeError``.
    """
    valid_conclusions = {'cancelled', 'failure', 'skipped', 'startup_failure', 'success'}

    if conclusion not in valid_conclusions:
        # sorted() keeps the message stable; set ordering varies between runs
        raise _InvalidConclusionError(
            f"ERROR: conclusion must be in {sorted(valid_conclusions)}. Was: {conclusion}"
        )
    return conclusion
|
|
|
|
|
#%% |
|
def main():
    """Command-line entry point: select workflow runs and delete them.

    Options:
        --repository  Repository in ``owner/name`` form (required).
        --age_days    Select runs older than this many days.
        --run_name    Select runs with this workflow name.
        --run_file    Select runs with this workflow file path.
        --conclusion  Select runs with this conclusion.
        --dry_run     Only print what would be deleted.

    A run is selected when it matches ANY of the given filters. When no
    filter is given, runs older than 90 days and runs concluded as
    'cancelled', 'skipped' or 'startup_failure' are selected.

    The GitHub token is read from the ``GITHUB_TOKEN`` environment variable;
    the program exits with a usage error if it is missing or empty.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--repository", required=True)
    parser.add_argument("--age_days", required=False, type=int)
    parser.add_argument("--run_name", required=False, type=str)
    parser.add_argument("--run_file", required=False, type=str)
    parser.add_argument("--conclusion", required=False, type=valid_conclusion)
    parser.add_argument("--dry_run", required=False, action="store_true")
    cli_args = parser.parse_args()

    # fetch GitHub access token (after parsing, so --help works without it)
    auth_token = os.environ.get("GITHUB_TOKEN")
    if not auth_token:
        parser.error("GITHUB_TOKEN environment variable must be set")

    if cli_args.dry_run:
        print("Dry-run enabled")

    # fetch the complete run list up front so deletions cannot shift pagination
    runs = list(get_workflow_runs(auth_token, cli_args.repository))

    ids = set()
    if cli_args.age_days is not None:
        print(f"Getting runs older than {cli_args.age_days} days")
        ids.update(filter_run_olderthandays(runs, cli_args.age_days))
    if cli_args.run_name is not None:
        print(f"Getting runs named {cli_args.run_name}")
        ids.update(filter_run_name(runs, cli_args.run_name))
    if cli_args.run_file is not None:
        print(f"Getting runs from file {cli_args.run_file}")
        ids.update(filter_run_file(runs, cli_args.run_file))
    if cli_args.conclusion is not None:
        print(f"Getting runs with conclusion {cli_args.conclusion}")
        ids.update(filter_run_conclusion(runs, cli_args.conclusion))

    no_filter_given = all(
        value is None
        for value in (
            cli_args.age_days,
            cli_args.run_name,
            cli_args.run_file,
            cli_args.conclusion,
        )
    )
    if no_filter_given:
        print("Default clean: runs older than 90 days and those 'cancelled', 'skipped', or 'startup_failure'")
        ids.update(filter_run_olderthandays(runs, 90))
        for conclusion in ("cancelled", "skipped", "startup_failure"):
            ids.update(filter_run_conclusion(runs, conclusion))

    # walk the original list so output follows the order the API returned
    for run in runs:
        if run["id"] not in ids:
            continue

        description = f"{run['name']} ({run['path']}) @ {run['created_at']}"
        if cli_args.dry_run:
            print(f"Would delete: {description}")
        else:
            print(f"Deleting run: {description}")
            workflow_run_delete(auth_token, cli_args.repository, run["id"])
|
|
|
|
|
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()