Created
July 3, 2024 15:29
-
-
Save kernoelpanic/423c61f90e81e4c9d473ff6fda783559 to your computer and use it in GitHub Desktop.
Python script for recovering the secp256k1 public key from an ethereum (mainnet) transaction hash.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Command line script that recovers the secp256k1 public key | |
# from an ethereum transaction hash. | |
# Requires a HTTP URL of an ethereum node (geth) to connect to | |
# to fetch the transaction information. | |
# | |
# Other requirements are: | |
# $ cat requirements.txt | |
# cryptography | |
# ecdsa | |
# safe-pysha3 | |
# secp256k1 | |
# pytest | |
# web3 | |
# eth-accoun | |
# | |
# Example usage with some random tx: | |
# $ python eth_ecrecover.py tx --http-url http://172.22.0.1:8545 --tx-hash 0x7b2b3dfd8b320f844b8bca5b04ee2193c499e1bdda862b6d9511807469f5e083 | |
# 0x7b2b3dfd8b320f844b8bca5b04ee2193c499e1bdda862b6d9511807469f5e083 | |
# for type hints | |
from typing import Tuple | |
from collections.abc import Iterator | |
# eth addr | |
from sha3 import keccak_256 | |
# ecc arithmetic | |
from ecdsa.curves import SECP256k1 # https://pypi.org/project/ecdsa/ | |
from ecdsa.ellipticcurve import Point | |
# web3 EC | |
import web3 | |
from eth_account._utils.signing import extract_chain_id, to_standard_v | |
from eth_account._utils.legacy_transactions import serializable_unsigned_transaction_from_dict | |
# rlp encoding for ethereum tx | |
import rlp | |
from rlp.sedes import ( | |
Binary, | |
big_endian_int, | |
binary, | |
List, | |
) | |
type_0_keys = [ | |
# 'chainId', # not included in legacy transactions, added on demand in code later | |
'gas', | |
'gasPrice', | |
'nonce', | |
'to', | |
'value'] | |
type_1_keys = [ | |
"to", | |
"nonce", | |
"value", | |
"gas", | |
'gasPrice', | |
"chainId", | |
"type"] | |
type_2_keys = [ | |
"to", | |
"nonce", | |
"value", | |
"gas", | |
"chainId", | |
"maxFeePerGas", | |
"maxPriorityFeePerGas", | |
"type"] | |
type_3_keys = [ | |
"to", | |
"nonce", | |
"value", | |
"gas", | |
"chainId", | |
"maxFeePerGas", | |
"maxPriorityFeePerGas", | |
"maxFeePerBlobGas", | |
"blobVersionedHashes", | |
"type"] | |
def _secp256k1_decompress(pubkey: bytes) -> bytes: | |
""" Decompress a secp256k1 public key. | |
For further information see: https://bitcoin.stackexchange.com/questions/86234/how-to-uncompress-a-public-key | |
:param pubkey: The secp256k1 public key in compressed format given as bytes | |
:return: The secp256k1 public key in uncompressed format given as bytes | |
""" | |
if not isinstance(pubkey, bytes): | |
raise ValueError("Input pubkey must be bytes") | |
if len(pubkey) != 33: | |
raise ValueError("Input pubkey must be 33 bytes long, if it is 65 bytes long it is probably uncompressed") | |
p = 0x_FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F | |
x = int.from_bytes(pubkey[1:33], byteorder='big') | |
y_sq = (pow(x, 3, p) + 7) % p # y^2 = x^3 + 7 (mod p) | |
y = pow(y_sq, (p + 1) // 4, p) # quadratic residue | |
if y % 2 != pubkey[0] % 2: | |
# check against the first byte to identify the correct | |
# y out of the two possibel values y and -y | |
y = p - y | |
y = y.to_bytes(32, byteorder='big') | |
return b'\x04' + pubkey[1:33] + y | |
def _secp256k1_compress(pubkey: bytes) -> bytes: | |
""" Compress a secp256k1 public key. | |
:param pubkey: The secp256k1 public key in uncompressed format given as bytes | |
:return: The secp256k1 public key in comprssed format given as bytes | |
""" | |
if not isinstance(pubkey, bytes): | |
raise ValueError("Input pubkey must be bytes") | |
if len(pubkey) != 65: | |
raise ValueError("Input pubkey must be 65 bytes long, if it is 33 bytes long it is probably already compressed") | |
x_bytes, y_bytes = _secp256k1_extract_coordinates(pubkey) | |
if isinstance(x_bytes, bytes) and isinstance(y_bytes, bytes): | |
y = int.from_bytes(y_bytes, "big") | |
x = int.from_bytes(x_bytes, "big") | |
else: | |
raise ValueError("x and y must be bytes!") | |
prefix = b'\x02' if y % 2 == 0 else b'\x03' | |
compressed_public_key = prefix + x.to_bytes(32, 'big') | |
return compressed_public_key | |
def _secp256k1_extract_coordinates(pubkey: bytes) -> Tuple[bytes, bytes]: | |
""" Extract the x and y coordinates of a secp256k1 public key. | |
:param pubkey: The secp256k1 public key in either compressed or uncompressed format given as bytes | |
:return: A tuple of (x,y) given as bytes | |
""" | |
if not isinstance(pubkey,bytes): | |
raise ValueError("pubkey given as bytes expected") | |
if len(pubkey) == 33: | |
decomp_pubkey = _secp256k1_decompress(pubkey) | |
elif len(pubkey) == 65: | |
decomp_pubkey = pubkey | |
else: | |
raise ValueError("Invalid length, if not 33 or 65 bytes its probably not a compressed or uncompressed key") | |
x_bytes = decomp_pubkey[1:33] | |
y_bytes = decomp_pubkey[-32:] | |
return (x_bytes,y_bytes) | |
def _secp256k1_pubkey_to_eth_addr(pubkey: bytes) -> bytes: | |
""" Generate std. Ethereum address out of uncompressed public key | |
:param pubkey: The secp256k1 public key in uncomressed format given as bytes | |
(note that no check is performed if the key is really uncompressed) | |
:return: The Ethereum address as bytes without "0x" prefix and without checksum. | |
""" | |
return keccak_256(pubkey[1:]).digest()[-20:] | |
def is_valid_secp256k1_pubkey(pk: bytes) -> bool: | |
""" Takes a compressed or uncompressed secp256k1 public key and checks if it is a valid public key for that curve | |
For more information see SEC1v2 https://www.secg.org/sec1-v2.pdf#subsubsection.3.2.2 | |
""" | |
x_bytes,y_bytes = _secp256k1_extract_coordinates(pk) | |
x = int.from_bytes(x_bytes, byteorder='big') | |
y = int.from_bytes(y_bytes, byteorder='big') | |
#print(x) | |
#print(y) | |
# Check if pk is point at infinity (encoded as 0x00 usually) | |
if x == 0 or y == 0: | |
return False | |
# Check if coordinates are greater or equal p | |
if x >= SECP256k1.curve.p() or y >= SECP256k1.curve.p(): | |
return False | |
# Check if point on curve | |
if (y**2 - (x**3 + 7)) % SECP256k1.curve.p() != 0: | |
return False | |
# Check if n*pk is point at infinity useing ecdsa libarary | |
# (would not be needed if cofactor h = 1, as with secp256k1) | |
point_Q = Point(SECP256k1.curve, x, y) | |
point_O = SECP256k1.generator.order() * point_Q | |
if point_O.x() is None and point_O.y() is None: | |
return True | |
return False | |
def extract_tx_from_archive_node(http_url: str, tx_hash: str): | |
""" Extract a secp256k1 public key from an ethereum transaction hash given a HTTP URL of a queryable archive node (geth). | |
:param http_url: The URL for the ethereum node (geth) to query | |
:param tx_hash: The ethereum transaction hash of the transaction to recover the public key from. | |
:return: Dictionary that contains the public key and data about the transaction. | |
""" | |
w3 = web3.Web3(web3.HTTPProvider(http_url)) | |
#tx = w3.eth.get_transaction("0x7f21e0d18374e2707458c72b0ce2510b2781e9b5db0789b2b68a8b44c2371a3a") | |
tx = w3.eth.get_transaction(tx_hash) | |
tx = dict(tx) | |
sig = w3.eth.account._keys.Signature(vrs=( | |
to_standard_v(extract_chain_id(tx["v"])[1]), | |
w3.to_int(tx["r"]), | |
w3.to_int(tx["s"]) | |
)) | |
resp = signature_to_pk(sig, tx) | |
return resp | |
def signature_to_pk(sig: Tuple[int, int, int], tx): | |
""" Recover the secp256k1 public key from the given signature and transaction data. | |
:param sig: Tuple of (v,r,s) of the ECDSA signature | |
:param tx: Dict of transaction data used to re-create the hash of the unsigned transaction | |
that was used to create the signature (i.e., the signed message). | |
Note that this is not the transaction hash used to identify the tx (this includes the signature) | |
:return: Dictionary that contains the public key and data about the transaction. | |
""" | |
resp = dict() | |
try: | |
resp["blockNumber"] = tx["blockNumber"] | |
resp["hash"] = tx["hash"].hex() | |
resp["type"] = tx["type"] | |
resp["from"] = tx["from"] | |
keys_to_get = list() | |
if tx["type"] == 0: | |
keys_to_get = type_0_keys | |
elif tx["type"] == 1: | |
keys_to_get = type_1_keys | |
elif tx["type"] == 2: | |
keys_to_get = type_2_keys | |
elif tx["type"] == 3: | |
keys_to_get = type_3_keys | |
else: | |
resp["pk"] = "" | |
resp["error"] = f"Unsupported tx type: {tx['type']}" | |
resp["valid"] = False | |
return resp | |
tt = {k:tx[k] for k in keys_to_get} | |
if tx["type"] == 0 and tx["v"] != 27 and tx["v"] != 28: | |
# EIP155 unaffected values when y parity {0,1} + 27 | |
# i.e., if v = 27 or v = 28 | |
# Tx with these values do not include a dedicated "chainId" | |
# Therefore, it has to be set manually on those tx | |
# https://eips.ethereum.org/EIPS/eip-155 | |
tt["chainId"] = 1 | |
tt["data"] = tx["input"] # 'data' is called 'input' in the web3 API | |
if "accessList" in tx and len(tx["accessList"]) > 0: | |
# non-empty access list: | |
# The access lists are currently not handled correctly | |
# by serializable_unsigned_transaction_from_dict so this code | |
# created the unsigned transaction hash for these tx instead | |
tx_payload = list() | |
access_list = list() | |
for j in tx["accessList"]: | |
storage_keys = [bytes.fromhex(k.hex()[2:]) for k in j["storageKeys"]] | |
#access_tuple = AccessTuple(address=bytes.fromhex(j["address"][2:]), storageKeys=storage_key) | |
access_tuple = [bytes.fromhex(j["address"][2:]), storage_keys] | |
access_list.append(access_tuple) | |
if tx["type"] == 0x01: | |
tx_payload.extend(( | |
tx["chainId"], | |
tx["nonce"], | |
tx["gasPrice"], | |
tx["gas"], | |
bytes.fromhex(tx["to"][2:]), | |
tx["value"], | |
tx["input"], | |
access_list, | |
#bytes.fromhex(tx["input"].hex()[2:]), | |
)) | |
elif tx["type"] == 0x02: | |
tx_payload.extend(( | |
tx["chainId"], | |
tx["nonce"], | |
tx["maxPriorityFeePerGas"], | |
tx["maxFeePerGas"], | |
tx["gas"], | |
bytes.fromhex(tx["to"][2:]), | |
tx["value"], | |
tx["input"], | |
access_list, | |
#bytes.fromhex(tx["input"].hex()[2:]), | |
)) | |
elif tx["type"] == 0x03: | |
tx_payload.extend(( | |
tx["chainId"], | |
tx["nonce"], | |
tx["maxPriorityFeePerGas"], | |
tx["maxFeePerGas"], | |
tx["gas"], | |
bytes.fromhex(tx["to"][2:]), | |
tx["value"], | |
tx["input"], | |
access_list, | |
tx["maxFeePerBlobGas"], | |
tx["blobVersionedHashes"], | |
#bytes.fromhex(tx["input"].hex()[2:]), | |
)) | |
ut_hash = keccak_256(tx["type"].to_bytes(1,byteorder="big") + rlp.encode(tx_payload)).digest() | |
else: | |
# empty accessList: | |
ut = serializable_unsigned_transaction_from_dict(tt) | |
ut_hash = ut.hash() | |
public_key = sig.recover_public_key_from_msg_hash(ut_hash) | |
from_address = public_key.to_checksum_address() | |
#print(type(public_key)) | |
resp["pk"] = "0x04" + str(public_key)[2:] | |
if not is_valid_secp256k1_pubkey(b'\x04' + bytes.fromhex(str(public_key)[2:])): | |
resp["error"] = f"Invalid secp256k1 public key {bytes.fromhex(str(public_key)[2:])}" | |
resp["valid"] = False | |
elif from_address != tx["from"]: | |
resp["error"] = f"From address mismatch: {from_address} with ut hash: {ut_hash}" | |
resp["valid"] = False | |
else: | |
resp["error"] = "" | |
resp["valid"] = True | |
except Exception as e: | |
error_msg = f"An exception occurred: {type(e).__name__} - {e} for transaction: {tx} and signature: {sig}" | |
logging.error(error_msg) | |
resp["error"] = error_msg | |
resp["valid"] = False | |
return resp | |
def main(): | |
parser = argparse.ArgumentParser(description="Ethereum pubkey extraction tool") | |
subparsers = parser.add_subparsers(dest="command", required=True) | |
# Subparser for cli 'block' command | |
parser_generate = subparsers.add_parser('block', help='Extract Ethereum public keys') | |
parser_generate.add_argument('--http-url', type=str, default="http://localhost:8545", help='HTTP API url of archival node and port e.g., http://localhost:8545') | |
parser_generate.add_argument('--start-block', type=int, required=True, help='Start block number') | |
parser_generate.add_argument('--end-block', type=int, default=0, help='End block number') | |
# Subparser for cli 'tx' command | |
parser_generate = subparsers.add_parser('tx', help='Extract Ethereum public keys') | |
parser_generate.add_argument('--http-url', type=str, default="http://localhost:8545", help='HTTP API url of archival node and port e.g., http://localhost:8545') | |
parser_generate.add_argument('--tx-hash', type=str, required=True, help='Single tx hash as string 0x....') | |
args = parser.parse_args() | |
if args.command == 'block': | |
extract_block_from_archive_node(args.http_url, args.start_block, args.end_block) | |
elif args.command == 'tx': | |
print(extract_tx_from_archive_node(args.http_url, args.tx_hash)) | |
if __name__ == "__main__": | |
main() | |
# Test with some random transaction ids representing corner cases: | |
# Adapt the HTTP_URL to the geth archive node first then run: | |
# $ python -m pytest -v -s THISFILE | |
HTTP_URL="http://172.22.0.1:8545" | |
def test_type_0_with_chainId(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0x3bc58b62d0e0b17d2fd1513cb015fa1ee391448725033314732fe7c17630116d") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
assert resp["pk"].lower() == "0x044ce79c7ee22ef0d6c8eb38f1a695448166d52dbcd9332f8c6d187d414ad24eb1e596617fea0a787f48b868a57e31293b3587ffb23b095e3c2288824aafec5a7f" | |
assert resp["from"].lower() == "0x7bb0CA0C21dC8DC530913BC8BD17087276FD30DA".lower() | |
def test_type_0_without_chainId(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0xbbef75a593ae50fda372b7f9faca4a94daa9b8f9e35788fb9aaa4e981bcc59de") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
assert resp["pk"].lower() == "0x0476dd7a566ea736121d0a1a676d9cceb412e01a194dbb83f09a766e3abc6a9ca162e1024ae8570b524d54803961dfb9055768cb29b27c8ce1d501b5f25dc4da70" | |
assert resp["from"].lower() == "0xf0bC028Ac6c0265F5724Ec09d3A427B6b8898D21".lower() | |
def test_type_1_with_empty_accessList(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0xd9178098b113eab23ecea6cc4d7c5bf0d7d4ec1c6e397d08ff4ba84563cf3dc2") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
assert resp["pk"].lower() == "0x045eae08f69bf4a8396fff0448f751b6ab47a49b012eccf571855b0a5f6bd371cfdedfc2a5604d36bfd57ed87d01adf3f8ae147be2340a1b0477a899778e90f8e0" | |
assert resp["from"].lower() == "0x264bd8291fAE1D75DB2c5F573b07faA6715997B5".lower() | |
def test_type_1_with_accessList(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0xe1675964fbf07504bfbd6d16f721071f236681bbb16e6b131b1fb109cf143e15") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
assert resp["pk"].lower() == "0x0430829f0f7eec451e6e2ea96bce3d6676018f71e9d34b9b38b953835cd327ae10d099c9c1aa4530fbb23e59b6dad77502dffab51a7c531a4b157f76c7b7ce2086" | |
assert resp["from"].lower() == "0x678111a6cA5749f1744b5E080A855CEC8d631E20".lower() | |
def test_type_2_with_empty_accessList(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0x285a16daf4161d1a18a28226a011619b33446c627a8adc914352eeeeb9725a7d") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
assert resp["pk"].lower() == "0x044071ba98bea38c5eb09b9fdb22ebc46ec02b33eadd14935a883708d38c71a470b5735b1e509b10ac65fa33005453842fff73bccdcc85318c1a9aa7ad65e19010" | |
assert resp["from"].lower() == "0x817f128Ccf3F8FC943cd9aAFAcd23b91E27668CD".lower() | |
def test_type_2_with_accessList(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0x26fe1e5b9ceffc1e53c66b81045ea16e95f0222360d0f181b20a40c114dc6feb") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
#assert resp["pk"].lower() == "" | |
#assert resp["from"].lower() == "".lower() | |
def test_type_3_with_empty_accessList(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0x157b911664d7e306e73d5241db7181bb0056dd3da1caaa5055998730482faae1") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
#assert resp["pk"].lower() == "" | |
#assert resp["from"].lower() == "".lower() | |
def test_type_3_with_accessList(): | |
resp = extract_tx_from_archive_node(HTTP_URL,"0x214db8b9691aa4fb034a2bbe4b369984b9d5dbb4ef9b34747460ca9dde1c6dec") | |
print(type(resp)) | |
print(resp) | |
assert resp["error"] == "" | |
assert resp["valid"] == True | |
#assert resp["pk"].lower() == "" | |
#assert resp["from"].lower() == "".lower() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment