Created
May 17, 2024 16:05
-
-
Save TruncatedDinoSour/8350f07f481b7b137fcd8bfdc3f13086 to your computer and use it in GitHub Desktop.
Example of an encrypted socket connection in Python 3 (For TruncatedDinoSour/armour Standard Network Application Programming Interface (SNAPI) version 0): ECDH (Elliptic Curve Diffie-Hellman) with the SECP521R1 curve, ChaCha20-Poly1305, SHA3, and BLAKE2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
"""Sample SNAPI client""" | |
import hashlib | |
import os | |
import socket | |
from warnings import filterwarnings as filter_warnings | |
from cryptography.hazmat.primitives import hashes, serialization | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.asymmetric.ec import ( | |
EllipticCurvePrivateKey, EllipticCurvePublicKey) | |
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 | |
from cryptography.hazmat.primitives.kdf.hkdf import HKDF | |
# Address of the server that main() connects to.
HOST: str = "127.0.0.1"
# TCP port that main() connects to.
PORT: int = 3838
def _recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes from the stream socket `sock`.

    A single ``recv(size)`` call may legally return fewer bytes than asked
    for, so keep reading until the whole message has arrived.

    Raises:
        ConnectionError: if the peer closes the connection before `size`
            bytes were received.
    """
    buffer = bytearray()
    while len(buffer) < size:
        chunk: bytes = sock.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError(
                f"connection closed after {len(buffer)} of {size} expected bytes"
            )
        buffer += chunk
    return bytes(buffer)


def main() -> int:
    """Run the sample client: handshake, send one message, read one reply.

    Steps, in wire order:
      1. Send the client identifier and read the server identifier.
      2. Exchange DER-encoded SECP521R1 public keys (client sends first).
      3. Derive key, nonce and associated-data material with HKDF-SHA3-512
         from the ECDH shared secret.
      4. Send one padded, ChaCha20-Poly1305 encrypted message, then receive
         and decrypt the reply.

    Returns:
        0 on success; failures propagate as exceptions.

    Raises:
        ConnectionError: if the server closes the connection mid-handshake.
        TypeError: if the server's public key is not an elliptic curve key.
        ValueError: if the decrypted reply has invalid padding.
    """
    client_id: bytes = b"Sample client 1.0.0"
    # Length in bytes of a DER-encoded SECP521R1 SubjectPublicKeyInfo key.
    public_key_size: int = 158

    # The context manager closes the socket even if a step below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((HOST, PORT))

        # Generate the client's keys.
        private_key: EllipticCurvePrivateKey = ec.generate_private_key(ec.SECP521R1())
        public_key: EllipticCurvePublicKey = private_key.public_key()
        serialized_public: bytes = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        # Send the client identifier when ready.
        client.sendall(client_id)

        # Get the server identifier and version (print it). Wait for the server.
        # The identifier has no length prefix, so one recv() is all we can do.
        server_id: bytes = client.recv(64)
        if not server_id:
            raise ConnectionError("server closed the connection before identifying")
        print("Public data:", server_id)

        # Send the public ECDH key to the server.
        client.sendall(serialized_public)

        # Receive the public key of the server (fixed size, so read all of it).
        server_der: bytes = _recv_exact(client, public_key_size)
        server_public = serialization.load_der_public_key(server_der)
        if not isinstance(server_public, EllipticCurvePublicKey):
            raise TypeError("server sent a public key that is not an elliptic curve key")

        # Derive the key. The salt binds both public keys (client first) and
        # the info binds both identifiers, so both sides must agree on them.
        salt: bytes = hashlib.sha3_256(serialized_public + server_der).digest()
        info: bytes = client_id + b" <=> " + server_id
        material: bytes = private_key.exchange(ec.ECDH(), server_public)

        key: bytes = HKDF(
            algorithm=hashes.SHA3_512(),
            length=64,
            salt=salt,
            info=info,
        ).derive(material)
        nonce: bytes = HKDF(
            algorithm=hashes.SHA3_512(),
            length=48,
            salt=hashlib.sha3_256(salt + key).digest(),
            info=info,
        ).derive(material)
        assoc: bytes = HKDF(
            algorithm=hashes.SHA3_512(),
            length=32,
            salt=hashlib.sha3_256(nonce + key + salt).digest(),
            info=info,
        ).derive(material)

        # NOTE: printed for demonstration only; this exposes secret material.
        print("Key:", key)
        print("Nonce:", nonce)
        print("Assoc:", assoc)

        # Send ciphered data. The plaintext is padded with random bytes to a
        # multiple of 64 bytes; the final byte stores the padding length.
        extra: bytes = os.urandom(32)
        sending_pt: bytes = b"Hello, Server!"
        padding_size: int = 63 - (len(sending_pt) % 64)
        if padding_size > 0:
            sending_pt += os.urandom(padding_size)
        sending_pt += bytes([padding_size])

        # Per-message key, nonce and AAD are derived by hashing the session
        # material together with the random `extra` prefix sent in the clear.
        client.sendall(
            extra
            + ChaCha20Poly1305(
                hashlib.blake2s(key + extra, digest_size=32).digest()
            ).encrypt(
                hashlib.blake2s(nonce + extra, digest_size=12).digest(),
                sending_pt,
                hashlib.blake2b(assoc + extra, digest_size=52).digest(),
            )
        )

        # Receive ciphered data: 32 bytes of `extra`, then the ciphertext.
        exct: bytes = client.recv(1024)
        ex, ct = exct[:32], exct[32:]
        print("Extra:", ex)
        print("Cypher-text:", ct)

        recv_pt: bytes = ChaCha20Poly1305(
            hashlib.blake2s(key + ex, digest_size=32).digest()
        ).decrypt(
            hashlib.blake2s(nonce + ex, digest_size=12).digest(),
            ct,
            hashlib.blake2b(assoc + ex, digest_size=52).digest(),
        )

        # Strip the padding: the last byte says how many pad bytes precede it.
        if not recv_pt or recv_pt[-1] + 1 > len(recv_pt):
            raise ValueError("decrypted message has invalid padding")
        recv_pt = recv_pt[: -(recv_pt[-1] + 1)]

        print(
            "Decrypted:",
            recv_pt,
        )

    return 0
if __name__ == "__main__":
    # Sanity-check the declared return type of main() before running it.
    declared_return = main.__annotations__.get("return")
    assert declared_return is int, "main() should return an integer"

    # Promote every warning to an exception so none goes unnoticed.
    filter_warnings(action="error", category=Warning)

    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
"""SNAPI example server""" | |
import hashlib | |
import os | |
import socket | |
from warnings import filterwarnings as filter_warnings | |
from cryptography.hazmat.primitives import hashes, serialization | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.asymmetric.ec import ( | |
EllipticCurvePrivateKey, EllipticCurvePublicKey) | |
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 | |
from cryptography.hazmat.primitives.kdf.hkdf import HKDF | |
# Address that main() binds the listening socket to.
HOST: str = "127.0.0.1"
# TCP port that main() listens on.
PORT: int = 3838
def _recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes from the stream socket `sock`.

    A single ``recv(size)`` call may legally return fewer bytes than asked
    for, so keep reading until the whole message has arrived.

    Raises:
        ConnectionError: if the peer closes the connection before `size`
            bytes were received.
    """
    buffer = bytearray()
    while len(buffer) < size:
        chunk: bytes = sock.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError(
                f"connection closed after {len(buffer)} of {size} expected bytes"
            )
        buffer += chunk
    return bytes(buffer)


def main() -> int:
    """Run the sample server: accept one client, read one message, reply.

    Steps, in wire order:
      1. Accept a single connection, read the client identifier and send
         the server identifier.
      2. Exchange DER-encoded SECP521R1 public keys (client sends first).
      3. Derive key, nonce and associated-data material with HKDF-SHA3-512
         from the ECDH shared secret.
      4. Receive and decrypt one message, then send one padded,
         ChaCha20-Poly1305 encrypted reply.

    Returns:
        0 on success; failures propagate as exceptions.

    Raises:
        ConnectionError: if the client closes the connection mid-handshake.
        TypeError: if the client's public key is not an elliptic curve key.
        ValueError: if the decrypted message has invalid padding.
    """
    server_id: bytes = b"Sample server 1.0.0"
    # Length in bytes of a DER-encoded SECP521R1 SubjectPublicKeyInfo key.
    public_key_size: int = 158

    # The context managers close both sockets even if a step below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((HOST, PORT))
        server.listen(1)
        print(f"Server is listening on {HOST}:{PORT}")

        conn, addr = server.accept()
        with conn:
            print(f"Incoming connection from {addr}")

            # Generate the server's ECDH key.
            private_key: EllipticCurvePrivateKey = ec.generate_private_key(
                ec.SECP521R1()
            )
            public_key: EllipticCurvePublicKey = private_key.public_key()
            serialized_public: bytes = public_key.public_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            )

            # Wait for the client to be ready. Get the client and its version.
            # The identifier has no length prefix, so one recv() is all we can do.
            client_id: bytes = conn.recv(64)
            if not client_id:
                raise ConnectionError(
                    "client closed the connection before identifying"
                )
            print("Public data:", client_id)

            # Send the server's identifier.
            conn.sendall(server_id)

            # Receive the public ECDH key of the client (fixed size, so read
            # all of it).
            client_der: bytes = _recv_exact(conn, public_key_size)
            client_public = serialization.load_der_public_key(client_der)
            if not isinstance(client_public, EllipticCurvePublicKey):
                raise TypeError(
                    "client sent a public key that is not an elliptic curve key"
                )

            # Send the public ECDH key to the client.
            conn.sendall(serialized_public)

            # Derive the key. The salt binds both public keys (client first)
            # and the info binds both identifiers, so both sides must agree.
            salt: bytes = hashlib.sha3_256(client_der + serialized_public).digest()
            info: bytes = client_id + b" <=> " + server_id
            material: bytes = private_key.exchange(ec.ECDH(), client_public)

            key: bytes = HKDF(
                algorithm=hashes.SHA3_512(),
                length=64,
                salt=salt,
                info=info,
            ).derive(material)
            nonce: bytes = HKDF(
                algorithm=hashes.SHA3_512(),
                length=48,
                salt=hashlib.sha3_256(salt + key).digest(),
                info=info,
            ).derive(material)
            assoc: bytes = HKDF(
                algorithm=hashes.SHA3_512(),
                length=32,
                salt=hashlib.sha3_256(nonce + key + salt).digest(),
                info=info,
            ).derive(material)

            # NOTE: printed for demonstration only; this exposes secret material.
            print("Key:", key)
            print("Nonce:", nonce)
            print("Assoc:", assoc)

            # Receive ciphered data: 32 bytes of `extra`, then the ciphertext.
            exct: bytes = conn.recv(1024)
            ex, ct = exct[:32], exct[32:]
            print("Extra:", ex)
            print("Cypher-text:", ct)

            # Per-message key, nonce and AAD are derived by hashing the session
            # material together with the `extra` prefix received in the clear.
            recv_pt: bytes = ChaCha20Poly1305(
                hashlib.blake2s(key + ex, digest_size=32).digest()
            ).decrypt(
                hashlib.blake2s(nonce + ex, digest_size=12).digest(),
                ct,
                hashlib.blake2b(assoc + ex, digest_size=52).digest(),
            )

            # Strip the padding: the last byte says how many pad bytes precede it.
            if not recv_pt or recv_pt[-1] + 1 > len(recv_pt):
                raise ValueError("decrypted message has invalid padding")
            recv_pt = recv_pt[: -(recv_pt[-1] + 1)]

            print(
                "Decrypted:",
                recv_pt,
            )

            # Send ciphered data. The plaintext is padded with random bytes to
            # a multiple of 64 bytes; the final byte stores the padding length.
            extra: bytes = os.urandom(32)
            sending_pt: bytes = b"Hello, Client!"
            padding_size: int = 63 - (len(sending_pt) % 64)
            if padding_size > 0:
                sending_pt += os.urandom(padding_size)
            sending_pt += bytes([padding_size])

            conn.sendall(
                extra
                + ChaCha20Poly1305(
                    hashlib.blake2s(key + extra, digest_size=32).digest()
                ).encrypt(
                    hashlib.blake2s(nonce + extra, digest_size=12).digest(),
                    sending_pt,
                    hashlib.blake2b(assoc + extra, digest_size=52).digest(),
                )
            )

    return 0
if __name__ == "__main__":
    # Sanity-check the declared return type of main() before running it.
    declared_return = main.__annotations__.get("return")
    assert declared_return is int, "main() should return an integer"

    # Promote every warning to an exception so none goes unnoticed.
    filter_warnings(action="error", category=Warning)

    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
Author
TruncatedDinoSour
commented
May 17, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment