Skip to content

Instantly share code, notes, and snippets.

@OutRite
Last active May 16, 2024 23:14
Show Gist options
  • Save OutRite/09ae4c231c9be44ffa6010cddcbe76d2 to your computer and use it in GitHub Desktop.
Save OutRite/09ae4c231c9be44ffa6010cddcbe76d2 to your computer and use it in GitHub Desktop.
# CLIpclops
import requests
import json
import sys
import base64
import getpass
pds = 'put the DM endpoint domain here'
handle_cache = {}
session = requests.Session()
real_pds = None
def resolve_did(did, session=session):
if did in handle_cache:
return handle_cache[did]
# there doesn't seem to be a lexicon for this, so we can't get the correct handle (ignoring xnbug) of the did according to the appview
headers = {'User-Agent': 'SkyBox/0.1 CLIpclops/1.1'}
j = None
if 'did:web:' in did:
# Resolve did:web to handle
try:
r = session.get(f'https://{did[8:]}/.well-known/did.json', headers=headers)
j = r.json()
except:
j = {}
else:
# Resolve did:plc to handle
r = session.get(f'https://plc.directory/{did}')
j = r.json()
if j and 'alsoKnownAs' in j and len(j['alsoKnownAs']) > 0 and len(j['alsoKnownAs'][0]) > 5:
handle = j['alsoKnownAs'][0][5:]
elif j and 'handle' in j: # legacy DID format
handle = j['handle']
else:
handle = did
pds = None
if j and 'service' in j:
for service in j['service']:
if service['type'] == 'AtprotoPersonalDataServer':
pds = service['serviceEndpoint'].split('://')[-1] # We're doing [-1] here just in case the endpoint is "pds.example.com" instead of "https://pds.example.com"
handle = handle.replace('\n', '').replace(' ', '').replace('\x1b', '') # basic anti-me code. should be replaced with a proper character allowlist
if 'did:plc' in did: # attempt to account for samehandle. you can still get around this if you have a 10-char plc collision though. =(
handle += f' ({did[4:8+10]})'
else:
handle += f' ({did[4:]})'
handle_cache[did] = (handle, pds)
return (handle, pds)
def xrpc(id, data, method, pds='bsky.social', session=requests, plain_auth=None):
headers = {'User-Agent': 'SkyBox/0.1 CLIpclops/1.1'}
if plain_auth is not None:
headers['Authorization'] = plain_auth
if real_pds is not None and pds != 'bsky.social':
proxy_pds = pds
headers['atproto-proxy'] = 'did:web:'+proxy_pds+'#bsky_chat'
pds = real_pds
if method.lower() == 'get': # FIXME: There's probably a nicer way to do this with requests
url='https://'+pds+'/xrpc/'+id+'?'
for key in data:
url += key+'='+str(data[key])+'&'
url = url[:-1]
r=session.get(url, headers=headers)
else:
r=session.request(method, 'https://'+pds+'/xrpc/'+id, json=data, headers=headers)
return r.text
def send_message(source, target, message, embed=None):
r=xrpc('chat.bsky.convo.getConvoForMembers', {'members': target}, 'get', pds=pds, plain_auth=source)
j=json.loads(r)
id = j['convo']['id']
# print(f"convo: {id} {source}->{target} - {message}")
if embed:
r=xrpc('chat.bsky.convo.sendMessage', {'convoId': id, 'message': {'text': message, 'embed': embed}}, 'post', pds=pds, plain_auth=source)
pass
else:
r=xrpc('chat.bsky.convo.sendMessage', {'convoId': id, 'message': {'text': message}}, 'post', pds=pds, plain_auth=source)
pass
def get_did(username):
j=json.loads(xrpc('com.atproto.identity.resolveHandle', {'handle': username}, 'get'))
return j['did']
def menu(login_did):
while True:
j=xrpc('chat.bsky.convo.listConvos', {}, 'get', pds=pds, plain_auth=login_did)
j=json.loads(j)
ids = []
if 'convos' not in j:
print(j)
mx=0
else:
mx = len(j['convos'])
for convo_i in range(mx):
convo=j['convos'][convo_i]
ids.append(convo['id'])
members=[]
for member in convo['members']:
members.append(member['handle'])
print(f"[{convo_i}] - {', '.join(members)}")
print(f"[{mx}] - New chat")
print(f"[{mx+1}] - Exit")
x = None
while x is None:
try:
x=int(input('>>'))
except KeyboardInterrupt:
exit()
except ValueError:
print("err: please select from the options provided.")
if x < len(j['convos']):
id = ids[x]
members = j['convos'][x]['members']
for member in members:
if 'Bearer ' in login_did:
# To get our DID in authenticated mode, we need to decode the JWT.
j=json.loads(base64.urlsafe_b64decode(login_did.split('.')[1]+'====')) # The ==== is a workaround to avoid 'Incorrect padding'
if member['did'] != j['sub']:
target = member['did']
elif member['did'] != login_did:
target = member['did']
conversation(login_did, target, id)
elif x == mx:
# New chat
create_conversation(login_did)
elif x == mx+1:
# Exit
print("Goodbye.")
exit()
def create_conversation(login_did):
target = input("Target: ")
if 'did:' not in target:
target = get_did(target)
j = xrpc('chat.bsky.convo.getConvoForMembers', {'members': target}, 'get', pds=pds, plain_auth=login_did)
j = json.loads(j)
if 'convo' not in j:
if 'error' in j and j['error'] == 'InvalidRequest':
print(f"Failed to start the conversation, {j['message']}.")
else:
print(j)
return
conversation(login_did, target, j['convo']['id'])
def conversation(login_did, target, id):
while 1:
j=xrpc('chat.bsky.convo.getMessages', {'convoId': id, 'limit': 50}, 'get', pds=pds, plain_auth=login_did)
j=json.loads(j)
messages=j['messages'][::-1] # this is in reverse, ig so you can get the 50 most recent or so. odd
for message in messages:
sender, _ = resolve_did(message['sender']['did'])
text = message['text'].replace('\x1b', '\x1b[38;5;12m\\x1b\x1b[38;0;m')
print(f"[{sender}] {text}")
sent=False
while not sent:
nm=input(f">> ").replace('\\x1b', '\x1b')
if nm.startswith('/'):
fullcomm = nm[1:].split(' ')
command = fullcomm[0]
if command == 'help':
print('/help - Shows this help message')
print('/refresh - Refreshes the chat')
print('/send [message] - Sends a message, including ones that start with "/"')
print('/quit - Exit the current chat (Alias: /exit)')
print('/embed [did] [rkey] - Embeds a post into the chat')
elif command == 'refresh':
sent=True
elif command == 'send':
send_message(login_did, target, ' '.join(fullcomm[1:]))
sent=True
elif command == 'quit' or command == 'exit':
return
elif command == 'embed':
at_uri = f"at://{fullcomm[1]}/app.bsky.feed.post/{fullcomm[2]}"
cid='bafyreibidezzodaxz3acl2ovg576u5hixzs7re4qjtfgidiqgbks2plw5i' # random CID since it probably doesn't matter
ref = {'$type':'com.atproto.repo.strongRef', 'uri': at_uri, 'cid': cid}
embed={'$type': 'app.bsky.embed.record', 'record': ref}
send_message(login_did, target, '', embed=embed)
else:
if len(nm)>0:
send_message(login_did, target, nm)
sent=True
#new_messages, cursor = get_new_messages(login_did, id, cursor) # TODO: actually add this so we don't need to redisplay all messages =)
if __name__ == '__main__':
print("CLIpclops - Bsky DM client")
print("Use /help in any conversation to access the help menu")
if len(sys.argv)>1:
login_did = sys.argv[1]
else:
login_did = input("Username/DID: ")
if 'did:' not in login_did:
# Handle?
login_did = get_did(login_did)
# Check if the API supports did-only login
r = xrpc('chat.bsky.convo.listConvos', {}, 'get', pds=pds, plain_auth=login_did)
j = json.loads(r)
if 'error' in j and j['error'] == 'AuthenticationRequired':
# An actual login is required to use this API
print("This service requires you to login.")
password = getpass.getpass("Enter your app password: ")
# First, we need to get the PDS of the user.
_, user_pds = resolve_did(login_did)
if user_pds is None:
print("This account doesn't seem to have a PDS attached. Did you type your DID or handle correctly?")
exit()
# Next, we need to login with the password.
r = xrpc('com.atproto.server.createSession', {'identifier': login_did, 'password': password}, 'post', pds=user_pds)
j = json.loads(r)
if 'accessJwt' not in j:
# Login failed. Incorrect password?
print("Failed to login. Please double-check your username and password.")
exit()
token = j['accessJwt']
login_did = 'Bearer '+token
real_pds = user_pds
menu(login_did)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment