Created
January 20, 2020 23:04
-
-
Save jtalmi/c6265c8a17120cfb150c97512cb68aa6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
'''Script to autogenerate dbt commands for changed models against a chosen git branch, | |
with support for fully refreshing models with specific tags. | |
Usage: | |
$ python3 dbt_run_changed.py --target_branch master --target dev --commands "[run, test]" --full_refresh_tags "[full_refresh]"
Assume model1 and model2 are changed models and model2 is tagged with "full_refresh". The script will generate three dbt commands: | |
1. dbt run --target dev --model model2 --full-refresh | |
2. dbt run --target dev --model model1 | |
3. dbt test --target dev --model model1 model2 | |
If we also pass the --full_refresh flag, command #2 will be:
2. dbt run --target dev --model model1 --full-refresh | |
''' | |
import json
import logging
import os
import subprocess
import sys
from typing import Dict, List, Union

from fire import Fire
# Configure the root logger to write records of level INFO and above to stdout.
logging.basicConfig(stream=sys.stdout, level='INFO')
def intersection_is_not_empty(a: List, b: List) -> bool:
    '''Tell whether the two collections share at least one element.

    Both arguments are converted to sets, so their elements must be hashable.
    '''
    shared = set(a).intersection(set(b))
    return bool(shared)
def fetch_changed_models(branch: str='master') -> List:
    '''Returns changed dbt models relative to a git branch.

    Runs ``git diff --name-only`` against ``branch`` and keeps the files that
    live under ``models/``, end in ``.sql`` and still exist below the current
    working directory (so models deleted in the diff are dropped).

    Args:
        branch: git revision to diff the working tree against.

    Returns:
        The model names, i.e. the changed file names without directory or
        ``.sql`` extension, in the order git reported them.

    Raises:
        subprocess.CalledProcessError: if the git command exits non-zero.
    '''
    # -z makes git emit NUL-terminated, unquoted paths, so file names that
    # contain spaces or other unusual characters are not split or mangled
    # (splitting on whitespace would break them apart). The trailing "--"
    # keeps a branch name from being mistaken for a path.
    raw_output = subprocess.check_output(["git", "diff", "--name-only", "-z", branch, "--"])
    changed_files = [path for path in raw_output.decode('utf-8').split('\0') if path]

    changed_models = []
    for path in changed_files:
        if not (path.startswith('models/') and path.endswith('.sql')):
            continue
        # Paths are checked relative to the current working directory, so the
        # script is assumed to run from the directory that contains models/.
        if os.path.exists(os.path.join(os.getcwd(), path)):
            changed_models.append(os.path.splitext(os.path.basename(path))[0])
    return changed_models
def fetch_project_json(list_of_models: List=None) -> Dict:
    '''Returns a dict of model configs keyed by model name.

    Runs ``dbt ls --output json`` and parses every non-blank output line as
    one JSON object.

    Args:
        list_of_models: optional model selectors passed to ``--model``. When
            empty or None, all resources of type ``model`` are listed.

    Returns:
        A dict mapping each parsed object's ``name`` to the object itself.

    Raises:
        subprocess.CalledProcessError: if the dbt command exits non-zero.
        json.JSONDecodeError: if a non-blank output line is not valid JSON.
    '''
    dbt_command = ["dbt", "ls", "--output", "json"]
    if list_of_models:
        dbt_command += ['--model'] + list(list_of_models)
    else:
        dbt_command += ['--resource-type', 'model']

    output = subprocess.check_output(dbt_command).decode('utf-8')
    project_dict = {}
    for line in output.split('\n'):
        # Skip blank lines instead of blindly dropping the last element: the
        # output is then parsed correctly with or without a trailing newline.
        if not line.strip():
            continue
        model = json.loads(line)
        project_dict[model['name']] = model
    return project_dict
def generate_dbt_command(command: str, target: str, include_models: List=None, exclude_models: List=None, children: bool=False, full_refresh: bool=False) -> List:
    '''Returns a dbt command, as an argument list, based on parameters.

    Args:
        command: dbt sub-command, e.g. ``run`` or ``test``.
        target: value passed to ``--target``.
        include_models: models passed to ``--model``; omitted when empty or None.
        exclude_models: models passed to ``--exclude``; omitted when empty or None.
        children: when True, a ``+`` is appended to every included model name.
        full_refresh: when True, ``--full-refresh`` is appended.

    Returns:
        The command as a list of strings suitable for ``subprocess``.
    '''
    dbt_command = ['dbt', command, '--target', target]
    if include_models:
        # list(...) copies the input so any sequence (e.g. a tuple) is accepted.
        selected_models = [f'{model}+' for model in include_models] if children else list(include_models)
        dbt_command += ['--model'] + selected_models
    if exclude_models:
        dbt_command += ['--exclude'] + list(exclude_models)
    if full_refresh:
        dbt_command.append('--full-refresh')
    return dbt_command
def generate_dbt_commands(project_dict: Dict, command: str, target: str, model_names: List, children: bool, full_refresh: bool, full_refresh_tags: List) -> List:
    '''Generates the dbt commands for one sub-command.

    Up to three kinds of commands are produced:
    - dbt run command (always with --full-refresh) for models that carry one
      of the full refresh tags
    - dbt run command for the models without full refresh tags
    - dbt test command for all given models

    Args:
        project_dict: model configs keyed by model name; tags are read from
            ``project_dict[name]['config']['tags']``.
        command: dbt sub-command; only ``run`` and ``test`` produce output.
        target: dbt target name.
        model_names: models to build the commands for.
        children: whether to also select each model's children.
        full_refresh: whether untagged models are run with --full-refresh.
        full_refresh_tags: tags whose models are always fully refreshed.

    Returns:
        A list of commands, each a list of strings. Empty when there are no
        models to select, so an unscoped ``dbt run``/``dbt test`` is never
        generated.
    '''
    dbt_commands = []
    remaining_models = list(model_names)
    if command == 'run' and full_refresh_tags:
        tagged_models = []
        for model in remaining_models:
            # A changed file that dbt did not list (or that has no tags) is
            # treated as untagged instead of raising KeyError.
            tags = project_dict.get(model, {}).get('config', {}).get('tags') or []
            # Guard against a single tag stored as a bare string, which would
            # otherwise be compared character by character.
            if isinstance(tags, str):
                tags = [tags]
            if intersection_is_not_empty(full_refresh_tags, tags):
                tagged_models.append(model)
        if tagged_models:
            dbt_commands.append(generate_dbt_command(command, target, tagged_models, children=children, full_refresh=True))
            # Filter with a list comprehension rather than a set difference so
            # the model order (and thus the generated command) is deterministic.
            tagged_lookup = set(tagged_models)
            remaining_models = [model for model in remaining_models if model not in tagged_lookup]
    if not remaining_models:
        return dbt_commands
    if command == 'run':
        dbt_commands.append(generate_dbt_command(command, target, remaining_models, children=children, full_refresh=full_refresh))
    elif command == 'test':
        dbt_commands.append(generate_dbt_command(command, target, remaining_models, children=children))
    return dbt_commands
def generate_and_execute_dbt_commands(target_branch: str, target: str, commands: List=['run', 'test'], children: bool=False, full_refresh: bool=False, full_refresh_tags: List=[]) -> Union[str, List[str]]:
    '''Generates and executes dbt commands for models changed against a git branch.

    Args:
        target_branch: git branch the working tree is compared with.
        target: dbt target name.
        commands: dbt sub-commands to generate; a single name may be given
            as a plain string.
        children: whether to also select each changed model's children.
        full_refresh: whether untagged models are run with --full-refresh.
        full_refresh_tags: tags whose models are always fully refreshed; a
            single tag may be given as a plain string.

    Returns:
        The string "No changed models" when nothing changed, otherwise the
        executed commands, each joined into one space-separated string.

    Raises:
        Exception: if any dbt command exits with a non-zero return code;
            later commands are not executed.
    '''
    # The default lists above are never mutated, so sharing them across calls
    # is harmless. A single bare value on the command line (e.g.
    # `--commands run`) arrives as a str; wrap it so it is not iterated
    # character by character.
    if isinstance(commands, str):
        commands = [commands]
    if isinstance(full_refresh_tags, str):
        full_refresh_tags = [full_refresh_tags]

    list_of_changed_models = fetch_changed_models(target_branch)
    if not list_of_changed_models:
        return "No changed models"
    project_dict = fetch_project_json()
    logging.info('Changed models: %s', " ".join(list_of_changed_models))
    logging.info('Full refresh models with these tags: %s', full_refresh_tags)

    # All commands are generated up front, then executed in order.
    dbt_commands = []
    for command in commands:
        dbt_commands += generate_dbt_commands(project_dict, command, target, list_of_changed_models, children, full_refresh, full_refresh_tags)
    for dbt_command in dbt_commands:
        # TODO: change to debug
        logging.info("Executing: %s", " ".join(dbt_command))
        dbt_return_code = subprocess.call(dbt_command)
        if dbt_return_code != 0:
            raise Exception('DBT command {} failed'.format(dbt_command))
    return [" ".join(command) for command in dbt_commands]
# When executed as a script, hand generate_and_execute_dbt_commands to Fire,
# which exposes it as the command-line entry point.
if __name__ == "__main__":
    Fire(generate_and_execute_dbt_commands)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment