Skip to content

Instantly share code, notes, and snippets.

@ericsysmin
Created May 17, 2019 20:44
Show Gist options
  • Save ericsysmin/7c368d9d38367c10426a4d43c1e19ebf to your computer and use it in GitHub Desktop.
Save ericsysmin/7c368d9d38367c10426a4d43c1e19ebf to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# Copyright (c) 2019 Avi Networks
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.errors import AnsibleError
from ansible.module_utils.gcp_utils import GcpSession
import json
class GcpMockModule(object):
    """Minimal module-like object handed to ``GcpSession`` by the KMS filter.

    It only carries a ``params`` dict and reports failures by raising
    ``AnsibleError``.
    """

    def __init__(self, params):
        """Store ``params`` so it is reachable as ``self.params``."""
        self.params = params

    def fail_json(self, *args, **kwargs):
        """Raise ``AnsibleError`` built from the ``msg`` keyword argument.

        Positional arguments are accepted but ignored. A missing ``msg``
        keyword results in ``KeyError``.
        """
        raise AnsibleError(kwargs['msg'])

    def raise_for_status(self, response):
        """Turn a failed HTTP response into an ``AnsibleError``.

        Calls ``response.raise_for_status()``; if that raises a
        ``requests.exceptions.RequestException`` the failure is reported
        through ``fail_json`` with the response's JSON body in the message.
        Any other exception type propagates unchanged.
        """
        # ``requests`` is not imported at module level in this file, so the
        # name must be bound here; without it the ``except`` clause itself
        # raised NameError and masked the real HTTP error.
        import requests

        try:
            response.raise_for_status()
        except requests.exceptions.RequestException:
            self.fail_json(msg="GCP returned error: %s" % response.json())
class GcpKmsFilter():
    """Encrypt and decrypt values with Google Cloud KMS over its REST API."""

    def run(self, method, **kwargs):
        """Run a KMS operation and return the field it produces.

        ``method`` is ``"encrypt"`` or ``"decrypt"``; any other value returns
        ``None`` without contacting the API.

        Recognised keyword arguments are ``ciphertext``, ``plaintext``,
        ``additional_authenticated_data``, ``key_ring``, ``crypto_key``,
        ``projects``, ``scopes``, ``locations`` (defaults to ``'global'``),
        ``auth_kind``, ``service_account_file`` and
        ``service_account_email``. Keywords outside this list are ignored and
        missing ones default to ``None``. A falsy ``scopes`` is replaced by
        the Cloud KMS scope.
        """
        if method == "encrypt":
            handler = self.kms_encrypt
        elif method == "decrypt":
            handler = self.kms_decrypt
        else:
            # Unknown operation: keep the historical "return None" contract,
            # but make it explicit and skip the pointless setup work.
            return None

        params = {
            'ciphertext': kwargs.get('ciphertext', None),
            'plaintext': kwargs.get('plaintext', None),
            'additional_authenticated_data': kwargs.get('additional_authenticated_data', None),
            'key_ring': kwargs.get('key_ring', None),
            'crypto_key': kwargs.get('crypto_key', None),
            'projects': kwargs.get('projects', None),
            'scopes': kwargs.get('scopes', None),
            'locations': kwargs.get('locations', 'global'),
            'auth_kind': kwargs.get('auth_kind', None),
            'service_account_file': kwargs.get('service_account_file', None),
            'service_account_email': kwargs.get('service_account_email', None),
        }
        if not params['scopes']:
            params['scopes'] = ['https://www.googleapis.com/auth/cloudkms']

        return handler(GcpMockModule(params))

    def kms_decrypt(self, module):
        """Decrypt ``module.params['ciphertext']``; return the ``plaintext`` field."""
        return self._kms_request(module, 'decrypt', 'ciphertext', 'plaintext')

    def kms_encrypt(self, module):
        """Encrypt ``module.params['plaintext']``; return the ``ciphertext`` field."""
        return self._kms_request(module, 'encrypt', 'plaintext', 'ciphertext')

    def _kms_request(self, module, action, input_key, output_key):
        """POST one cryptoKeys ``action`` request and return ``output_key``.

        ``module`` must expose ``params`` (a dict with the keys built by
        ``run``) and ``fail_json``. The request body holds
        ``module.params[input_key]`` plus ``additionalAuthenticatedData`` when
        that parameter is truthy. If the JSON reply contains a top-level
        ``error`` member, the failure is reported through
        ``module.fail_json`` instead of surfacing as a bare ``KeyError``.
        """
        payload = {input_key: module.params[input_key]}
        aad = module.params['additional_authenticated_data']
        if aad:
            payload['additionalAuthenticatedData'] = aad

        auth = GcpSession(module, 'cloudkms')
        # The action suffix is appended after formatting so that a stray
        # "action" entry in module.params cannot clash with it.
        url = ("https://cloudkms.googleapis.com/v1/projects/{projects}/locations/{locations}/"
               "keyRings/{key_ring}/cryptoKeys/{crypto_key}".format(**module.params)
               + ":" + action)
        response = auth.post(url, body=payload)

        result = response.json()
        # Google APIs describe failures as {"error": {...}}; report that body
        # rather than failing later on the missing result field.
        if 'error' in result:
            module.fail_json(msg="GCP returned error: %s" % result)
        return result[output_key]
def gcp_kms_encrypt(plaintext, **kwargs):
    """Filter entry point: encrypt ``plaintext`` via ``GcpKmsFilter``.

    All keyword arguments are forwarded unchanged to ``GcpKmsFilter.run``.
    """
    kms_filter = GcpKmsFilter()
    return kms_filter.run('encrypt', plaintext=plaintext, **kwargs)
def gcp_kms_decrypt(ciphertext, **kwargs):
    """Filter entry point: decrypt ``ciphertext`` via ``GcpKmsFilter``.

    All keyword arguments are forwarded unchanged to ``GcpKmsFilter.run``.
    """
    kms_filter = GcpKmsFilter()
    return kms_filter.run('decrypt', ciphertext=ciphertext, **kwargs)
class FilterModule(object):
    """Plugin class exposing the KMS filter functions by name."""

    def filters(self):
        """Return a dict mapping each filter name to its function."""
        registry = {}
        registry['gcp_kms_encrypt'] = gcp_kms_encrypt
        registry['gcp_kms_decrypt'] = gcp_kms_decrypt
        return registry
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment