Last active
June 22, 2024 11:41
-
-
Save stkptr/712d3bdbd4d300bbfce13ad60b2cff17 to your computer and use it in GitHub Desktop.
Minecraft (Java and Bedrock) LAN spoofer/pinger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import hashlib | |
import os | |
import argparse | |
import select | |
import socket | |
import time | |
import random | |
import hmac | |
import io | |
from collections import namedtuple | |
import warnings | |
import struct | |
import re | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
# written by stkptr in 2024 | |
# this file is available under the Unlicense, CC0, or MIT, at your discretion | |
# Java | |
# https://github.com/mpontillo/minecraft-lan-announce/ | |
# RakNet information | |
# https://wiki.bedrock.dev/servers/raknet-and-mcpe.html | |
# https://wiki.vg/Raknet_Protocol | |
# NetherNet information | |
# https://github.com/df-mc/nethernet-spec | |
# Random secret generated once per process and mixed into every address hash.
pepper = os.urandom(64)


# Addresses printed to the terminal are replaced by a short digest.
# The script was handed to other people to run; the addresses involved should
# all be private ones, but some people still prefer not to disclose them.
def hide_ip(address, length=8):
    """Return the first *length* hex digits of a peppered SHA-256 of *address*.

    The per-process pepper is appended to the address before hashing so that
    the digest cannot be looked up in a precomputed (rainbow) table.  The
    same address maps to the same tag for the lifetime of the process.
    """
    peppered = address.encode('utf8') + pepper
    return hashlib.sha256(peppered).hexdigest()[:length]
def wassert(cond):
    """Soft assertion: warn, rather than raise, when *cond* is falsy.

    ``stacklevel=2`` attributes the warning to the caller's line instead of
    this helper.
    """
    if cond:
        return
    warnings.warn('Assertion failed', stacklevel=2)
# Reference point for uptime values: the monotonic clock at start-up,
# scaled from nanoseconds down by 1000 (kept as a float).
epoch = time.monotonic_ns() / 1_000
# Random identifier for this run, drawn from [0, 0x7FFFFFFFFFFFFFFF).
guid = random.randrange(0x7FFFFFFFFFFFFFFF)
def to_be(n, c=8):
    """Encode integer *n* as *c* bytes, most significant byte first.

    Only the low ``8 * c`` bits of *n* are kept; higher bits are silently
    dropped.
    """
    # Walk the shift amounts from the top byte down to the bottom byte.
    return bytes((n >> shift) & 0xFF for shift in range(8 * (c - 1), -1, -8))
def from_be(b):
    """Decode big-endian bytes *b* into an unsigned integer (``b''`` -> 0)."""
    value = 0
    for byte in b:
        # Shift what we have up by one byte and append the next one.
        value = (value << 8) + byte
    return value
def to_le(n, c=8):
    """Encode integer *n* as *c* bytes, least significant byte first.

    Only the low ``8 * c`` bits of *n* are kept; higher bits are silently
    dropped.
    """
    return bytes((n >> shift) & 0xFF for shift in range(0, 8 * c, 8))
def from_le(b):
    """Decode little-endian bytes *b* into an unsigned integer (``b''`` -> 0)."""
    value = 0
    # The last byte is the most significant one, so consume it first.
    for byte in reversed(b):
        value = (value << 8) + byte
    return value
def socket_loop(
        socket,
        reader=lambda peer, data: None,
        ping=lambda: None,
        ping_interval=1,
        poll_timeout=0.5):
    """Run forever, alternating periodic pings with datagram reception.

    Args:
        socket: datagram socket to poll and read from.
        reader: called as ``reader(peer, data)`` for every datagram received.
        ping: called with no arguments whenever at least ``ping_interval``
            seconds have passed since the previous call (or since the loop
            started).
        ping_interval: minimum number of seconds between ``ping`` calls.
        poll_timeout: how long, in seconds, each ``select`` call may block.

    The loop has no exit condition; it ends only when an exception (for
    example ``KeyboardInterrupt``) propagates out of it.
    """
    recv_size = 4096
    pinged_at = time.monotonic()
    while True:
        # Fire the ping callback once the interval has elapsed.
        tick = time.monotonic()
        if tick - pinged_at >= ping_interval:
            ping()
            pinged_at = tick
        # Wait up to poll_timeout for the socket to become readable.
        readable = select.select([socket], [], [socket], poll_timeout)[0]
        for ready in readable:
            payload, sender = ready.recvfrom(recv_size)
            reader(sender, payload)
# A Java Edition LAN announcement: the advertised MOTD text and game port.
JavaAnnouncement = namedtuple('JavaAnnouncement', ['motd', 'port'])


def java_encode(packet):
    """Serialise a ``JavaAnnouncement`` into its UTF-8 wire form.

    The result is ``[MOTD]<motd>[/MOTD][AD]<port>[/AD]`` encoded as bytes.
    """
    motd = packet.motd
    port = packet.port
    announcement = f'[MOTD]{motd}[/MOTD][AD]{port}[/AD]'
    return announcement.encode('utf8')
def java_decode(packet):
    """Parse a Java Edition LAN announcement datagram.

    Args:
        packet: raw bytes received from the network.

    Returns:
        ``JavaAnnouncement(motd, port)`` when the payload starts with
        ``[MOTD]...[/MOTD][AD]<digits>[/AD]``; ``None`` for anything else,
        including payloads that are not valid UTF-8.
    """
    # Datagrams arrive from arbitrary hosts, so invalid UTF-8 must not raise
    # and take the listener down; undecodable bytes become U+FFFD instead.
    text = packet.decode('utf8', errors='replace')
    # the format is more flexible than this, but this catches typical games
    matched = re.match(
        r'\[MOTD\]([^[]+)\[/MOTD\]\[AD\]([0-9]+)\[/AD\]',
        text
    )
    if matched is None:
        return None
    return JavaAnnouncement(matched.group(1), int(matched.group(2)))
def java_announce():
    """Advertise a fake Java Edition LAN game until interrupted.

    Sends a ``JavaAnnouncement('Custom LAN Game', 25565)`` datagram to
    ``224.0.2.60:4445`` immediately and then every 1.5 seconds.  The function
    does not return on its own; it ends when an exception such as
    ``KeyboardInterrupt`` propagates out of the loop.
    """
    # The context manager closes the socket when the loop is interrupted.
    with socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as send:

        def ping():
            print('Send ping')
            packet = java_encode(JavaAnnouncement('Custom LAN Game', 25565))
            send.sendto(packet, ('224.0.2.60', 4445))

        # Announce once right away instead of waiting for the first interval.
        ping()
        socket_loop(send, ping=ping, ping_interval=1.5)
def java_query():
    """Listen for Java Edition LAN announcements and print each one.

    Binds UDP port 4445, joins the ``224.0.2.60`` multicast group and prints
    every datagram received (with the sender address hashed by ``hide_ip``).
    The function does not return on its own.
    """
    group = '224.0.2.60'
    # The context manager closes the socket when the loop is interrupted.
    with socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as listen:
        # Bind before joining the group: some platforms (notably Windows)
        # reject IP_ADD_MEMBERSHIP on a socket that is not bound yet.
        listen.bind(('', 4445))
        # struct ip_mreq is exactly two 4-byte addresses (group, interface).
        # Packing both as 4-byte strings avoids the native size/alignment of
        # "l", which pads the request to 16 bytes on LP64 systems.
        mreq = struct.pack(
            '4s4s', socket.inet_aton(group), socket.inet_aton('0.0.0.0'))
        listen.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

        def receive_data(peer, data):
            print(
                f'announcement {hide_ip(peer[0])}:{peer[1]} = {java_decode(data)}'
            )

        socket_loop(listen, receive_data)
# Fixed 16-byte marker that RakNet embeds in its unconnected packets.
# see https://wiki.bedrock.dev/servers/raknet-and-mcpe.html
raknet_magic = bytes.fromhex(
    '00ffff00'
    'fefefefe'
    'fdfdfdfd'
    '12345678'
)
# RakNet unconnected ping: sender id plus the sender's uptime counter.
UnconnectedPing = namedtuple('UnconnectedPing', ['id', 'uptime'])
# RakNet unconnected pong: responder id, the echoed uptime counter and the
# semicolon-separated server description fields.
UnconnectedPong = namedtuple(
    'UnconnectedPong',
    'id uptime edition server_name protocol_version version_number '
    'player_count max_player_count level_name game_mode_name game_mode '
    'portv4 portv6'
)
def raknet_encode(packet):
    """Serialise an ``UnconnectedPing`` or ``UnconnectedPong`` to bytes.

    Args:
        packet: the namedtuple to encode.

    Returns:
        The encoded datagram, or ``b''`` when *packet* is neither of the two
        supported types.
    """
    if isinstance(packet, UnconnectedPing):
        return b''.join([
            to_be(0x01, 1),
            to_be(packet.uptime, 8),
            raknet_magic,
            to_be(packet.id, 8),
        ])
    if isinstance(packet, UnconnectedPong):
        fields = [
            packet.edition,
            packet.server_name,
            str(packet.protocol_version),
            packet.version_number,
            str(packet.player_count),
            str(packet.max_player_count),
            str(packet.id),
            packet.level_name,
            packet.game_mode_name,
            str(packet.game_mode),
            str(packet.portv4),
            str(packet.portv6)
        ]
        # Encode first: the 2-byte prefix must count the bytes that follow,
        # which differs from the character count for non-ASCII names.
        payload = (';'.join(fields) + ';').encode('utf8')
        return b''.join([
            to_be(0x1c, 1),
            to_be(packet.uptime, 8),
            to_be(packet.id, 8),
            raknet_magic,
            to_be(len(payload), 2),
            payload,
        ])
    # Unsupported packet types encode to nothing.
    return b''
def raknet_decode(packet):
    """Parse a RakNet unconnected ping (0x01) or pong (0x1c) datagram.

    Args:
        packet: raw datagram bytes.

    Returns:
        ``UnconnectedPing``, ``UnconnectedPong``, or ``None`` when the first
        byte is neither 0x01 nor 0x1c (this includes an empty datagram).
        Pong fields that are absent from the server description come back as
        ``None``; empty numeric fields do as well.

    Raises:
        ValueError: when a numeric pong field holds non-numeric text.

    A wrong magic value or a length prefix that disagrees with the payload
    only produces a warning through ``wassert``; parsing continues.
    """
    p = io.BytesIO(packet)
    ptype = from_be(p.read(1))
    if ptype == 0x01:
        uptime = from_be(p.read(8))
        wassert(raknet_magic == p.read(16))
        uid = from_be(p.read(8))
        return UnconnectedPing(uid, uptime)
    if ptype == 0x1c:
        uptime = from_be(p.read(8))
        uid = from_be(p.read(8))
        wassert(raknet_magic == p.read(16))
        strlen = from_be(p.read(2))
        raw = p.read()
        # The prefix was previously read and ignored; verify it the same
        # soft way the magic value is verified.
        wassert(strlen == len(raw))
        # Untrusted input: replace undecodable bytes instead of raising.
        fields = raw.decode('utf8', errors='replace').split(';')

        def text(index):
            # Trailing fields may be missing from short descriptions.
            return fields[index] if index < len(fields) else None

        def number(index):
            value = text(index)
            return int(value) if value else None

        # Field 6 of the description is skipped; the id comes from the
        # binary header instead.
        return UnconnectedPong(
            uid,
            uptime,
            text(0),
            text(1),
            number(2),
            text(3),
            number(4),
            number(5),
            text(7),
            text(8),
            number(9),
            number(10),
            number(11)
        )
    return None
# this announces a server, responding to pings
def raknet_announce():
    """Answer RakNet unconnected pings on UDP port 19132 with a fake server.

    Every datagram received is printed (sender address hashed by
    ``hide_ip``); only unconnected pings get an unconnected pong in return.
    The function does not return on its own.
    """
    print('Announcing a server to the network')
    # listener socket; the context manager closes it when the loop ends
    with socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as listen:
        listen.bind(('', 19132))

        # unconnected pong
        def ping_response(peer, data):
            # Decode once, and never let a malformed datagram from the
            # network terminate the announcer.
            try:
                decoded = raknet_decode(data)
            except (ValueError, IndexError) as exc:
                print(
                    f'query from {hide_ip(peer[0])}:{peer[1]}'
                    f' could not be decoded: {exc}'
                )
                return
            print(
                f'query from {hide_ip(peer[0])}:{peer[1]} = {decoded}'
            )
            # Only pings are answered.  Checking the decoded type also copes
            # with empty datagrams, which have no first byte to inspect.
            if not isinstance(decoded, UnconnectedPing):
                return
            response = raknet_encode(UnconnectedPong(
                guid,
                decoded.uptime,
                'MCPE',
                'Custom LAN Game',
                649,
                '1.20.62',
                3,
                11,
                'Game Level',
                'Survival',
                1,
                19132,
                19133
            ))
            listen.sendto(response, peer)

        socket_loop(listen, ping_response)
def raknet_query(broadcast):
    """Broadcast RakNet unconnected pings and print every reply.

    Args:
        broadcast: address the pings are sent to (UDP port 19132).

    A ping is sent immediately and then once per second.  The function does
    not return on its own.
    """
    print('Querying the network for servers')
    # The context manager closes the socket when the loop is interrupted.
    with socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as send:
        send.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        def ping():
            print('Send ping')
            ping_message = raknet_encode(UnconnectedPing(
                guid,
                # time elapsed since start-up, in the same unit as epoch
                int(time.monotonic_ns() / 1000 - epoch)
            ))
            send.sendto(ping_message, (broadcast, 19132))

        def receive_packet(peer, data):
            # A malformed reply must not end the scan.
            try:
                decoded = raknet_decode(data)
            except (ValueError, IndexError) as exc:
                print(
                    f'response from {hide_ip(peer[0])}:{peer[1]}'
                    f' could not be decoded: {exc}'
                )
                return
            print(
                'response from'
                f' {hide_ip(peer[0])}:{peer[1]} = {decoded}'
            )

        ping()
        socket_loop(send, receive_packet, ping, ping_interval=1)
# Key shared by encrypt(), decrypt() and checksum(): the SHA-256 digest of
# 0xDEADBEEF laid out as eight little-endian bytes.
nethernet_key = hashlib.sha256((0xDEADBEEF).to_bytes(8, 'little')).digest()
def encrypt(data):
    """Encrypt *data* with AES-ECB under ``nethernet_key``.

    The input is padded PKCS#5/7 style first: between 1 and 16 bytes are
    appended, each holding the pad length, so the result is always a whole
    number of 16-byte blocks.
    """
    pad_len = 16 - len(data) % 16
    padded = data + bytes([pad_len]) * pad_len
    encryptor = Cipher(algorithms.AES(nethernet_key), modes.ECB()).encryptor()
    ciphertext = encryptor.update(padded)
    return ciphertext + encryptor.finalize()
def decrypt(data):
    """Decrypt AES-ECB *data* under ``nethernet_key`` and strip the padding.

    The final plaintext byte is taken as the pad length and that many bytes
    are removed from the end.
    """
    decryptor = Cipher(algorithms.AES(nethernet_key), modes.ECB()).decryptor()
    plain = decryptor.update(data) + decryptor.finalize()
    # NOTE(review): the padding is not validated.  A final byte of 0 yields
    # an empty result, and empty plaintext raises IndexError here.
    pad_len = plain[-1]
    return plain[:-pad_len]
def checksum(data):
    """Return the HMAC-SHA256 of *data* keyed with ``nethernet_key``."""
    mac = hmac.new(nethernet_key, data, 'sha256')
    return mac.digest()
# NetherNet discovery packets.  Every packet carries the sender's id.
DiscoveryRequestPacket = namedtuple('DiscoveryRequestPacket', ['id'])
DiscoveryResponsePacket = namedtuple(
    'DiscoveryResponsePacket',
    'id version server_name level_name game_type player_count '
    'max_player_count is_editor_world transport_layer'
)
DiscoveryMessagePacket = namedtuple(
    'DiscoveryMessagePacket', ['id', 'recipient_id', 'data'])
def write_string(b, s, lenbytes=1):
    """Write *s* to stream *b*, preceded by its length in little-endian.

    Args:
        b: writable binary stream.
        s: bytes to write.
        lenbytes: width of the length prefix in bytes.

    Raises:
        ValueError: if ``len(s)`` does not fit in *lenbytes* bytes.  The
            prefix used to be truncated silently, which produced a frame
            whose declared length disagreed with its content.  Nothing is
            written in that case.
    """
    size = len(s)
    if size >= 1 << (8 * lenbytes):
        raise ValueError(
            f'string of {size} bytes does not fit a {lenbytes}-byte length prefix'
        )
    b.write(size.to_bytes(lenbytes, 'little'))
    b.write(s)
def read_string(b, lenbytes=1):
    """Read a length-prefixed byte string from stream *b*.

    The first *lenbytes* bytes are a little-endian length; that many bytes
    are then read and returned.  A stream that ends early yields fewer bytes
    than announced.
    """
    size = int.from_bytes(b.read(lenbytes), 'little')
    return b.read(size)
def nethernet_encode(packet):
    """Serialise a NetherNet discovery packet into its wire form.

    The frame is: 2-byte total length, 2-byte packet type, 8-byte sender id,
    8 zero bytes, then the type-specific body (all little-endian).  The
    result is ``checksum(frame) + encrypt(frame)``.

    A response body is built separately and embedded as an ASCII hex string
    with a 4-byte length prefix; a message body is the 8-byte recipient id
    followed by the UTF-8 data with a 4-byte length prefix.
    """
    if isinstance(packet, DiscoveryRequestPacket):
        ptype = 0
    elif isinstance(packet, DiscoveryResponsePacket):
        ptype = 1
    elif isinstance(packet, DiscoveryMessagePacket):
        ptype = 2
    else:
        # Left as None so that encoding the type below fails.
        ptype = None

    body = io.BytesIO()
    body.write(to_le(ptype, 2))  # type
    body.write(to_le(packet.id, 8))
    body.write(to_le(0, 8))  # padding

    if ptype == 1:
        details = io.BytesIO()
        details.write(to_le(packet.version, 1))
        write_string(details, packet.server_name.encode('utf8'), 1)
        write_string(details, packet.level_name.encode('utf8'), 1)
        details.write(to_le(packet.game_type, 4))
        details.write(to_le(packet.player_count, 4))
        details.write(to_le(packet.max_player_count, 4))
        details.write(to_le(int(packet.is_editor_world), 1))
        details.write(to_le(packet.transport_layer, 4))
        hex_text = details.getvalue().hex()
        write_string(body, hex_text.encode('ascii'), 4)
    elif ptype == 2:
        body.write(to_le(packet.recipient_id, 8))
        write_string(body, packet.data.encode('utf8'), 4)
    # A request (type 0) has nothing beyond the common header.

    content = body.getvalue()
    # The length field counts itself as well, hence the extra two bytes.
    total = len(content) + 2
    frame = bytes([total & 0xFF, total >> 8]) + content
    return checksum(frame) + encrypt(frame)
def nethernet_decode(packet):
    """Parse a NetherNet discovery datagram.

    The first 32 bytes are the checksum; the remainder is decrypted and then
    read as length, type, sender id, padding and a type-specific body.

    Returns:
        ``DiscoveryRequestPacket``, ``DiscoveryResponsePacket`` or
        ``DiscoveryMessagePacket`` for types 0, 1 and 2; ``None`` for any
        other type.

    Checksum, length and padding mismatches only produce warnings through
    ``wassert``; parsing continues regardless.
    """
    digest, ciphertext = packet[:32], packet[32:]
    plain = decrypt(ciphertext)
    wassert(digest == checksum(plain))  # checksum verification
    dec = io.BytesIO(plain)
    wassert(len(plain) == from_le(dec.read(2)))  # length verification
    ptype = from_le(dec.read(2))
    sid = from_le(dec.read(8))
    wassert(dec.read(8) == bytes(8))  # padding

    if ptype == 0:
        return DiscoveryRequestPacket(sid)

    if ptype == 1:
        # The response body travels as a hex string inside the frame.
        hexdata = read_string(dec, 4)
        dat = io.BytesIO(bytes.fromhex(hexdata.decode('ascii')))
        version = from_le(dat.read(1))
        server_name = read_string(dat).decode('utf8')
        level_name = read_string(dat).decode('utf8')
        game_type = from_le(dat.read(4))
        player_count = from_le(dat.read(4))
        max_players = from_le(dat.read(4))
        is_editor = bool(from_le(dat.read(1)))
        transport_layer = from_le(dat.read(4))
        return DiscoveryResponsePacket(
            id=sid,
            version=version,
            server_name=server_name,
            level_name=level_name,
            game_type=game_type,
            player_count=player_count,
            max_player_count=max_players,
            is_editor_world=is_editor,
            transport_layer=transport_layer,
        )

    if ptype == 2:
        recipient = from_le(dec.read(8))
        data = read_string(dec, 4).decode('utf8')
        return DiscoveryMessagePacket(sid, recipient, data)

    return None
def nethernet_announce():
    """Answer NetherNet discovery requests on UDP port 7551 with a fake server.

    Every decodable packet from another sender is printed (address hashed by
    ``hide_ip``); discovery requests are answered with a
    ``DiscoveryResponsePacket`` sent to port 7551 of the requesting host.
    The function does not return on its own.
    """
    print('Announcing a NetherNet server to the network')
    # listener socket; the context manager closes it when the loop ends
    with socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as listen:
        listen.bind(('', 7551))

        def ping_response(peer, data):
            # Untrusted input: a malformed datagram must not end the loop.
            try:
                decoded = nethernet_decode(data)
            except (ValueError, IndexError) as exc:
                print(
                    f'query from {hide_ip(peer[0])}:{peer[1]}'
                    f' could not be decoded: {exc}'
                )
                return
            # Unknown packet types decode to None; our own packets carry
            # our id.  Neither is of interest.
            if decoded is None or decoded.id == guid:
                return
            print(f'query from {hide_ip(peer[0])}:{peer[1]} = {decoded}')
            # Answer requests only.  Replying to responses as well makes two
            # announcers on one network answer each other indefinitely.
            if not isinstance(decoded, DiscoveryRequestPacket):
                return
            packet = DiscoveryResponsePacket(
                guid,
                2,
                'Custom LAN Game',
                'Game Level',
                1,
                3,
                11,
                False,
                2
            )
            listen.sendto(nethernet_encode(packet), (peer[0], 7551))

        socket_loop(listen, ping_response)
def nethernet_query(broadcast):
    """Broadcast NetherNet discovery requests and print every reply.

    Args:
        broadcast: address the requests are sent to (UDP port 7551).

    A request is sent immediately and then every 2 seconds.  Packets that
    carry our own id are not printed.  The function does not return on its
    own.
    """
    print('Querying the network for NetherNet servers')
    # The context manager closes the socket when the loop is interrupted.
    with socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as send:
        send.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        def ping():
            print('Send ping')
            packet = nethernet_encode(DiscoveryRequestPacket(guid))
            send.sendto(packet, (broadcast, 7551))

        def receive_packet(peer, data):
            # Untrusted input: a malformed datagram must not end the scan.
            try:
                decoded = nethernet_decode(data)
            except (ValueError, IndexError) as exc:
                print(
                    f'response from {hide_ip(peer[0])}:{peer[1]}'
                    f' could not be decoded: {exc}'
                )
                return
            # Unknown packet types decode to None and have no id to compare.
            if decoded is None:
                return
            if decoded.id != guid:
                print(f'response from {hide_ip(peer[0])}:{peer[1]} = {decoded}')

        ping()
        socket_loop(send, receive_packet, ping, ping_interval=2)
def main():
    """Parse the command line and run the selected announce/query loop.

    ``--java`` takes precedence over ``--nethernet``; with neither flag the
    RakNet implementation is used.  ``--broadcast`` is only passed to the
    RakNet and NetherNet query modes.  Ctrl-C ends the program with a short
    message.
    """
    parser = argparse.ArgumentParser(
        description='Spoof or search for Minecraft servers on the network.')
    parser.add_argument(
        '--java', '-j', action='store_true',
        help='If provided, spoof/scan for Java instead of Bedrock servers')
    parser.add_argument(
        '--nethernet', '-n', action='store_true',
        help='If provided, use NetherNet instead of Raknet protocol')
    parser.add_argument(
        '--broadcast', '-b', default='255.255.255.255',
        help='Address to send broadcast packets to')
    parser.add_argument(
        'mode', choices=['announce', 'query'],
        help='If announce, present a fake server to the network. '
        'Otherwise, if query, ping for servers on the network.')
    args = parser.parse_args()
    print(f'Our GUID is {guid}')

    # Pick the pair of entry points for the requested protocol.
    if args.java:
        actions = {
            'announce': java_announce,
            'query': java_query,
        }
    elif args.nethernet:
        actions = {
            'announce': nethernet_announce,
            'query': lambda: nethernet_query(args.broadcast),
        }
    else:
        actions = {
            'announce': raknet_announce,
            'query': lambda: raknet_query(args.broadcast),
        }

    try:
        # argparse restricts mode to the two keys present in the table.
        actions[args.mode]()
    except KeyboardInterrupt:
        print('KeyboardInterrupt, exiting')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment