Created
December 15, 2018 14:37
-
-
Save maqp/50a736ce509e066b5f43523b68d2b703 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.6 | |
# -*- coding: utf-8 -*- | |
import base64 | |
import hashlib | |
import os | |
import random | |
import shlex | |
import socket | |
import subprocess | |
import tempfile | |
import time | |
import stem | |
from stem.control import Controller | |
from stem import SocketClosed | |
from stem import process as stem_process | |
def get_available_port(min_port: int, max_port: int) -> str:
    """Pick a random localhost TCP port that could be bound right now.

    Random ports in the inclusive range [min_port, max_port] are tried
    until binding a throwaway socket to 127.0.0.1 succeeds. The socket is
    closed again before returning, so the port is only known to have been
    free at the moment of the probe.

    Returns the port number as a string.
    """
    with socket.socket() as probe:
        bound = False
        while not bound:
            try:
                probe.bind(('127.0.0.1', random.randint(min_port, max_port)))
            except OSError:
                # Port unavailable (in use, privileged, ...): draw another one.
                continue
            bound = True
        port = probe.getsockname()[1]
    return str(port)
class Tor(object):
    """Tor class manages the starting and stopping of Tor client with Stem.

    `connect()` launches a Tor subprocess with a throwaway data directory
    and attaches a Stem controller to its control socket. `stop()` releases
    everything `connect()` acquired and is safe to call repeatedly, also on
    an instance that never connected.
    """

    # Seconds to wait for Tor to report a finished bootstrap before giving up.
    BOOTSTRAP_TIMEOUT = 15

    # Seconds a terminated Tor process is given to exit before it is killed.
    STOP_GRACE_PERIOD = 2.0

    def __init__(self) -> None:
        self.tor_data_directory = None  # tempfile.TemporaryDirectory used as DataDirectory
        self.tor_process = None         # process handle returned by Stem's launcher
        self.controller = None          # Stem Controller attached to the control socket

    @staticmethod
    def _bootstrap_summary(response: str) -> str:
        """Return the SUMMARY value of a bootstrap-phase status line.

        The field is looked up by keyword rather than by position. An empty
        string is returned when the line carries no SUMMARY field.
        """
        for field in shlex.split(response):
            key, separator, value = field.partition('=')
            if separator and key == 'SUMMARY':
                return value
        return ''

    def _wait_for_bootstrap(self, start_ts: float) -> bool:
        """Poll the controller until bootstrap is done or the timeout expires.

        Returns True once the summary reads 'Done', False when more than
        BOOTSTRAP_TIMEOUT seconds have passed since `start_ts`.
        Raises SystemExit if the control socket is closed while polling.
        """
        while True:
            time.sleep(0.1)

            try:
                response = self.controller.get_info("status/bootstrap-phase")
            except SocketClosed:
                raise SystemExit("Tor socket closed.")

            if self._bootstrap_summary(response) == 'Done':
                return True

            if time.monotonic() - start_ts > self.BOOTSTRAP_TIMEOUT:
                return False

    def connect(self, port: str) -> bool:
        """Launch Tor as a subprocess.

        `port` is used as Tor's SocksPort. Returns True when Tor reports a
        completed bootstrap, False when launching raised OSError or the
        bootstrap did not finish in time. On every failure path (including
        exceptions) the subprocess, controller and temporary data directory
        are released before control returns to the caller.
        """
        self.tor_data_directory = tempfile.TemporaryDirectory()
        tor_control_socket = os.path.join(self.tor_data_directory.name, 'control_socket')

        try:
            self.tor_process = stem_process.launch_tor_with_config(
                config={'DataDirectory': self.tor_data_directory.name,
                        'SocksPort': port,
                        'ControlSocket': tor_control_socket,
                        'AvoidDiskWrites': '1',
                        'Log': 'notice stdout',
                        'GeoIPFile': '/usr/share/tor/geoip',
                        'GeoIPv6File': '/usr/share/tor/geoip6'},
                tor_cmd='{}/stemtest/tor/src/app/tor'.format(os.getenv("HOME")))
        except OSError:
            self.stop()
            return False

        start_ts = time.monotonic()

        try:
            self.controller = Controller.from_socket_file(path=tor_control_socket)
            self.controller.authenticate()
            bootstrapped = self._wait_for_bootstrap(start_ts)
        except BaseException:
            # Never leave a Tor process behind, whatever interrupted us.
            self.stop()
            raise

        if not bootstrapped:
            self.stop()
        return bootstrapped

    def stop(self) -> None:
        """Stop the Tor subprocess.

        Terminates the process (escalating to kill after STOP_GRACE_PERIOD
        seconds) and reaps it, closes the controller, and removes the
        temporary data directory. All three attributes are reset to None.
        """
        process, self.tor_process = self.tor_process, None
        controller, self.controller = self.controller, None
        data_directory, self.tor_data_directory = self.tor_data_directory, None

        if process is not None:
            process.terminate()
            try:
                process.wait(timeout=self.STOP_GRACE_PERIOD)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()  # reap the child so it does not linger as a zombie
            for stream in (process.stdout, process.stderr):
                if stream is not None:
                    stream.close()

        try:
            if controller is not None:
                controller.close()
        finally:
            if data_directory is not None:
                data_directory.cleanup()
def stem_compatible_base64_blob_from_private_key(private_key: bytes) -> str:
    """Create stem compatible key blob from private key.

    The private key is hashed with SHA-512. The first 32 bytes of the
    digest, read as a little-endian integer, are clamped (bits 0-2 and bit
    255 cleared, bit 254 set) and re-encoded as 32 little-endian bytes; the
    last 32 bytes of the digest are appended unchanged. The resulting 64
    bytes are returned Base64-encoded.

    This code is based on Tor's testing code at
    https://github.com/torproject/tor/blob/8e84968ffbf6d284e8a877ddcde6ded40b3f5681/src/test/ed25519_exts_ref.py#L48
    """
    digest = hashlib.sha512(private_key).digest()
    lower_half, upper_half = digest[:32], digest[32:]

    top_bit = 1 << 254
    keep_bits_3_to_253 = top_bit - 8
    scalar = (int.from_bytes(lower_half, 'little') & keep_bits_3_to_253) | top_bit

    expanded_private_key = scalar.to_bytes(32, 'little') + upper_half
    return base64.b64encode(expanded_private_key).decode()
def kill_background_tor():
    """Kill any Tor instances left open.

    Lists processes whose `ps aux` line contains "torrc" and sends each of
    them SIGTERM. Tokens that are not valid PIDs are skipped individually,
    and processes that have already exited or that we are not allowed to
    signal are ignored, so one bad entry no longer prevents the remaining
    processes from being signalled.
    """
    import signal  # local import: the file's import block does not provide it

    listing = subprocess.check_output(
        "ps aux |grep '[t]orrc' | awk '{print $2}' 2>/dev/null", shell=True)

    # split() without arguments drops the empty trailing entry and blank lines.
    for token in listing.split():
        try:
            pid = int(token)
        except ValueError:
            continue

        try:
            os.kill(pid, signal.SIGTERM)
        except (ProcessLookupError, PermissionError):
            # Gone already, or owned by someone else: nothing more to do.
            pass
def main() -> None:
    """Repeatedly launch v3 Tor Onion Service using Stem.

    One random 32-byte private key is generated up front and reused for
    every round. Each round starts a fresh Tor instance on a random SOCKS
    port, publishes an ephemeral v3 Onion Service mapping port 80 to local
    port 5000, removes it again and stops Tor. A failed Tor start is
    reported and retried; a stem.OperationFailed during publication is
    reported and re-raised. Tor is stopped at the end of every round,
    whichever way the round ends.
    """
    kill_background_tor()

    private_key = os.urandom(32)
    stem_key_data = stem_compatible_base64_blob_from_private_key(private_key)

    while True:
        tor_port = get_available_port(1000, 65535)
        tor = Tor()

        try:
            if not tor.connect(tor_port):
                print("\nTor timed out!")
                continue

            print('Starting Onion Service... ', end='', flush=True)

            try:
                service = tor.controller.create_ephemeral_hidden_service(ports={80: 5000},
                                                                         key_type='ED25519-V3',
                                                                         key_content=stem_key_data,
                                                                         await_publication=True)
            except stem.OperationFailed as e:
                print("OperationFailed:\n{}".format(e))
                raise

            print('Onion Service {} is now online'.format(service.service_id))
            tor.controller.remove_hidden_service(service.service_id)
        finally:
            # Runs on success, on retry (`continue`) and on any exception,
            # so no Tor subprocess outlives its round.
            tor.stop()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment