Skip to content

Instantly share code, notes, and snippets.

@alexbasista
Last active September 7, 2021 13:03
Show Gist options
  • Save alexbasista/1c76865d4db2fad1c740eaaa0bb06808 to your computer and use it in GitHub Desktop.
Save alexbasista/1c76865d4db2fad1c740eaaa0bb06808 to your computer and use it in GitHub Desktop.
Examples of interacting with Terraform Cloud/Enterprise API via Python
import os
import sys
import json
import requests
def create_team(hostname, org, name, **kwargs):
"""
POST /organizations/:organization_name/teams
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
teams_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'teams'])
payload = {}
data = {}
data['type'] = 'teams'
attributes = {}
attributes['name'] = name
if kwargs.get('organization_access'):
if not isinstance(kwargs.get('organization_access'), dict):
print("ERROR: 'organization_access' argument must be of type dict.")
sys.exit()
else:
attributes['organization-access'] = kwargs.get('organization_access')
if kwargs.get('visibility'):
attributes['visibility'] = kwargs.get('visibility')
data['attributes'] = attributes
payload['data'] = data
r = requests.post(url=teams_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
def create_oauth_client(hostname, org, name, service_provider, http_url, api_url, oauth_token_string, **kwargs):
"""
POST /organizations/:organization_name/oauth-clients
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
oc_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'oauth-clients'])
payload = {}
data = {}
data['type'] = 'oauth-clients'
attributes = {}
attributes['service-provider'] = service_provider
attributes['name'] = name
attributes['http-url'] = http_url
attributes['api-url'] = api_url
attributes['oauth-token-string'] = oauth_token_string
if kwargs.get('private_key'):
attributes['private-key'] = kwargs.get('private_key')
data['attributes'] = attributes
payload['data'] = data
r = requests.post(url=oc_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
def create_org(hostname, name, email, **kwargs):
"""
POST /organizations
"""
org_attributes_list = [
'name',
'email',
'session_timeout',
'session_remember',
'collaborator_auth_policy',
'cost_estimation_enabled',
'owners_team_saml_role_id'
]
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
org_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations'])
payload = {}
data = {}
data['type'] = 'organizations'
attributes = {}
attributes['name'] = name
attributes['email'] = email
for key, value in kwargs.items():
if key in org_attributes_list:
attributes[key] = value
else:
print("WARNING: '{}' is an invalid key for Organizations API.".format(key))
data['attributes'] = attributes
payload['data'] = data
r = requests.post(url=org_endpoint, headers=header, data=json.dumps(payload))
return r
import os
import sys
import json
import requests
def _get_oauth_token_id(hostname, header, org, oauth_client_name):
"""
GET /organizations/:organization_name/oauth-clients
"""
oc_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'oauth-clients'])
r = requests.get(url=oc_endpoint, headers=header)
r.raise_for_status()
oc = [ i for i in r.json()['data'] if i['attributes']['name'] == oauth_client_name ]
return oc[0]['relationships']['oauth-tokens']['data'][0]['id']
def create_policy_set(hostname, org, name, **kwargs):
"""
POST /organizations/:organization_name/policy-sets
"""
ps_attributes_list = [
'name',
'description',
'global',
'policies_path'
]
ps_vcs_repo_attributes_list = [
'branch',
'identifier',
'ingress_submodules',
'oauth_token_id'
]
ps_relationships_list = [
'workspace_ids'
]
ps_kwargs_non_params = [
'oauth_client_name'
]
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ps_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'policy-sets'])
payload = {}
data = {}
data['type'] = 'policy-sets'
attributes = {}
vcs_repo = {}
attributes['name'] = name
for key, value in kwargs.items():
if key in ps_attributes_list:
attributes[key] = value
elif key in ps_vcs_repo_attributes_list:
vcs_repo[key] = value
elif key in ps_relationships_list:
pass
elif key in ps_kwargs_non_params:
pass
else:
print("WARNING: '{}' is an invalid key for Policy Sets API".format(key))
if len(vcs_repo) > 0:
if not kwargs.get('oauth_token_id'):
if not kwargs.get('oauth_client_name'):
print("ERROR: A value is required for either 'oauth_token_id' or 'oauth_client_name' if a VCS repo identifier is specified.")
sys.exit()
else:
ot_id = _get_oauth_token_id(hostname=hostname, header=header, org=org, oauth_client_name=kwargs.get('oauth_client_name'))
vcs_repo['oauth-token-id'] = ot_id
else:
ot_id = kwargs.get('oauth_token_id')
vcs_repo['oauth-token-id'] = ot_id
attributes['vcs-repo'] = vcs_repo
if kwargs.get('workspace_ids'):
relationships = {}
workspaces = {}
workspaces_data = []
for ws in kwargs.get('workspace_ids'):
obj = { 'id': ws, 'type': 'workspaces' }
workspaces_data.append(obj)
workspaces['data'] = workspaces_data
relationships['workspaces'] = workspaces
data['relationships'] = relationships
data['attributes'] = attributes
payload['data'] = data
r = requests.post(url=ps_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
import os
import sys
import json
import requests
def _get_workspace_id(hostname, header, org, name):
"""
GET /workspaces/:workspace_id
"""
ws_endpoint = '/'.join(['https:/', hostname, 'organizations', org, 'workspaces', name])
r = requests.get(url=ws_endpoint, headers=header)
return r.json()['data']['id']
def _lock_workspace(hostname, header, ws_id, reason='Locked by Python script.'):
"""
POST /workspaces/:workspace_id/actions/lock
"""
ws_lock_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'workspaces', ws_id, 'actions', 'lock'])
payload = { "reason": reason }
r = requests.post(url=ws_lock_endpoint, headers=header, data=json.dumps(payload))
return r
def _unlock_workspace(hostname, header, ws_id):
"""
POST /workspaces/:workspace_id/actions/unlock
"""
ws_unlock_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'workspaces', ws_id, 'actions', 'unlock'])
r = requests.post(url=ws_unlock_endpoint, headers=header)
return r
def create_state_version(hostname, org, ws, serial, md5, state, **kwargs):
"""
POST /workspaces/:workspace_id/state-versions
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ws_id = _get_workspace_id(hostname=hostname, header=header, org=org, name=ws)
sv_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'workspaces', ws_id, 'state-versions'])
payload = {}
data = {}
data['type'] = 'state-versions'
attributes = {}
attributes['serial'] = serial
attributes['md5'] = md5
attributes['state'] = state
if kwargs.get('lineage'):
attributes['lineage'] = kwargs.get('lineage')
data['attributes'] = attributes
payload['data'] = data
r = requests.post(url=sv_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
import os
import sys
import json
import requests
def _does_workspace_exist(hostname, header, org, name):
"""
GET /organizations/:organization_name/workspaces/:name
"""
ws_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'workspaces', name])
r = requests.get(url=ws_endpoint, headers=header)
if r.status_code == 404:
return False
elif r.status_code == 200:
return True
else:
print('ERROR: Received unexpected reponse code: {}'.format(r.status_code))
return True
def create_workspace(hostname, org, name, **kwargs):
"""
POST /organizations/:organization_name/workspaces
"""
ws_attributes_list = [
'name',
'agent_pool_id',
'allow_destroy_plan',
'auto_apply',
'description',
'execution_mode',
'file_triggers_enabled',
'source_name',
'source_url',
'queue_all_runs',
'speculative_enabled',
'terraform_version',
'trigger_prefixes'
'working_directory',
]
ws_vcs_repo_attributes_list = [
'oauth_token_id',
'branch',
'default_branch',
'ingress_submodules',
'identifier'
]
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print("ERROR: Missing API token.")
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ws_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'workspaces'])
payload = {}
data = {}
data['type'] = 'workspaces'
attributes = {}
vcs_repo = {}
attributes['name'] = name
for key, value in kwargs.items():
if key in ws_attributes_list:
attributes[key] = value
elif key in ws_vcs_repo_attributes_list:
vcs_repo[key] = value
else:
print("WARNING: '{}' is an invalid key for Workspaces API".format(key))
if len(vcs_repo) > 0:
attributes['vcs-repo'] = vcs_repo
data['attributes'] = attributes
payload['data'] = data
if _does_workspace_exist(hostname=hostname, header=header, org=org, name=name):
print("ERROR: Workspace '{}' already exists".format(name))
sys.exit()
r = requests.post(url=ws_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
import os
import sys
import json
import requests
def list_orgs(hostname, **kwargs):
"""
GET /organizations
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print("ERROR: Missing API token.")
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
org_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations'])
r = requests.get(url=org_endpoint, headers=header)
r.raise_for_status()
return [ i['id'] for i in r.json()['data'] ]
import os
import sys
import json
import requests
def _get_ps_id(hostname, header, org, name):
"""
GET /organizations/:organization_name/policy-sets
"""
ps_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'organizations', org, 'policy-sets'])
r = requests.get(url=ps_endpoint, headers=header)
ps_id = [ i['id'] for i in r.json()['data'] if i['attributes']['name'] == name ]
return ps_id[0]
def _get_psp_id(hostname, header, org, policy_set, parameter):
"""
GET /policy-sets/:policy_set_id/parameters
"""
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set)
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters'])
r = requests.get(url=psp_endpoint, headers=header)
psp_id = [ i['id'] for i in r.json()['data'] if i['attributes']['key'] == parameter ]
return psp_id[0]
def create(hostname, org, policy_set, key, value, sensitive=False, **kwargs):
"""
POST /policy-sets/:policy_set_id/parameters
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set)
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters'])
payload = {}
data = {}
data['type'] = 'vars'
attributes = {}
attributes['key'] = key
attributes['value'] = value
attributes['category'] = 'policy-set'
attributes['sensitive'] = sensitive
data['attributes'] = attributes
payload['data'] = data
r = requests.post(url=psp_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
def list(hostname, org, policy_set, **kwargs):
"""
GET /policy-sets/:policy_set_id/parameters
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set)
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters'])
r = requests.get(url=psp_endpoint, headers=header)
r.raise_for_status()
return r
def update(hostname, org, policy_set, parameter, **kwargs):
"""
PATCH /policy-sets/:policy_set_id/parameters/:parameter_id
"""
psp_attributes_list = [
'key',
'value',
'sensitive'
]
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set)
psp_id = _get_psp_id(hostname=hostname, header=header, org=org, policy_set=policy_set, parameter=parameter)
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters', psp_id])
payload = {}
data = {}
data['type'] = 'vars'
data['id'] = psp_id
attributes = {}
for key, value in kwargs.items():
if key in psp_attributes_list:
attributes[key] = value
else:
print("ERROR: '{}' is an invalid key for Policy Set Parameters API".format(key))
attributes['category'] = 'policy-set'
data['attributes'] = attributes
payload['data'] = data
r = requests.patch(url=psp_endpoint, headers=header, data=json.dumps(payload))
r.raise_for_status()
return r
def delete(hostname, org, policy_set, parameter, **kwargs):
"""
DELETE /policy-sets/:policy_set_id/parameters/:parameter_id
"""
if kwargs.get('tfe_token'):
token = kwargs.get('tfe_token')
kwargs.pop('tfe_token')
elif os.getenv('TFE_TOKEN'):
token = os.getenv('TFE_TOKEN')
else:
print('ERROR: Missing API token.')
sys.exit()
header = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/vnd.api+json'
}
ps_id = _get_ps_id(hostname=hostname, header=header, org=org, name=policy_set)
psp_id = _get_psp_id(hostname=hostname, header=header, org=org, policy_set=policy_set, parameter=parameter)
psp_endpoint = '/'.join(['https:/', hostname, 'api', 'v2', 'policy-sets', ps_id, 'parameters', psp_id])
r = requests.delete(url=psp_endpoint, headers=header)
r.raise_for_status()
return r
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment