Skip to content

Instantly share code, notes, and snippets.

@salrashid123
Last active September 10, 2024 12:20
Show Gist options
  • Save salrashid123/bca7a24e1d59567adb89fef093d8564d to your computer and use it in GitHub Desktop.
Save salrashid123/bca7a24e1d59567adb89fef093d8564d to your computer and use it in GitHub Desktop.
AWS SigV4 Signing without SDK using Trusted Platform Module (TPM)

AWS v4 signed request using Trusted Platform Module

similar to https://github.com/aws-samples/sigv4-signing-examples but the variant below invokes sts.GetCallerIdentity where the AWS_SECRET_ACCESS_KEY is embedded in a TPM

In other words, this sample will seal the AWS_SECRET_ACCESS_KEY inside a TPM and then use the TPM to create an AWS v4 signature. At no time does the secret leave the TPM, but the TPM can be made to issue an HMAC using it.

for more info, see

Setup using a software TPM

mkdir /tmp/myvtpm
sudo swtpm_setup --tpmstate /tmp/myvtpm --tpm2 --create-ek-cert
sudo swtpm socket --tpmstate dir=/tmp/myvtpm --tpm2 --server type=tcp,port=2321 --ctrl type=tcp,port=2322 --flags not-need-init,startup-clear

### in new window
export TPM2TOOLS_TCTI="swtpm:port=2321"
tpm2_pcrread sha256:23

then export some env vars and run the script

export AWS_ACCESS_KEY_ID=redacted
export AWS_SECRET_ACCESS_KEY=redacted
python3 main_tpm.py

this script will embed the AWS secret into the TPM and then issue signed headers which will be used to call the STS GetCallerIdentity endpoint. Each run of this script will embed the key but you can certainly persist the embedded key and reuse it for later invocations. That part is left out of this sample.

  • main_tpm.py:
import datetime
import hashlib
import hmac
import requests
import os
import sys

from tpm2_pytss import *

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend

# AWS access keys, read from the environment (raises KeyError if either is unset).
access_key = os.environ['AWS_ACCESS_KEY_ID']
secret_key = os.environ['AWS_SECRET_ACCESS_KEY']



# Open an ESAPI context against the software TPM listening on TCP port 2321
# and issue a TPM startup with the CLEAR startup type.
ectx = ESAPI(tcti="swtpm:port=2321")
ectx.startup(TPM2_SU.CLEAR)

# SigV4 keys the first HMAC of its derivation chain with "AWS4" + secret.
# This prefixed value is the key material handed to the TPM below.
k = 'AWS4'+secret_key

# Template for the primary (parent) key: an RSA-2048 restricted decryption
# key, fixed to this TPM and parent, whose sensitive part is generated by the
# TPM itself (SENSITIVEDATAORIGIN).
inPublic = TPM2B_PUBLIC(
            TPMT_PUBLIC.parse(
                alg="rsa2048",
                objectAttributes=TPMA_OBJECT.USERWITHAUTH
                | TPMA_OBJECT.RESTRICTED
                | TPMA_OBJECT.DECRYPT
                | TPMA_OBJECT.FIXEDTPM
                | TPMA_OBJECT.FIXEDPARENT
                | TPMA_OBJECT.SENSITIVEDATAORIGIN,
            )
)

# Template for the child HMAC key (name algorithm SHA-256), usable for
# signing. SENSITIVEDATAORIGIN is deliberately absent here: the key material
# is supplied by the caller (see inSensitiveHMAC) rather than generated by
# the TPM.
inPublicHMAC = TPM2B_PUBLIC(
            TPMT_PUBLIC.parse(
                alg="hmac",
                nameAlg="sha256",
                objectAttributes=TPMA_OBJECT.USERWITHAUTH
                | TPMA_OBJECT.FIXEDPARENT
                | TPMA_OBJECT.FIXEDTPM
                | TPMA_OBJECT.SIGN_ENCRYPT,
            )
)

# Create the primary key with an empty sensitive-create structure (no auth
# value, no caller-supplied data). Only the handle is kept.
inSensitive = TPM2B_SENSITIVE_CREATE()
primary1, _, _, _, _ = ectx.create_primary(inSensitive, inPublic)

# Create the HMAC key under the primary, passing the UTF-8 encoded
# "AWS4"+secret as the key's sensitive data. The returned private and public
# blobs are what gets loaded below.
inSensitiveHMAC = TPM2B_SENSITIVE_CREATE(TPMS_SENSITIVE_CREATE(data=TPM2B_SENSITIVE_DATA(k.encode("utf-8"))))
priv, pub, _, _, _ = ectx.create(primary1, inSensitiveHMAC, inPublicHMAC)

## Optional: the pub/priv blobs can be serialized (e.g. pub.marshal()),
## written to disk, and restored later instead of re-creating the key:
# pub2, _ = TPM2B_PUBLIC.unmarshal(pub.marshal())
# priv2, _ = TPM2B_PRIVATE.unmarshal(priv.marshal())


# Load the HMAC key into the TPM under its parent. childHandle is what
# getSignatureKey() later passes to ectx.hmac(). The parent handle is not
# used again after the load, so it is flushed immediately.
childHandle = ectx.load(primary1, priv, pub)
ectx.flush_context(primary1)
# Quick manual check of the loaded key (left disabled):
# hmac = ectx.hmac(childHandle, b"1234", TPM2_ALG.SHA256)
# print(hmac.marshal().hex())
# ectx.flush_context(childHandle)


# Verbose HTTP debugging: enable http.client's debug output and route
# DEBUG-level log records (including urllib3's) through the root logger.
import logging
import http.client as http_client

http_client.HTTPConnection.debuglevel = 1

logging.basicConfig()
root_log = logging.getLogger()
root_log.setLevel(logging.DEBUG)

requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.propagate = True
requests_log.setLevel(logging.DEBUG)



# Request parameters
method = 'POST'
service = 'sts'
host = "sts.amazonaws.com"
region = 'us-east-1'
endpoint = '/'

# Timestamp used for signing. SigV4 timestamps are expressed in UTC; a
# timezone-aware "now" is used because datetime.utcnow() is deprecated
# (Python 3.12+) and returns a naive object. The formatted strings are the
# same as before.
t = datetime.datetime.now(datetime.timezone.utc)
amzdate = t.strftime('%Y%m%dT%H%M%SZ')   # full timestamp, sent as x-amz-date
datestamp = t.strftime('%Y%m%d')         # date only, used in the credential scope

# Create the canonical request
canonical_uri = endpoint

canonical_querystring = ''  # POST with a form body: no query string

# One "name:value\n" line per signed header, lowercase names, sorted by name.
canonical_headers = ('content-type:application/x-www-form-urlencoded' + '\n'
                     + 'host:' + host + '\n'
                     + 'x-amz-date:' + amzdate + '\n')

# Must list exactly the header names that appear in canonical_headers.
signed_headers = 'content-type;host;x-amz-date'

# https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
payload = 'Action=GetCallerIdentity&Version=2011-06-15'
# Alternative STS actions that can be signed the same way:
#payload = 'Action=GetSessionToken&DurationSeconds=3600&Version=2011-06-15'
#payload = 'Action=AssumeRole&&RoleSessionName=testAR&RoleArn=arn:aws:iam::291738886548:role/gcpsts&Version=2011-06-15'

# Hex SHA-256 of the request body; it forms the last line of the canonical request.
payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()
canonical_request = (method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n'
                     + canonical_headers + '\n' + signed_headers + '\n' + payload_hash)

# Create the string to sign: algorithm, timestamp, credential scope and the
# hex SHA-256 of the canonical request, newline-separated.
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = (algorithm + '\n' + amzdate + '\n' + credential_scope + '\n'
                  + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())

def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of *msg* keyed with *key*.

    *key* is bytes; *msg* is a str and is UTF-8 encoded before hashing.
    """
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode("utf-8"))
    return mac.digest()

def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the SigV4 signing key, computing the first HMAC inside the TPM.

    The software-only derivation starts with
    ``HMAC-SHA256("AWS4" + secret, dateStamp)``. Here that step is performed
    by the TPM-resident HMAC key referenced by the module-level
    ``childHandle``; the remaining steps (region, service, "aws4_request")
    are chained in software via ``sign``.

    Args:
        key: Unused. Kept so the signature matches the software-only
            variant; the secret comes from the TPM key instead.
        dateStamp: Credential-scope date string (``YYYYMMDD``).
        regionName: AWS region name, e.g. ``us-east-1``.
        serviceName: AWS service name, e.g. ``sts``.

    Returns:
        The derived signing key as bytes.

    Note:
        This function is one-shot: it flushes ``childHandle`` and closes the
        module-level ``ectx`` before returning (or raising), so it cannot be
        called a second time in the same process.
    """

    ## instead of the first hmac operation using the AWS Secret, use the TPM based key
    #kDate = sign(("AWS4" + key).encode("utf-8"), dateStamp)

    try:
        try:
            thmac = ectx.hmac(childHandle, dateStamp, TPM2_ALG.SHA256)
        finally:
            # Release the loaded key even if the HMAC command failed.
            ectx.flush_context(childHandle)
        kDate = bytes(thmac)
    finally:
        # Always tear down the ESAPI context so the TPM connection is not leaked.
        ectx.close()

    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, "aws4_request")
    return kSigning

# Derive the signing key (first HMAC step runs in the TPM) and sign the
# string-to-sign with it; the signature is sent hex-encoded.
signing_key = getSignatureKey(secret_key, datestamp, region, service)
signature = hmac.new(
    signing_key,
    string_to_sign.encode('utf-8'),
    hashlib.sha256,
).hexdigest()

# Assemble the Authorization header value from the signing information.
authorization_header = (
    f"{algorithm} Credential={access_key}/{credential_scope}, "
    f"SignedHeaders={signed_headers}, Signature={signature}"
)

print(authorization_header)

# Send the request; the signed headers must carry exactly the values that
# went into the canonical request.
headers = {
    'content-type': 'application/x-www-form-urlencoded',
    'host': host,
    'x-amz-date': amzdate,
    'Authorization': authorization_header,
}
request_url = f"https://{host}{canonical_uri}"

response = requests.post(
    request_url,
    data=payload,
    headers=headers,
    allow_redirects=False,
    timeout=5,
)
# Raise on any 4xx/5xx status before printing the body.
response.raise_for_status()

print(response.text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment