Skip to content

Instantly share code, notes, and snippets.

@kraftwerk28
Created December 6, 2022 23:40
Show Gist options
  • Save kraftwerk28/89cd0c62c7deb55f48a3cea6a057b4d6 to your computer and use it in GitHub Desktop.
Save kraftwerk28/89cd0c62c7deb55f48a3cea6a057b4d6 to your computer and use it in GitHub Desktop.
aioftp
import sys
from asyncio import open_connection, run
import aiofiles
async def main(args):
    """Run a single client command against the file server on port 8080.

    Supported invocations (``args`` is the argument list without the
    script name):

    * ``--send FILENAME``    -- upload the local file ``FILENAME``.
    * ``--receive FILENAME`` -- download ``FILENAME`` and write it locally.
    * ``--list``             -- print the server's file listing.

    Wire protocol: a command line, then (for send/receive) a filename line.
    Payloads are preceded by a decimal byte-count line.

    Raises:
        ValueError: if ``args`` does not match one of the invocations above.
            Raised before any connection is opened.
        asyncio.IncompleteReadError: if the server closes the connection
            before delivering as many bytes as it announced.
    """
    command = args[0] if args else None
    filename = None
    if command in ("--send", "--receive") and len(args) >= 2:
        # Index instead of tuple-unpacking so extra trailing arguments
        # do not crash with "too many values to unpack".
        filename = args[1]
    elif command != "--list":
        raise ValueError("Invalid argument")

    payload = None
    if command == "--send":
        # Read the file before connecting: a missing or unreadable file
        # must not leave the server holding a half-sent request.
        async with aiofiles.open(filename, "rb") as file:
            payload = await file.read()

    reader, writer = await open_connection(port=8080)
    try:
        if command == "--send":
            writer.write(b"send\n")
            writer.write(f"{filename}\n".encode())
            writer.write(f"{len(payload)}\n".encode())
            writer.write(payload)
            await writer.drain()
        elif command == "--receive":
            writer.write(b"receive\n")
            writer.write(f"{filename}\n".encode())
            await writer.drain()
            file_size_or_err = await reader.readline()
            try:
                file_size = int(file_size_or_err)
            except ValueError:
                # Not a number: the server sent an error message instead.
                print(file_size_or_err.decode())
                return
            # read(n) may return fewer than n bytes; readexactly() either
            # delivers the whole payload or raises IncompleteReadError.
            contents = await reader.readexactly(file_size)
            async with aiofiles.open(filename, "wb") as file:
                await file.write(contents)
        else:
            writer.write(b"list\n")
            await writer.drain()
            file_count = int(await reader.readline())
            for _ in range(file_count):
                line = await reader.readline()
                print(line.decode().strip())
    finally:
        # Close on every path, and wait for the close to finish so buffered
        # data is flushed before asyncio.run() tears the event loop down.
        writer.close()
        await writer.wait_closed()
if __name__ == "__main__":
    # Everything after the script name is handed to the client as its command.
    cli_args = sys.argv[1:]
    run(main(cli_args))
from asyncio import StreamReader, StreamWriter, run, start_server
# In-memory store of uploaded files: filename (str) -> contents (bytes).
# Filled by the "send" command and read by "receive" and "list".
files = {}
async def on_client(reader: StreamReader, writer: StreamWriter):
    """Serve one client connection: read a single command, answer it, close.

    Protocol (every header is a newline-terminated line):

    * ``send``    -- followed by a filename line, a byte-count line and that
      many payload bytes; the payload is stored in ``files``.
    * ``receive`` -- followed by a filename line; the reply is a byte-count
      line plus the payload, or the text ``not found``.
    * ``list``    -- the reply is a count line, then one
      ``name (N bytes)`` line per stored file.

    Any other command is ignored. The connection is closed on every path,
    including malformed requests.
    """
    try:
        command = (await reader.readline()).decode().strip()
        print(f"command: '{command}'")
        if command == "send":
            filename = (await reader.readline()).decode().strip()
            # NOTE: the size is client-supplied and not bounded here.
            file_size = int(await reader.readline())
            print(f"receiving '{filename}' ({file_size} bytes)")
            # read(n) may return a partial chunk and would store a truncated
            # file; readexactly() returns all n bytes or raises.
            files[filename] = await reader.readexactly(file_size)
        elif command == "receive":
            filename = (await reader.readline()).decode().strip()
            contents = files.get(filename)
            if contents is None:
                # No trailing newline: the client reads this up to EOF.
                writer.write(b"not found")
            else:
                file_size = len(contents)
                print(f"sending '{filename}' ({file_size} bytes)")
                writer.write(f"{file_size}\n".encode())
                writer.write(contents)
            await writer.drain()
        elif command == "list":
            writer.write(f"{len(files)}\n".encode())
            for name, data in files.items():
                writer.write(f"{name} ({len(data)} bytes)\n".encode())
            await writer.drain()
    except (ValueError, EOFError) as exc:
        # ValueError: non-numeric size line or undecodable text.
        # EOFError: base class of asyncio.IncompleteReadError, raised when
        # the client disconnects before sending the announced payload.
        print(f"bad request: {exc!r}")
    finally:
        # Single close for all branches; the server's loop keeps running,
        # so the transport flushes and closes in the background.
        writer.close()
async def main():
    """Listen on TCP port 8080 and hand each connection to ``on_client``."""
    listener = await start_server(on_client, port=8080)
    # The context manager closes the listening server when the block exits.
    async with listener:
        await listener.serve_forever()
if __name__ == "__main__":
    # Script entry point: build the server coroutine and run it.
    serve = main()
    run(serve)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment