Skip to content

Instantly share code, notes, and snippets.

@ulidtko
Forked from altescy/bash_detection_server.py
Last active July 31, 2024 14:49
Show Gist options
  • Save ulidtko/5b75ca17c9af69effcea217d1be47726 to your computer and use it in GitHub Desktop.
Save ulidtko/5b75ca17c9af69effcea217d1be47726 to your computer and use it in GitHub Desktop.
import dataclasses
import datetime
import re
import socket
import socketserver
import time
class config:
SOCKET_TIMEOUT = 10
BUFFER_SIZE = 87380
PACKET = (
"HTTP/1.1 200 OK\n"
+ "Server: Apache\n"
+ "Content-Type: text/plain; charset=us-ascii\n"
+ "Transfer-Encoding: chunked\n"
+ "Connection: keep-alive\n\n"
)
MAX_PADDING = 16
PADDING = chr(0) * BUFFER_SIZE
@dataclasses.dataclass
class Payload:
null: str = "sleep 3\n"
good: str = "echo 'Hello there :)'\n"
bad: str = "echo 'This is a malicious script :D'\n"
min_jump: float = 2.0
max_variance: float = 0.1
def var(values: list[float]) -> float:
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / len(values)
return variance
def popmax(values: list[float]) -> float:
max_value = max(values)
values.pop(values.index(max_value))
return max_value
class HTTPHandler(socketserver.BaseRequestHandler):
PAYLOADS: dict[str, Payload] = {}
def sendchunk(self, chunk: str) -> None:
self.request.sendall(bytes(f"{len(chunk):x}\n", "us-ascii"))
self.request.sendall(bytes(chunk, "us-ascii"))
self.request.sendall(bytes("\n", "us-ascii"))
def log(self, message: str) -> None:
print(
f"[{datetime.datetime.now().isoformat()}] "
f"{self.client_address[0]} {message}"
)
def handle(self) -> None:
self.log("Inbuond request")
self.request.settimeout(config.SOCKET_TIMEOUT)
self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.request.setsockopt(
socket.SOL_SOCKET,
socket.SO_SNDBUF,
config.BUFFER_SIZE,
)
data = None
try:
data = str(self.request.recv(1024), "us-ascii")
except socket.timeout:
self.log("No data received")
return
uri = re.search(r"GET ([^ ]+) HTTP/[0-9.]+", data)
if not uri:
self.log("HTTP request malformed")
return
request_uri = uri.group(1)
self.log(f"Request for shell script {request_uri}")
if request_uri not in self.PAYLOADS:
self.log(f"No payload found for {request_uri}")
return
self.request.sendall(bytes(config.PACKET, "us-ascii"))
payload = self.PAYLOADS[request_uri]
self.sendchunk(payload.null)
if not re.search("User-Agent: (curl|Wget)", data):
self.sendchunk(payload.good)
self.sendchunk("")
self.log("Request not via wget/curl. Returning good payload.")
return
timing = []
stime = time.time()
for _ in range(config.MAX_PADDING):
self.sendchunk(config.PADDING)
etime = time.time()
timing.append(etime - stime)
max_array = [t - s for s, t in zip(timing, timing[1:])]
jump = popmax(max_array)
variance = var(max_array)
self.log(f"Maximum Jump: {jump}, Variance: {variance}")
if variance < payload.max_variance and jump > payload.min_jump:
self.log("Execution through bash detected - sending bad payload :D")
self.sendchunk(payload.bad)
else:
self.log("Sending good payload :(")
self.sendchunk(payload.good)
self.sendchunk("")
self.log("Connection closed")
class BashDetectionServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
daemon_threads = True
allow_reuse_address = True
def __init__(self, server_address: tuple[str, int]) -> None:
super().__init__(server_address, HTTPHandler)
def setpayload(self, uri: str, payload: Payload) -> None:
HTTPHandler.PAYLOADS[uri] = payload
if __name__ == "__main__":
HOST, PORT = "0.0.0.0", 5555
server = BashDetectionServer((HOST, PORT))
server.setpayload("/setup.sh", Payload())
print(f"Listening on http://{HOST}:{PORT}")
server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment