Created
May 23, 2019 17:24
-
-
Save mIcHyAmRaNe/a0febdab4a1649fcee6160a570d2747a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os.path import curdir, realpath | |
from os import makedirs | |
import stem.process | |
import stem.control | |
import requests | |
import socks | |
import socket | |
import json | |
# Port Tor is told to use for SOCKS clients; the control port is configured
# as the next port up (SOCKS_PORT + 1).
SOCKS_PORT = 9060

# Base directory for Tor's files, resolved once from the working directory
# at import time.
_TOR_DIR = f'{realpath(curdir)}/tor'

# Options handed to stem.process.launch_tor_with_config(); every value is a
# string. With '__LeaveStreamsUnattached' set, this script attaches streams
# to circuits itself (see stream_handler).
config = {
    'SocksPort': str(SOCKS_PORT),
    'ControlPort': str(SOCKS_PORT + 1),
    'DataDirectory': _TOR_DIR + '/DataDirectory',
    'CookieAuthFile': _TOR_DIR + '/control.authcookie',
    'CookieAuthentication': '1',
    'CookieAuthFileGroupReadable': '1',
    '__LeaveStreamsUnattached': '1',
}
def init_msg_handler(line):
    """Echo one line of Tor start-up output to stdout.

    Passed as the ``init_msg_handler`` callback when Tor is launched.
    """
    print(line)
# Resolver that was installed before enable_socks_proxy() replaced it, kept
# so disable_socks_proxy() can put it back. None until the proxy is enabled.
_original_getaddrinfo = None


def _passthrough_getaddrinfo(host, port, *_args, **_kwargs):
    """Return a single IPv4/TCP entry carrying *host* unresolved.

    No local DNS lookup is performed; the host name is handed as-is to the
    socket's connect(), which leaves resolution to the SOCKS proxy
    (presumably PySocks' default remote-DNS behaviour -- verify if the
    proxy settings are changed).
    """
    return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (host, port))]


def enable_socks_proxy():
    """Route all new sockets in this process through the local SOCKS5 port.

    Monkeypatches ``socket.socket`` and ``socket.getaddrinfo`` globally.
    The previous resolver is remembered so disable_socks_proxy() can fully
    undo the patch; calling this twice does not lose the real resolver.
    """
    global _original_getaddrinfo
    if socket.getaddrinfo is not _passthrough_getaddrinfo:
        _original_getaddrinfo = socket.getaddrinfo
    socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, '127.0.0.1', SOCKS_PORT)
    socket.socket = socks.socksocket
    socket.getaddrinfo = _passthrough_getaddrinfo


def disable_socks_proxy():
    """Undo enable_socks_proxy(): restore the socket class and the resolver."""
    socket.socket = socks._orgsocket
    # Without this the passthrough resolver would stay installed and direct
    # (non-proxied) connections would be handed unresolved host names.
    if _original_getaddrinfo is not None:
        socket.getaddrinfo = _original_getaddrinfo
def get_new_circ():
    """Build a new circuit and return its circuit object.

    Uses the module-level ``controller``. ``await_build=True`` is passed to
    ``new_circuit`` before the circuit is looked up by its id.
    """
    return controller.get_circuit(controller.new_circuit(await_build=True))
def circ_info(circ):
    """Render a circuit and its relay path as a small text tree.

    Uses the module-level ``controller`` to look up each relay's address
    and country.

    :param circ: circuit object exposing ``id``, ``purpose`` and ``path``
        (an iterable of ``(fingerprint, nickname)`` pairs).
    :returns: multi-line string, one header line plus one line per hop,
        each terminated by a newline.
    """
    last_hop = len(circ.path) - 1
    lines = [f'Circuit {circ.id} ({circ.purpose})\n']
    for hop, (fingerprint, nickname) in enumerate(circ.path):
        div = '┗' if hop == last_hop else '┣'
        desc = controller.get_network_status(fingerprint, None)
        if desc:
            address = desc.address
            location = controller.get_info(f'ip-to-country/{address}', 'unknown')
        else:
            # No descriptor means no address, so there is nothing to
            # geolocate; skip the control-port query that could only fail.
            address = location = 'unknown'
        lines.append(f' {div}━ {fingerprint} [{location}] {address.ljust(15)} {nickname}\n')
    return ''.join(lines)
def stream_handler(ctrl, circ, func):
    """Run *func* while attaching every new stream to *circ*.

    A STREAM event listener is registered on *ctrl* for the duration of the
    call and always removed afterwards, even if *func* raises.

    :param ctrl: controller used to listen for events and attach streams.
    :param circ: circuit whose ``id`` new streams are attached to.
    :param func: zero-argument callable performing the network activity.
    :returns: whatever *func* returns.
    """
    def _on_stream_event(stream):
        print(stream)
        print(stream.status, stream.id, stream.circ_id, stream.remote_reason)
        if stream.status != 'NEW':
            return
        ctrl.attach_stream(stream.id, circ.id)

    event_type = stem.control.EventType.STREAM
    ctrl.add_event_listener(_on_stream_event, event_type)
    try:
        result = func()
    finally:
        ctrl.remove_event_listener(_on_stream_event)
    return result
# Script body: launch a private Tor instance, then fetch a test URL over ten
# freshly built circuits, printing each circuit's path and the response.

# Seconds to wait on a request; without a timeout a dead circuit would
# block the script forever.
REQUEST_TIMEOUT = 30

makedirs(config.get('DataDirectory'), exist_ok=True)
tor_process = stem.process.launch_tor_with_config(
    config=config,
    init_msg_handler=init_msg_handler,
    take_ownership=True
)
try:
    # The context manager closes the control connection on the way out.
    with stem.control.Controller.from_port(port=SOCKS_PORT+1) as controller:
        controller.authenticate()
        enable_socks_proxy()
        try:
            for _ in range(10):
                new_circ = get_new_circ()
                print(circ_info(new_circ))
                try:
                    r = stream_handler(
                        controller,
                        new_circ,
                        func=lambda: requests.get(
                            url='https://httpbin.org/anything',
                            timeout=REQUEST_TIMEOUT,
                        ),
                    )
                except requests.exceptions.RequestException as exc:
                    # One bad circuit should not abort the remaining ones.
                    print(f'Request over circuit {new_circ.id} failed: {exc}')
                    continue
                try:
                    print(f'\n{json.dumps(r.json(), indent=4)}')
                except ValueError:
                    # Every JSON decode error Response.json() can raise
                    # (stdlib json, simplejson, requests' own) is a ValueError.
                    print(r.text)
        finally:
            # Leave the process-wide socket module as we found it.
            disable_socks_proxy()
finally:
    # Make sure the Tor child never outlives the script.
    tor_process.kill()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment