Last active
April 22, 2024 05:28
-
-
Save CrackerHax/61882cf814cde4d9cbc6f5a709e51c34 to your computer and use it in GitHub Desktop.
Python script to get all Solana NFT metadata media assets (gif, png, mp4, fbx, etc.) from a user's wallet and cache them in Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import json
import logging
import struct
import sys

import base58
import redis
import requests
from solana.publickey import PublicKey
from solana.rpc.api import Client
from solana.rpc.types import TokenAccountOpts
# TODO: get your own Solana RPC node; the public devnet endpoint is used here.
_SOLANA_RPC_URL = "https://api.devnet.solana.com"
_METADATA_PROGRAM_ADDRESS = 'metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s'

# Shared RPC client used by every lookup in this module.
solana_client = Client(_SOLANA_RPC_URL)

# Program id used both as a seed and as the owning program when deriving
# metadata addresses in get_nft_pda().
METADATA_PROGRAM_ID = PublicKey(_METADATA_PROGRAM_ADDRESS)
def get_nft_pda(mint_key):
    """Derive the metadata account address for a mint.

    The address is derived from three seeds -- the literal ``b'metadata'``,
    the bytes of ``METADATA_PROGRAM_ID`` and the bytes of the mint's public
    key -- under ``METADATA_PROGRAM_ID``.

    Args:
        mint_key: Mint address in any form accepted by ``PublicKey(...)``.

    Returns:
        The first element of ``PublicKey.find_program_address``'s result
        (the derived address).
    """
    seeds = [
        b'metadata',
        bytes(METADATA_PROGRAM_ID),
        bytes(PublicKey(mint_key)),
    ]
    derived = PublicKey.find_program_address(seeds, METADATA_PROGRAM_ID)
    return derived[0]
# Module-wide Redis connection used as the NFT metadata cache.
# `encoding` replaces the deprecated `charset` keyword of redis-py;
# decode_responses=True makes reads come back as str instead of bytes.
R = redis.Redis(
    host='127.0.0.1',
    port=6379,
    db=0,
    encoding="utf-8",
    decode_responses=True,
)
def _read_pubkey(data, offset):
    """Return (base58-encoded 32-byte key at *offset*, offset just past it)."""
    raw = struct.unpack_from('<32s', data, offset)[0]
    return base58.b58encode(raw), offset + 32


def _read_string(data, offset):
    """Return (length-prefixed UTF-8 string at *offset*, offset just past it).

    The string is preceded by a little-endian u32 byte count; NUL bytes at
    either end are stripped from the decoded text.
    """
    length = struct.unpack_from('<I', data, offset)[0]
    offset += 4
    raw = struct.unpack_from('<%ds' % length, data, offset)[0]
    return raw.decode("utf-8").strip("\x00"), offset + length


def unpack_metadata_account(data):
    """Decode the raw bytes of a metadata account into a dictionary.

    Fields are read in this order: a 1-byte account key (must be 4), the
    32-byte update authority, the 32-byte mint, three length-prefixed strings
    (name, symbol, uri), the little-endian u16 seller fee, a 1-byte
    "has creators" flag, and -- when that flag is set -- a u32 creator count
    followed by one (32-byte address, 1-byte verified, 1-byte share) record
    per creator. Two final bytes hold ``primary_sale_happened`` and
    ``is_mutable``.

    Args:
        data: Bytes-like account data.

    Returns:
        dict with keys ``update_authority``, ``mint``, ``data`` (containing
        ``name``, ``symbol``, ``uri``, ``seller_fee_basis_points`` and the
        parallel lists ``creators``, ``verified``, ``share``),
        ``primary_sale_happened`` and ``is_mutable``. Addresses are the
        values returned by ``base58.b58encode``.

    Raises:
        ValueError: If *data* is empty or its first byte is not 4.
        struct.error, IndexError: If *data* is truncated.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not data or data[0] != 4:
        raise ValueError("not a metadata account: first byte must be 4")

    offset = 1
    update_authority, offset = _read_pubkey(data, offset)
    mint_account, offset = _read_pubkey(data, offset)
    name, offset = _read_string(data, offset)
    symbol, offset = _read_string(data, offset)
    uri, offset = _read_string(data, offset)

    # Basis points are an unsigned 16-bit value, so decode with 'H' (not 'h').
    fee = struct.unpack_from('<H', data, offset)[0]
    offset += 2

    has_creator = data[offset]
    offset += 1

    creators = []
    verified = []
    share = []
    if has_creator:
        creator_count = struct.unpack_from('<I', data, offset)[0]
        offset += 4
        for _ in range(creator_count):
            creator, offset = _read_pubkey(data, offset)
            creators.append(creator)
            verified.append(data[offset])
            share.append(data[offset + 1])
            offset += 2

    primary_sale_happened = bool(data[offset])
    is_mutable = bool(data[offset + 1])

    return {
        "update_authority": update_authority,
        "mint": mint_account,
        "data": {
            "name": name,
            "symbol": symbol,
            "uri": uri,
            "seller_fee_basis_points": fee,
            "creators": creators,
            "verified": verified,
            "share": share,
        },
        "primary_sale_happened": primary_sale_happened,
        "is_mutable": is_mutable,
    }
def get_nftspk(pkey):
    """List the mint addresses of SPL tokens held by a wallet.

    Queries the token accounts owned by *pkey* (jsonParsed encoding) and
    keeps the mint of every account whose program is ``'spl-token'`` and
    whose token amount is at least 1.

    Args:
        pkey: Owner wallet address, passed straight to the RPC client.

    Returns:
        A list of mint address strings, or ``None`` if anything raised (the
        error is printed).
    """
    try:
        query = TokenAccountOpts(
            program_id="TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
            encoding="jsonParsed",
        )
        response = solana_client.get_token_accounts_by_owner(pkey, query)
        token_accounts = response["result"]["value"]

        mints = []
        for entry in token_accounts:
            account_data = entry["account"]["data"]
            program = account_data["program"]
            amount = int(account_data["parsed"]["info"]["tokenAmount"]["amount"])
            # Skip anything that is not an SPL token with a positive balance.
            if program != 'spl-token' or amount < 1:
                continue
            mints.append(account_data["parsed"]["info"]["mint"])
        return mints
    except Exception as e:
        print('error:', e)
        return None
def get_metadata(mint_key):
    """Fetch and decode the metadata account belonging to *mint_key*.

    Looks up the address from ``get_nft_pda``, reads the account through the
    RPC client, base64-decodes the first element of its ``data`` field and
    hands the bytes to ``unpack_metadata_account``.

    Returns:
        The dictionary produced by ``unpack_metadata_account``.
    """
    metadata_address = get_nft_pda(mint_key)
    response = solana_client.get_account_info(metadata_address)
    encoded_account = response['result']['value']['data'][0]
    raw_account = base64.b64decode(encoded_account)
    return unpack_metadata_account(raw_account)
# File types that get_nft() reports as media assets.
_NFT_MEDIA_TYPES = ("image/gif", "image/png", "fbx", "video/mp4")


def get_nft(mint_key, timeout=10):
    """Collect the name and media file URIs of one NFT.

    Reads the on-chain metadata for *mint_key*, downloads the JSON document
    its ``uri`` points to and picks out the entries of
    ``properties.files`` whose ``type`` is one of ``_NFT_MEDIA_TYPES``.

    Args:
        mint_key: Mint address of the token.
        timeout: Seconds to wait for the off-chain JSON download. Without a
            timeout a stalled host would block the call indefinitely.

    Returns:
        ``None`` when the metadata or the JSON document cannot be obtained
        (the token is probably not an NFT); ``{"name": ...}`` when the JSON
        has no ``properties.files``; otherwise
        ``{"name": ..., "files": {uri: type, ...}, "id": mint_key}``.
    """
    # Best-effort boundary: any failure here means "not an NFT we can read".
    # `except Exception` (not a bare except) lets Ctrl-C / SystemExit through.
    try:
        meta = get_metadata(mint_key)['data']
        # Named `document`, not `json`, so the json module is not shadowed.
        document = requests.get(meta['uri'], timeout=timeout).json()
    except Exception:
        return None

    try:
        out = {"name": meta["name"]}
    except (KeyError, TypeError):
        return None

    try:
        file_entries = document['properties']['files']
    except (KeyError, TypeError, IndexError):
        return out

    files = {}
    if isinstance(file_entries, list):
        for entry in file_entries:
            # Skip malformed entries instead of failing the whole NFT.
            if not isinstance(entry, dict):
                continue
            media_type = entry.get("type")
            uri = entry.get("uri")
            if isinstance(uri, str) and media_type in _NFT_MEDIA_TYPES:
                files[uri] = media_type

    out["files"] = files
    out["id"] = mint_key
    return out
def get_nft_media_from_wallet(wallet: str) -> str:
    """Return a JSON string describing the media of every token in a wallet.

    Each mint returned by ``get_nftspk`` is looked up in Redis first (hash
    field ``"data"`` under the mint address). Misses are resolved with
    ``get_nft`` and the result is written back to Redis, so later calls are
    served from the cache.

    Args:
        wallet: Wallet address whose tokens are inspected.

    Returns:
        ``json.dumps`` of a ``{mint: get_nft result}`` mapping. Mints whose
        lookup raised are omitted; an unreadable wallet yields ``"{}"``.
    """
    log = logging.getLogger(__name__)
    nfts = {}

    # get_nftspk returns None when the RPC query fails; treat that as "no
    # tokens" rather than crashing on iteration.
    mintlist = get_nftspk(wallet) or []

    for mint in mintlist:
        # A single HGET both tests for and fetches the cached entry, and a
        # key without a "data" field is handled as a cache miss.
        cached = R.hget(mint, "data")
        if cached is not None:
            nfts[mint] = json.loads(cached)
            continue

        # Not cached: query through the API and store the result in Redis.
        try:
            nft = get_nft(mint)
        except Exception:
            log.warning("skipping mint %s: lookup failed", mint, exc_info=True)
            continue
        nfts[mint] = nft
        R.hset(mint, "data", json.dumps(nft))

    return json.dumps(nfts)
If the wallet has a lot of NFTs, the first run can be slow, but once the results are cached it is nearly instant. If you use this on a website, you may want to provide a refresh button for each NFT; otherwise the cached entries will never be refreshed with new metadata.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example Output:
{"HhuPWiTbRko5GoaGm8iSFwwT1UifohKCj16y8mFagGrP": {"name": "Pixelinvader #2169", "files": {"https://bafybeiafzr6zfnonsktfeu356ihnpvsma4v3v6okmb3kn7om4hqrk6sz5a.ipfs.dweb.link/1548.png?ext=png": "image/png"}, "id": "HhuPWiTbRko5GoaGm8iSFwwT1UifohKCj16y8mFagGrP"}}