Skip to content

Instantly share code, notes, and snippets.

@indiv0
Created June 3, 2018 00:37
Show Gist options
  • Save indiv0/cd1f124184dd263a036044e3257a3c8f to your computer and use it in GitHub Desktop.
Save indiv0/cd1f124184dd263a036044e3257a3c8f to your computer and use it in GitHub Desktop.
from curio import run
from curio.socket import *
import os
import sys
from common import send_message, read_message, b8zs_encode
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 8080
async def b8zs_client(address, b8zs_str):
sock = socket(AF_INET, SOCK_STREAM)
try:
await sock.connect(address)
except ConnectionRefusedError:
print('Connection refused', address)
return
print('Connected at', address)
s = sock.as_stream()
async with s:
print("Sending request-to send")
await send_message(s, b'request-to send')
message = await read_message(s)
if not message:
return
elif message == b'clear-to send':
print('Received clear-to send')
await send_message(s, (b8zs_str + '\n').encode())
else:
raise ValueError('Invalid message:', message)
message = await read_message(s)
if not message:
return
elif message == b'acknowledged':
print('Received acknowledge')
else:
raise ValueError('Invalid message:', message)
await s.close()
print('Connection closed')
if __name__ == "__main__":
host = os.environ.get('HOST', DEFAULT_HOST)
port = int(os.environ.get('PORT', DEFAULT_PORT))
try:
data = sys.argv[1]
except IndexError:
print('Please input a string of zeroes and ones')
sys.exit(1)
try:
b8zs_str = b8zs_encode(data)
except Exception as e:
print('Exception:', e)
sys.exit(1)
run(b8zs_client, (host, port), b8zs_str)
import re
from enum import Enum
BINARY_STR_REGEX = re.compile(r'^([01]+)$')
B8ZS_STR_REGEX = re.compile(r'^([0+-]+)$')
async def read_message(stream):
return (await stream.readline()).strip()
async def send_message(stream, message):
await stream.write(message + b'\n')
class Polarity(Enum):
Negative = -1
Positive = 1
def is_binary_str(string):
return bool(BINARY_STR_REGEX.search(string))
def is_b8zs_str(string):
return bool(B8ZS_STR_REGEX.search(string))
def b8zs_decode(string):
if not is_b8zs_str(string):
raise ValueError('Invalid B8ZS bitstring')
return string \
.replace("000-+0+-", "00000000") \
.replace("000+-0-+", "00000000") \
.replace("-", "1") \
.replace("+", "1")
def b8zs_encode(string):
if not is_binary_str(string):
raise ValueError('Invalid bitstring:', string)
last_polarity = Polarity.Negative
consecutive_zeroes = 0
s = ""
for i in range(0, len(string)):
c = string[i]
if c == "1":
consecutive_zeroes = 0
if last_polarity is Polarity.Negative:
s += "+"
last_polarity = Polarity.Positive
else:
s += "-"
last_polarity = Polarity.Negative
elif c == "0":
s += "0"
consecutive_zeroes += 1
# An octet of all zeroes is encoded as "000+-0-+" if the last
# voltage pulse preceding was positive.
# An octet of all zeroes is encoded as "000-+0+-" if the last
# voltage pulse preceding was negative.
if consecutive_zeroes == 8:
if last_polarity is Polarity.Positive:
s = s[:-8] + "000+-0-+"
else:
s = s[:-8] + "000-+0+-"
consecutive_zeroes = 0
else:
raise ValueError("Unexpected character found in binary string")
return s
from curio import run, spawn
from curio.socket import *
import os
from common import send_message, read_message, b8zs_decode
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 8080
DEFAULT_MAX_CONNECT_REQUESTS = 5
async def b8zs_server(address, max_connect_requests):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(max_connect_requests)
print('Server listening at', address)
async with sock:
while True:
client, addr = await sock.accept()
print('Connection from', addr)
server = B8ZSServer(client, addr)
await spawn(server.run(), daemon=True)
class B8ZSServer:
"""TODO"""
def __init__(self, sock, addr):
self.sock = sock
self.addr = addr
async def run(self):
s = self.sock.as_stream()
async with s:
message = await read_message(s)
if not message:
return
elif message == b'request-to send':
print('Received request-to send')
await send_message(s, b'clear-to send')
else:
raise ValueError('Invalid message:', message)
message = await read_message(s)
if not message:
return
encoded = message.decode()
print("Encoded data:", encoded)
try:
decoded = b8zs_decode(encoded)
except ValueError:
print('Invalid data:', encoded)
return
print("Decoded data:", decoded)
await send_message(s, b"acknowledged")
print('Connection closed')
await s.close()
if __name__ == "__main__":
host = os.environ.get('HOST', DEFAULT_HOST)
port = int(os.environ.get('PORT', DEFAULT_PORT))
max_connect_requests = os.environ.get('MAX_CONNECT_REQUESTS', DEFAULT_MAX_CONNECT_REQUESTS)
run(b8zs_server, (host, port), max_connect_requests)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment