Last active
January 26, 2016 00:48
-
-
Save paxan/96563686c4c3c08129b1 to your computer and use it in GitHub Desktop.
A tool for storing secrets on disk securely using AWS KMS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, print_function | |
import boto3 | |
import errno | |
import json | |
import os | |
import re | |
import shlex | |
import sys | |
import tempfile | |
from base64 import b64encode, b64decode | |
from contextlib import contextmanager | |
from subprocess import Popen, PIPE | |
@contextmanager | |
def atomic_writer(target_path): | |
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(target_path)) | |
os.close(fd) | |
try: | |
yield tmp_path | |
os.rename(tmp_path, target_path) | |
finally: | |
try: | |
os.unlink(tmp_path) | |
except (OSError, IOError) as ex: | |
# It's OK: tmp_path might be missing. | |
if ex.errno != errno.ENOENT: | |
raise | |
def invoke(stdin_data, argv): | |
if isinstance(argv, basestring): | |
argv = shlex.split(argv) | |
p = Popen(argv, stdout=PIPE, stdin=PIPE, stderr=PIPE) | |
ret = p.communicate(stdin_data) | |
if p.returncode != 0: | |
for line in ret[1].splitlines(): | |
print(argv[0] + ':', line, file=sys.stderr) | |
raise RuntimeError('{} exited with: {}'.format(argv[0], p.returncode)) | |
else: | |
return ret | |
def encrypt(kms, key_id, plaintext_path, ciphertext_path=None, key_spec='AES_256', cipher='aes-256-cbc'): | |
data_key_result = kms.generate_data_key(KeyId=key_id, KeySpec=key_spec) | |
if ciphertext_path is None: | |
ciphertext_path = plaintext_path + '.secret' | |
try: | |
with atomic_writer(ciphertext_path) as tmp_path: | |
invoke(data_key_result['Plaintext'], | |
'openssl {} -a -in "{}" -out "{}" -pass stdin' | |
.format(cipher, plaintext_path, tmp_path)) | |
with atomic_writer(ciphertext_path + '.meta') as tmp_meta_path: | |
with open(tmp_meta_path, 'wb') as f: | |
json.dump({'version': 1, | |
'cipher': cipher, | |
'data_key': b64encode(data_key_result['CiphertextBlob'])}, | |
f, indent=2) | |
finally: | |
del data_key_result # eagerly forget this object | |
def decrypt(kms, ciphertext_path, plaintext_path=None): | |
with open(ciphertext_path + '.meta', 'rb') as f: | |
meta = json.load(f) | |
if plaintext_path is None: | |
mo = re.search(r'^(.+)\.secret$', ciphertext_path) | |
if mo is None: | |
raise RuntimeError('Unable to derive the plaintext file name from: ' + ciphertext_path) | |
plaintext_path = mo.group(1) | |
if plaintext_path == '-': # '-' means, conventionally, "decrypt to stdout". | |
return invoke(kms.decrypt(CiphertextBlob=b64decode(meta['data_key']))['Plaintext'], | |
'openssl {} -a -d -in "{}" -pass stdin' | |
.format(meta['cipher'], ciphertext_path))[0] | |
else: | |
with atomic_writer(plaintext_path) as tmp_path: | |
invoke(kms.decrypt(CiphertextBlob=b64decode(meta['data_key']))['Plaintext'], | |
'openssl {} -a -d -in "{}" -out "{}" -pass stdin' | |
.format(meta['cipher'], ciphertext_path, tmp_path)) | |
__all__ = 'encrypt', 'decrypt' | |
if __name__ == '__main__': | |
def help(*_): | |
print('''\ | |
Usage: | |
1) Encryption: | |
{prog} encrypt KMS-KEY-ID PLAINTEXT-FILE [CIPHERTEXT-FILE] | |
Encrypts the plaintext file using the specified KMS key. Encryption | |
metadata will be stored in a sidecar file: 'CIPHERTEXT-FILE.meta' | |
If a ciphertext file is not specified, it will be derived from the | |
plaintext file by appending '.secret' to its name. | |
2) Decryption: | |
{prog} decrypt CIPHERTEXT-FILE [PLAINTEXT-FILE] | |
Decrypts the ciphertext file using KMS. Expects that the encryption | |
metadata is stored in a sidecar file: 'CIPHERTEXT-FILE.meta' | |
If a plaintext file is not specified, it will be derived from the | |
ciphertext file by removing '.secret' suffix from its name. | |
If '-' is specified as plaintext file, output goes to stdout. | |
'''.format(prog='python -m kmstool'), file=sys.stderr) | |
if len(sys.argv) < 2: | |
help() | |
else: | |
fn = globals().get(sys.argv[1], help) | |
kms = boto3.client('kms') | |
ret = fn(kms, *sys.argv[2:]) | |
if isinstance(ret, basestring): | |
print(ret) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment