Skip to content

Instantly share code, notes, and snippets.

@TruncatedDinoSour
Created May 17, 2024 16:05
Show Gist options
  • Save TruncatedDinoSour/8350f07f481b7b137fcd8bfdc3f13086 to your computer and use it in GitHub Desktop.
Save TruncatedDinoSour/8350f07f481b7b137fcd8bfdc3f13086 to your computer and use it in GitHub Desktop.
Example of an encrypted socket connection in Python 3 (for TruncatedDinoSour/armour Standard Network Application Programming Interface (SNAPI) version 0): ECDH (Elliptic Curve Diffie-Hellman) with the SECP521R1 curve, ChaCha20-Poly1305, SHA3, and BLAKE2
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Sample SNAPI client"""
import hashlib
import os
import socket
from warnings import filterwarnings as filter_warnings
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey, EllipticCurvePublicKey)
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# Address of the server the client connects to (loopback only).
HOST: str = "127.0.0.1"
# TCP port used together with HOST when connecting.
PORT: int = 3838
def _recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes from `sock`.

    A single `recv()` on a stream socket may return fewer bytes than
    requested, so keep reading until the whole message has arrived.

    Raises:
        ConnectionError: if the peer closes the connection early.
    """
    buffer = bytearray()
    while len(buffer) < size:
        chunk: bytes = sock.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError(
                f"connection closed after {len(buffer)} of {size} bytes"
            )
        buffer += chunk
    return bytes(buffer)


def main() -> int:
    """Run the sample SNAPI client.

    The client connects to HOST:PORT, exchanges identifiers and ECDH
    (SECP521R1) public keys with the server, derives the key, nonce and
    associated-data material with HKDF (SHA3-512), sends one padded
    ChaCha20-Poly1305 message and prints the decrypted reply.

    Returns:
        0 on success.

    Raises:
        ConnectionError: if the server closes the connection before its
            whole public key has been received.
    """
    client_id: bytes = b"Sample client 1.0.0"
    # Number of bytes read for the peer's DER-encoded public key.
    der_size: int = 158

    # The context manager closes the socket even when a step below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((HOST, PORT))

        # Generate client's keys.
        private_key: EllipticCurvePrivateKey = ec.generate_private_key(ec.SECP521R1())
        public_key: EllipticCurvePublicKey = private_key.public_key()
        serialized_public: bytes = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        # Send the client identifier when ready.
        client.sendall(client_id)

        # Get the server and its version (print it). Wait for the server.
        server_id: bytes = client.recv(64)
        print("Public data:", server_id)

        # Send the public ECDH key to the server.
        client.sendall(serialized_public)

        # Receive the public key of the server (may arrive in several pieces).
        server_der: bytes = _recv_exact(client, der_size)
        server_public = serialization.load_der_public_key(server_der)

        # Derive the key. Salt and info bind the derivation to both public
        # keys and both identifiers; each later derivation salts with the
        # previously derived values.
        salt: bytes = hashlib.sha3_256(serialized_public + server_der).digest()
        info: bytes = client_id + b" <=> " + server_id
        material: bytes = private_key.exchange(ec.ECDH(), server_public)

        key: bytes = HKDF(
            algorithm=hashes.SHA3_512(),
            length=64,
            salt=salt,
            info=info,
        ).derive(material)
        nonce: bytes = HKDF(
            algorithm=hashes.SHA3_512(),
            length=48,
            salt=hashlib.sha3_256(salt + key).digest(),
            info=info,
        ).derive(material)
        assoc: bytes = HKDF(
            algorithm=hashes.SHA3_512(),
            length=32,
            salt=hashlib.sha3_256(nonce + key + salt).digest(),
            info=info,
        ).derive(material)

        print("Key:", key)
        print("Nonce:", nonce)
        print("Assoc:", assoc)

        # Send ciphered data. `extra` is fresh per message and is mixed into
        # the cipher key, nonce and associated data; it is sent in the clear
        # in front of the ciphertext.
        extra: bytes = os.urandom(32)
        sending_pt: bytes = b"Hello, Server!"

        # Pad with random bytes to a multiple of 64 bytes; the final byte
        # stores how many random padding bytes precede it.
        padding_size: int = 63 - (len(sending_pt) % 64)
        if padding_size > 0:
            sending_pt += os.urandom(padding_size)
        sending_pt += bytes([padding_size])

        client.sendall(
            extra
            + ChaCha20Poly1305(
                hashlib.blake2s(key + extra, digest_size=32).digest()
            ).encrypt(
                hashlib.blake2s(nonce + extra, digest_size=12).digest(),
                sending_pt,
                hashlib.blake2b(assoc + extra, digest_size=52).digest(),
            )
        )

        # Receive ciphered data: the first 32 bytes are the peer's `extra`,
        # the rest is the ciphertext.
        # NOTE(review): there is no length framing, so this relies on the
        # whole reply arriving in a single recv() of at most 1024 bytes.
        exct: bytes = client.recv(1024)
        ex, ct = exct[:32], exct[32:]

        print("Extra:", ex)
        print("Cypher-text:", ct)

        recv_pt: bytes = ChaCha20Poly1305(
            hashlib.blake2s(key + ex, digest_size=32).digest()
        ).decrypt(
            hashlib.blake2s(nonce + ex, digest_size=12).digest(),
            ct,
            hashlib.blake2b(assoc + ex, digest_size=52).digest(),
        )
        # Strip the padding: the last byte holds the padding length, so drop
        # that many bytes plus the length byte itself.
        recv_pt = recv_pt[: -(recv_pt[-1] + 1)]

        print(
            "Decrypted:",
            recv_pt,
        )

    return 0
if __name__ == "__main__":
    # Refuse to start unless main() is annotated as returning `int`,
    # since its result is used as the process exit status.
    assert main.__annotations__.get("return") is int, "main() should return an integer"
    # Turn every warning into an exception.
    filter_warnings("error", category=Warning)
    exit_status = main()
    raise SystemExit(exit_status)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""SNAPI example server"""
import hashlib
import os
import socket
from warnings import filterwarnings as filter_warnings
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey, EllipticCurvePublicKey)
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# Address the server binds to (loopback only).
HOST: str = "127.0.0.1"
# TCP port the server listens on.
PORT: int = 3838
def _recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes from `sock`.

    A single `recv()` on a stream socket may return fewer bytes than
    requested, so keep reading until the whole message has arrived.

    Raises:
        ConnectionError: if the peer closes the connection early.
    """
    buffer = bytearray()
    while len(buffer) < size:
        chunk: bytes = sock.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError(
                f"connection closed after {len(buffer)} of {size} bytes"
            )
        buffer += chunk
    return bytes(buffer)


def main() -> int:
    """Run the sample SNAPI server for a single client connection.

    The server listens on HOST:PORT, accepts one connection, exchanges
    identifiers and ECDH (SECP521R1) public keys with the client, derives
    the key, nonce and associated-data material with HKDF (SHA3-512),
    prints the decrypted client message and answers with one padded
    ChaCha20-Poly1305 message.

    Returns:
        0 on success.

    Raises:
        ConnectionError: if the client closes the connection before its
            whole public key has been received.
    """
    server_id: bytes = b"Sample server 1.0.0"
    # Number of bytes read for the peer's DER-encoded public key.
    der_size: int = 158

    # The context managers close both sockets even when a step below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((HOST, PORT))
        server.listen(1)

        print(f"Server is listening on {HOST}:{PORT}")

        conn, addr = server.accept()

        with conn:
            print(f"Incoming connection from {addr}")

            # Generate server's ECDH key.
            private_key: EllipticCurvePrivateKey = ec.generate_private_key(
                ec.SECP521R1()
            )
            public_key: EllipticCurvePublicKey = private_key.public_key()
            serialized_public: bytes = public_key.public_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )

            # Wait for the client to be ready. Get the client and its version.
            client_id: bytes = conn.recv(64)
            print("Public data:", client_id)

            # Send the server's identifier.
            conn.sendall(server_id)

            # Receive the public ECDH key of the client (may arrive in
            # several pieces).
            client_der: bytes = _recv_exact(conn, der_size)
            client_public = serialization.load_der_public_key(client_der)

            # Send the public ECDH key to the client.
            conn.sendall(serialized_public)

            # Derive the key. Salt and info bind the derivation to both
            # public keys and both identifiers; each later derivation salts
            # with the previously derived values.
            salt: bytes = hashlib.sha3_256(client_der + serialized_public).digest()
            info: bytes = client_id + b" <=> " + server_id
            material: bytes = private_key.exchange(ec.ECDH(), client_public)

            key: bytes = HKDF(
                algorithm=hashes.SHA3_512(),
                length=64,
                salt=salt,
                info=info,
            ).derive(material)
            nonce: bytes = HKDF(
                algorithm=hashes.SHA3_512(),
                length=48,
                salt=hashlib.sha3_256(salt + key).digest(),
                info=info,
            ).derive(material)
            assoc: bytes = HKDF(
                algorithm=hashes.SHA3_512(),
                length=32,
                salt=hashlib.sha3_256(nonce + key + salt).digest(),
                info=info,
            ).derive(material)

            print("Key:", key)
            print("Nonce:", nonce)
            print("Assoc:", assoc)

            # Receive ciphered data: the first 32 bytes are the peer's
            # `extra`, the rest is the ciphertext.
            # NOTE(review): there is no length framing, so this relies on the
            # whole message arriving in a single recv() of at most 1024 bytes.
            exct: bytes = conn.recv(1024)
            ex, ct = exct[:32], exct[32:]

            print("Extra:", ex)
            print("Cypher-text:", ct)

            recv_pt: bytes = ChaCha20Poly1305(
                hashlib.blake2s(key + ex, digest_size=32).digest()
            ).decrypt(
                hashlib.blake2s(nonce + ex, digest_size=12).digest(),
                ct,
                hashlib.blake2b(assoc + ex, digest_size=52).digest(),
            )
            # Strip the padding: the last byte holds the padding length, so
            # drop that many bytes plus the length byte itself.
            recv_pt = recv_pt[: -(recv_pt[-1] + 1)]

            print(
                "Decrypted:",
                recv_pt,
            )

            # Send ciphered data. `extra` is fresh per message and is mixed
            # into the cipher key, nonce and associated data; it is sent in
            # the clear in front of the ciphertext.
            extra: bytes = os.urandom(32)
            sending_pt: bytes = b"Hello, Client!"

            # Pad with random bytes to a multiple of 64 bytes; the final byte
            # stores how many random padding bytes precede it.
            padding_size: int = 63 - (len(sending_pt) % 64)
            if padding_size > 0:
                sending_pt += os.urandom(padding_size)
            sending_pt += bytes([padding_size])

            conn.sendall(
                extra
                + ChaCha20Poly1305(
                    hashlib.blake2s(key + extra, digest_size=32).digest()
                ).encrypt(
                    hashlib.blake2s(nonce + extra, digest_size=12).digest(),
                    sending_pt,
                    hashlib.blake2b(assoc + extra, digest_size=52).digest(),
                )
            )

    return 0
if __name__ == "__main__":
    # Refuse to start unless main() is annotated as returning `int`,
    # since its result is used as the process exit status.
    assert main.__annotations__.get("return") is int, "main() should return an integer"
    # Turn every warning into an exception.
    filter_warnings("error", category=Warning)
    exit_status = main()
    raise SystemExit(exit_status)
@TruncatedDinoSour
Copy link
Author

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment