Created
January 15, 2021 11:47
-
-
Save whoisjeeva/b685ee4df9fb78832a8b4eda59fc7b64 to your computer and use it in GitHub Desktop.
Socks5 proxy server in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import threading | |
import select | |
SOCKS_VERSION = 5


class Proxy:
    """Minimal threaded SOCKS5 proxy with username/password authentication.

    Only the CONNECT command is implemented, and the target may be given
    as an IPv4 address or a domain name. Every accepted client is served
    on its own thread by :meth:`handle_client`.
    """

    def __init__(self):
        # Credentials a client must present during the auth sub-negotiation.
        self.username = "username"
        self.password = "password"

    @staticmethod
    def _recv_exact(connection, count):
        """Read exactly ``count`` bytes from ``connection``.

        A single ``recv`` call may legally return fewer bytes than asked
        for, so keep reading until the whole field has arrived.

        Raises:
            ConnectionError: the peer closed the connection early.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = connection.recv(remaining)
            if not chunk:
                raise ConnectionError("peer closed the connection mid-message")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def handle_client(self, connection):
        """Run the SOCKS5 handshake on ``connection`` and relay traffic.

        The client socket (and the upstream socket, if one was opened) is
        always closed before this method returns. Socket errors are
        reported on stdout instead of escaping from the worker thread.
        """
        remote = None
        try:
            # Greeting: VER, NMETHODS, then NMETHODS method identifiers.
            _version, nmethods = self._recv_exact(connection, 2)
            methods = self.get_available_methods(nmethods, connection)

            # Accept only USERNAME/PASSWORD auth (method 2).
            if 2 not in methods:
                # 0xFF = no acceptable methods.
                connection.sendall(bytes([SOCKS_VERSION, 0xFF]))
                return
            connection.sendall(bytes([SOCKS_VERSION, 2]))

            if not self.verify_credentials(connection):
                return

            # Request: VER, CMD, RSV, ATYP, DST.ADDR, DST.PORT.
            _version, cmd, _, address_type = self._recv_exact(connection, 4)
            if address_type == 1:  # IPv4
                address = socket.inet_ntoa(self._recv_exact(connection, 4))
            elif address_type == 3:  # Domain name, length-prefixed
                domain_length = self._recv_exact(connection, 1)[0]
                address = self._recv_exact(connection, domain_length)
            else:
                # 8 = address type not supported (IPv6 and unknown types).
                connection.sendall(self.generate_failed_reply(address_type, 8))
                return
            port = int.from_bytes(self._recv_exact(connection, 2), 'big', signed=False)

            if cmd != 1:
                # 7 = command not supported; only CONNECT is implemented.
                connection.sendall(self.generate_failed_reply(address_type, 7))
                return

            try:
                if address_type == 3:
                    address = socket.gethostbyname(address)
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print("* Connected to {} {}".format(address, port))
                reply = b''.join([
                    SOCKS_VERSION.to_bytes(1, 'big'),
                    int(0).to_bytes(1, 'big'),  # REP: succeeded
                    int(0).to_bytes(1, 'big'),  # RSV
                    int(1).to_bytes(1, 'big'),  # ATYP: IPv4
                    socket.inet_aton(bind_address[0]),
                    bind_address[1].to_bytes(2, 'big'),
                ])
            except (OSError, UnicodeError):
                # 5 = connection refused (also used for resolution failures).
                reply = self.generate_failed_reply(address_type, 5)

            connection.sendall(reply)

            # Establish data exchange only after a success reply.
            if reply[1] == 0:
                self.exchange_loop(connection, remote)
        except (OSError, ValueError) as e:
            print("* Client error: {}".format(e))
        finally:
            if remote is not None:
                remote.close()
            connection.close()

    def exchange_loop(self, client, remote):
        """Copy bytes between ``client`` and ``remote`` until either side closes."""
        while True:
            # Wait until client or remote is available for read.
            readable, _, _ = select.select([client, remote], [], [])
            if client in readable:
                data = client.recv(4096)
                if not data:
                    break
                # sendall: a plain send() may write only part of the buffer.
                remote.sendall(data)
            if remote in readable:
                data = remote.recv(4096)
                if not data:
                    break
                client.sendall(data)

    def generate_failed_reply(self, address_type, error_number):
        """Build a SOCKS5 failure reply carrying ``error_number`` as REP.

        ``address_type`` is accepted for backward compatibility. The reply
        always carries the IPv4 null address 0.0.0.0:0 (ATYP 1), because
        the 4-byte placeholder is only well-formed for that type. The
        result is always 10 bytes long.
        """
        return b''.join([
            SOCKS_VERSION.to_bytes(1, 'big'),
            error_number.to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),  # RSV
            int(1).to_bytes(1, 'big'),  # ATYP: IPv4
            int(0).to_bytes(4, 'big'),  # BND.ADDR
            int(0).to_bytes(2, 'big'),  # BND.PORT is two bytes
        ])

    def verify_credentials(self, connection):
        """Run the username/password sub-negotiation.

        Returns True after sending a success status. On a mismatch a
        failure status is sent, ``connection`` is closed and False is
        returned.
        """
        version = self._recv_exact(connection, 1)[0]  # should be 1
        username_len = self._recv_exact(connection, 1)[0]
        username = self._recv_exact(connection, username_len)
        password_len = self._recv_exact(connection, 1)[0]
        password = self._recv_exact(connection, password_len)

        # Compare as bytes so malformed UTF-8 is a plain auth failure.
        if (username == self.username.encode('utf-8')
                and password == self.password.encode('utf-8')):
            # success, status = 0
            connection.sendall(bytes([version, 0]))
            return True

        # failure, status != 0
        connection.sendall(bytes([version, 0xFF]))
        connection.close()
        return False

    def get_available_methods(self, nmethods, connection):
        """Return the ``nmethods`` auth method identifiers offered by the client."""
        return list(self._recv_exact(connection, nmethods))

    def run(self, host, port):
        """Listen on ``host``:``port`` and serve each client on a new thread."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((host, port))
            s.listen()
            print("* Socks5 proxy server is running on {}:{}".format(host, port))
            while True:
                conn, addr = s.accept()
                print("* new connection from {}".format(addr))
                t = threading.Thread(target=self.handle_client, args=(conn,))
                t.start()
if __name__ == "__main__":
    # Start the proxy on the loopback interface, port 3000.
    proxy = Proxy()
    proxy.run("127.0.0.1", 3000)
hello sir can you edit it to support udp please
NO NO NO NO
I ALREADY WROTE THIS MUCH CODE
APPRECIATE IT
hello sir can you edit it to support udp please
NO NO NO NO I ALREADY WROTE THIS MUCH CODE APPRECIATE IT
MY BAD IM VErY SORRY not
hello sir can you edit it to support udp please
Here is an untested version of the original with a few add-ons and UDP support -- thanks to UnKnownCoder:
import socket
import threading
import select
SOCKS_VERSION = 5


class Proxy:
    """Threaded SOCKS5 proxy with username/password authentication.

    Implements the CONNECT (TCP) and UDP ASSOCIATE commands for IPv4 and
    domain-name targets. Every accepted client is served on its own
    thread by :meth:`handle_client`.
    """

    def __init__(self):
        # Credentials a client must present during the auth sub-negotiation.
        self.username = "username"
        self.password = "password"

    @staticmethod
    def _recv_exact(connection, count):
        """Read exactly ``count`` bytes from ``connection``.

        A single ``recv`` call may legally return fewer bytes than asked
        for, so keep reading until the whole field has arrived.

        Raises:
            ConnectionError: the peer closed the connection early.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = connection.recv(remaining)
            if not chunk:
                raise ConnectionError("peer closed the connection mid-message")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def handle_client(self, connection):
        """Run the SOCKS5 handshake on ``connection`` and dispatch the command.

        ``connection`` is always closed before this method returns, and
        errors are reported on stdout instead of escaping from the
        worker thread.
        """
        try:
            # Greeting: VER, NMETHODS, then NMETHODS method identifiers.
            _version, nmethods = self._recv_exact(connection, 2)
            methods = self.get_available_methods(nmethods, connection)
            if 2 not in methods:
                # 0xFF = no acceptable methods.
                connection.sendall(bytes([SOCKS_VERSION, 0xFF]))
                return
            connection.sendall(bytes([SOCKS_VERSION, 2]))

            if not self.verify_credentials(connection):
                return

            # Request: VER, CMD, RSV, ATYP, DST.ADDR, DST.PORT.
            _version, cmd, _, address_type = self._recv_exact(connection, 4)
            if address_type == 1:  # IPv4
                raw_address = self._recv_exact(connection, 4)
            elif address_type == 3:  # Domain name, length-prefixed
                domain_length = self._recv_exact(connection, 1)[0]
                raw_address = self._recv_exact(connection, domain_length)
            else:
                # 8 = address type not supported (IPv6 and unknown types).
                connection.sendall(self.generate_failed_reply(1, 8))
                return
            port = int.from_bytes(self._recv_exact(connection, 2), 'big', signed=False)

            if cmd not in (1, 3):
                # 7 = command not supported.
                connection.sendall(self.generate_failed_reply(1, 7))
                return

            try:
                if address_type == 1:
                    address = socket.inet_ntoa(raw_address)
                else:
                    address = socket.gethostbyname(raw_address)
            except (OSError, UnicodeError):
                # 4 = host unreachable (name resolution failed).
                connection.sendall(self.generate_failed_reply(1, 4))
                return

            if cmd == 1:  # CONNECT
                self.handle_tcp(connection, address, port)
            else:  # UDP ASSOCIATE
                self.handle_udp(connection, address, port)
        except Exception as e:
            # Thread boundary: report the error rather than lose it.
            print(f"Error handling client: {e}")
        finally:
            connection.close()

    def handle_tcp(self, connection, address, port):
        """Connect to ``address``:``port``, reply to the client and relay data.

        Both ``connection`` and the upstream socket are closed on return.
        """
        remote = None
        try:
            try:
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print(f"* Connected to {address} {port}")
                reply = b''.join([
                    SOCKS_VERSION.to_bytes(1, 'big'),
                    int(0).to_bytes(1, 'big'),  # REP: succeeded
                    int(0).to_bytes(1, 'big'),  # RSV
                    int(1).to_bytes(1, 'big'),  # ATYP: IPv4
                    socket.inet_aton(bind_address[0]),
                    bind_address[1].to_bytes(2, 'big'),
                ])
            except OSError as e:
                # 5 = connection refused.
                reply = self.generate_failed_reply(1, 5)
                print(f"TCP connection error: {e}")
            connection.sendall(reply)
            if reply[1] == 0:
                self.exchange_loop(connection, remote)
        finally:
            if remote is not None:
                remote.close()
            connection.close()

    def handle_udp(self, connection, address, port):
        """Serve a UDP ASSOCIATE request.

        A UDP relay socket is bound on the interface the client reached
        this server on, and its address is returned in the reply. The
        association lasts until the TCP ``connection`` closes. ``address``
        and ``port`` are the endpoint named in the request; they are only
        logged, and the client's UDP endpoint is learned from its first
        datagram.
        """
        udp_socket = None
        try:
            udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # Port 0 lets the OS pick a free port; an unbound socket has
            # no address to advertise to the client.
            udp_socket.bind((connection.getsockname()[0], 0))
            bind_address = udp_socket.getsockname()
            print(f"* UDP Associate with {address} {port}")
            reply = b''.join([
                SOCKS_VERSION.to_bytes(1, 'big'),
                int(0).to_bytes(1, 'big'),  # REP: succeeded
                int(0).to_bytes(1, 'big'),  # RSV
                int(1).to_bytes(1, 'big'),  # ATYP: IPv4
                socket.inet_aton(bind_address[0]),
                bind_address[1].to_bytes(2, 'big'),
            ])
            connection.sendall(reply)
            self._udp_relay_loop(connection, udp_socket)
        except OSError as e:
            print(f"UDP Associate error: {e}")
        finally:
            if udp_socket is not None:
                udp_socket.close()
            connection.close()

    def _udp_relay_loop(self, connection, udp_socket):
        """Relay datagrams for one association until ``connection`` closes.

        Datagrams from the client are stripped of their SOCKS5 UDP header
        and sent to the destination named in it. Datagrams from any other
        sender are wrapped in a header naming that sender and passed to
        the client. NOTE(review): senders are not checked against the
        destinations the client contacted; tighten if that matters.
        """
        client_ip = connection.getpeername()[0]
        client_endpoint = None
        while True:
            readable, _, _ = select.select([connection, udp_socket], [], [])
            if connection in readable:
                # The association ends with the TCP control connection.
                try:
                    if not connection.recv(4096):
                        return
                except OSError:
                    return
            if udp_socket not in readable:
                continue
            try:
                data, sender = udp_socket.recvfrom(65535)
            except OSError:
                continue

            # The first datagram from the client's IP fixes its endpoint.
            if client_endpoint is None and sender[0] == client_ip:
                client_endpoint = sender

            try:
                if sender == client_endpoint:
                    parsed = self._parse_udp_request(data)
                    if parsed is None:
                        continue  # malformed, fragmented or unsupported
                    host, dst_port, payload = parsed
                    target = (socket.gethostbyname(host), dst_port)
                    udp_socket.sendto(payload, target)
                elif client_endpoint is not None:
                    header = b''.join([
                        b'\x00\x00',  # RSV
                        b'\x00',      # FRAG: standalone datagram
                        b'\x01',      # ATYP: IPv4
                        socket.inet_aton(sender[0]),
                        sender[1].to_bytes(2, 'big'),
                    ])
                    udp_socket.sendto(header + data, client_endpoint)
            except (OSError, UnicodeError):
                # One bad datagram must not end the association.
                continue

    @staticmethod
    def _parse_udp_request(data):
        """Split a client datagram into ``(host, port, payload)``.

        Returns None for truncated datagrams, fragments (FRAG != 0) and
        address types other than IPv4 or domain name.
        """
        if len(data) < 4 or data[2] != 0:
            return None
        address_type = data[3]
        if address_type == 1:  # IPv4
            end = 8
            if len(data) < end + 2:
                return None
            host = socket.inet_ntoa(data[4:end])
        elif address_type == 3:  # Domain name, length-prefixed
            if len(data) < 5:
                return None
            end = 5 + data[4]
            if len(data) < end + 2:
                return None
            try:
                host = data[5:end].decode('ascii')
            except UnicodeDecodeError:
                return None
        else:
            return None
        port = int.from_bytes(data[end:end + 2], 'big')
        return host, port, data[end + 2:]

    def exchange_loop(self, client, remote):
        """Copy bytes between ``client`` and ``remote`` until either side closes."""
        while True:
            readable, _, _ = select.select([client, remote], [], [])
            if client in readable:
                data = client.recv(4096)
                if not data:
                    break
                # sendall: a plain send() may write only part of the buffer.
                remote.sendall(data)
            if remote in readable:
                data = remote.recv(4096)
                if not data:
                    break
                client.sendall(data)

    def generate_failed_reply(self, address_type, error_number):
        """Build a SOCKS5 failure reply carrying ``error_number`` as REP.

        ``address_type`` is accepted for backward compatibility. The reply
        always carries the IPv4 null address 0.0.0.0:0 (ATYP 1), because
        the 4-byte placeholder is only well-formed for that type. The
        result is always 10 bytes long.
        """
        return b''.join([
            SOCKS_VERSION.to_bytes(1, 'big'),
            error_number.to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),  # RSV
            int(1).to_bytes(1, 'big'),  # ATYP: IPv4
            int(0).to_bytes(4, 'big'),  # BND.ADDR
            int(0).to_bytes(2, 'big'),  # BND.PORT is two bytes
        ])

    def verify_credentials(self, connection):
        """Run the username/password sub-negotiation.

        Returns True after sending a success status. On a mismatch a
        failure status is sent, ``connection`` is closed and False is
        returned.
        """
        version = self._recv_exact(connection, 1)[0]  # should be 1
        username_len = self._recv_exact(connection, 1)[0]
        username = self._recv_exact(connection, username_len)
        password_len = self._recv_exact(connection, 1)[0]
        password = self._recv_exact(connection, password_len)

        # Compare as bytes so malformed UTF-8 is a plain auth failure.
        if (username == self.username.encode('utf-8')
                and password == self.password.encode('utf-8')):
            connection.sendall(bytes([version, 0]))
            return True

        connection.sendall(bytes([version, 0xFF]))
        connection.close()
        return False

    def get_available_methods(self, nmethods, connection):
        """Return the ``nmethods`` auth method identifiers offered by the client."""
        return list(self._recv_exact(connection, nmethods))

    def run(self, host, port):
        """Listen on ``host``:``port`` and serve each client on a new thread."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((host, port))
            s.listen()
            print(f"* SOCKS5 proxy server is running on {host}:{port}")
            while True:
                conn, addr = s.accept()
                print(f"* New connection from {addr}")
                threading.Thread(target=self.handle_client, args=(conn,)).start()
if __name__ == "__main__":
    # Start the proxy on the loopback interface, port 3000.
    proxy = Proxy()
    proxy.run("127.0.0.1", 3000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
hello sir can you edit it to support udp please