-
-
Save BluBb-mADe/e87ec9a7d2efc0f53420f99ac7ad82d2 to your computer and use it in GitHub Desktop.
Apex Legends IP logger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!venv/Scripts/python | |
from scapy.all import sniff, get_working_if | |
from pynput import keyboard | |
import colorama | |
import sys | |
import time | |
from datetime import datetime | |
import threading | |
import socket | |
import subprocess | |
import traceback | |
import ttlru | |
import collections | |
# requirements: | |
# pip install scapy pynput colorama ttlru
# external dependency npcap (https://nmap.org/npcap/) | |
# mtr in cygwin (this was the easiest way I could figure out to get mtr running in python. There are probably better ways of doing this) | |
# | |
# I adjusted this script to verify that my occasional packet loss issues are indeed the fault of their server infrastructure,
# so it's specifically designed to help me with that and not whatever purpose the original one served.
# | |
# Edit: in case you are interested, yes, I could confirm that sometimes their infrastructure just drops 30-80% of packets when connecting to certain server IPs.
# But as expected, Respawn isn't even investigating the issue and instead continues to blame user ISPs, even though this script provides undeniable proof that it's an issue on their end.
# Number of newlines written through cprint() since the last clear().
nl_count = 0


def cprint(*args, end="\n"):
    """Print ``args`` like ``print`` and record how many newlines were emitted.

    The running total is kept in the module-level ``nl_count`` so that
    ``clear()`` knows how many terminal lines it has to erase.

    Args:
        *args: Values to print; they are forwarded to ``print`` unchanged.
        end: Line terminator forwarded to ``print``.
    """
    global nl_count
    # Count per argument via str() so non-string values (ints, floats, ...)
    # are accepted exactly as print() accepts them. The single-space separator
    # print() inserts between arguments never contributes a newline.
    nl_count += sum(str(arg).count('\n') for arg in args) + end.count('\n')
    print(*args, end=end)
def clear():
    """Erase everything printed through ``cprint`` since the previous call.

    Writes one "clear line + cursor up" sequence per newline counted in the
    module-level ``nl_count``, then clears the line the cursor ends on and
    returns to column 0. ``nl_count`` is reset to zero afterwards.
    """
    global nl_count
    wipe = colorama.ansi.clear_line()
    wipe_and_climb = wipe + colorama.Cursor.UP()
    print(wipe_and_climb * nl_count + wipe + "\r", end="")
    nl_count = 0
class CaptureServer:
    """Track UDP flows in the 37000-40000 port range and run traces against them.

    A daemon thread started by the constructor repeatedly sniffs traffic,
    aggregates it per flow into ``ip_log`` and redraws a short table on the
    console. The remaining public methods are meant to be used as hotkey
    callbacks.

    Attributes:
        local_ips: Tuple ``(local_ip4, local_ip6)`` used to tell outgoing
            packets from incoming ones.
        cur_proc: The ``subprocess.Popen`` of the running trace, or ``None``.
        ip_log: ``ttlru.TTLRU`` mapping ``(remote_ip, remote_port, local_port)``
            to ``[total_packets, smoothed_rate]``.
        thread: The background sniffing thread.
    """

    def __init__(self, local_ip4, local_ip6):
        """Store the local addresses and start the sniffing thread.

        Args:
            local_ip4: This host's IPv4 address as a string.
            local_ip6: This host's IPv6 address as a string.
        """
        self.local_ips = (local_ip4, local_ip6)
        self.cur_proc = None
        # At most 100 flows; ttl is presumably in nanoseconds (i.e. 30 s) --
        # TODO confirm against the ttlru documentation.
        self.ip_log = ttlru.TTLRU(100, ttl=30 * 1000000000)
        self.thread = threading.Thread(target=self.worker)
        self.thread.daemon = True
        self.thread.start()

    def worker(self):
        """Sniff, aggregate and display flows forever (thread body)."""
        while True:
            # While a trace is running it owns the console: block until it
            # exits. Work on a snapshot and only clear the attribute if it
            # still refers to the process we waited for, so a trace started
            # in the meantime is not forgotten.
            proc = self.cur_proc
            if proc is not None:
                proc.wait()
                if self.cur_proc is proc:
                    self.cur_proc = None

            packets = sniff(filter="udp and portrange 37000-40000", timeout=.5, iface=get_working_if())
            flows = self._count_flows(packets)
            time.sleep(.2)

            for flow, count in flows.items():
                # The capture window is 0.5 s, so count * 2 approximates
                # packets per second; averaging with the stored value
                # smooths the displayed rate.
                total, rate = self.ip_log.get(flow, [0, count * 2.])
                self.ip_log[flow] = [total + count, (rate + count * 2.) / 2.]

            self._redraw()

    def _count_flows(self, packets):
        """Return a Counter of packets per (remote_ip, remote_port, local_port)."""
        flows = collections.Counter()
        for pack in packets:
            # scapy names its IPv6 layer "IPv6"; looking up "IP6" never
            # matches and would silently drop all IPv6 traffic.
            ip_layer = pack.getlayer("IP") or pack.getlayer("IPv6")
            udp_layer = pack.getlayer("UDP")
            if not ip_layer or udp_layer is None or ip_layer.src == ip_layer.dst:
                continue
            if ip_layer.src in self.local_ips:
                # Outgoing packet: the remote side is the destination.
                dest_ip, lport, dport = ip_layer.dst, udp_layer.sport, udp_layer.dport
            else:
                # Incoming packet: the remote side is the source.
                dest_ip, lport, dport = ip_layer.src, udp_layer.dport, udp_layer.sport
            flows[(dest_ip, dport, lport)] += 1
        return flows

    def _redraw(self):
        """Replace the on-screen table with the first ten entries of ip_log."""
        clear()
        # items() is sliced directly, which relies on ttlru returning a list.
        for (remote_ip, dport, lport), (total, rate) in self.ip_log.items()[:10]:
            # The key order is (ip, dport, lport); unpack by name so each
            # label shows the port it actually describes.
            cprint(f"ip: {remote_ip} lport: {lport} dport: {dport}, packets ≈ {total} rate ≈ {rate:.2f}")

    def get_ip(self):
        """Return the remote IP of the first flow in ``ip_log``.

        Returns:
            The IP string of the first key reported by ``ip_log.keys()``
            (presumably the most recently updated flow -- confirm against
            ttlru's ordering), or ``None`` when no flow is tracked.
        """
        # Fetch the keys once: entries can expire between a separate length
        # check and the index access.
        keys = self.ip_log.keys()
        if not keys:
            return None
        return keys[0][0]

    def log_ip(self):
        """Append the current server IP with a timestamp to ApexServerIPList.txt."""
        ip = self.get_ip()
        if not ip:
            return
        # Format the timestamp once so the console and the file agree.
        stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
        print(f"[{stamp}] IP logged")
        with open('ApexServerIPList.txt', 'a') as f:
            f.write(f"{stamp},{ip}\n")

    def traceroute(self):
        """Start an mtr trace to the current server IP, replacing a running one."""
        ip = self.get_ip()
        if not ip:
            return
        running = self.cur_proc
        if running and running.poll() is None:
            running.terminate()
            running.wait()
        # stdin is a pipe so cancel_trace() can send mtr its quit key.
        self.cur_proc = subprocess.Popen(
            ["y:/cygwin/bin/bash", "-l", "-c", "/home/User/mtr/mtr.exe --displaymode 2 " + ip], stdin=subprocess.PIPE)
        # self.cur_proc = subprocess.Popen(["mtr", "--displaymode", "2", ip])

    def cancel_trace(self):
        """Ask the running trace to quit by sending ``q`` on its stdin.

        Does nothing when no trace is running, when it has already exited,
        or when the quit request has already been sent.
        """
        proc = self.cur_proc
        if proc is None or proc.poll() is not None:
            return
        pipe = proc.stdin
        if pipe is None or pipe.closed:
            # A previous call already sent the request and closed the pipe.
            return
        try:
            pipe.write(b'q\n')
            pipe.close()
        except (OSError, ValueError):
            # The process exited or the pipe was closed between the checks
            # above and the write; there is nothing left to cancel.
            pass
def _local_address(family, probe_target, fallback):
    """Return the local address used to reach ``probe_target``, else ``fallback``.

    connect() on a datagram socket only selects a route and sends nothing,
    so the probe target does not need to be reachable.
    """
    try:
        # Socket creation is inside the try as well: it fails with OSError
        # when the address family is unavailable on this host.
        with socket.socket(family, socket.SOCK_DGRAM) as probe:
            probe.connect(probe_target)
            return probe.getsockname()[0]
    except OSError:
        return fallback


def main():
    """Detect the local addresses, start capturing and serve the hotkeys.

    Blocks until the hotkey listener stops.
    """
    colorama.init()
    print('Apex Legends Game Server IP finder\n')

    local_ip4 = _local_address(socket.AF_INET, ('10.255.255.255', 1), '127.0.0.1')
    print(f'local ip4 {local_ip4}')
    local_ip6 = _local_address(socket.AF_INET6, ('fd98:ab6a:1f49:413d::ffff', 1), '::1')
    print(f'local ip6 {local_ip6}\n')

    print('numpad 0 -> start traceroute')
    print('numpad 1 -> log gameserver IP')
    print('numpad 7 -> stop traceroute')
    print('numpad 9 -> exit\n')

    cs = CaptureServer(local_ip4, local_ip6)
    # Key codes 96..105 are bound as numpad 0..9, matching the help text above.
    with keyboard.GlobalHotKeys({
            '<97>': cs.log_ip,
            '<96>': cs.traceroute,
            '<103>': cs.cancel_trace,
            '<105>': sys.exit}) as h:
        h.join()
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Only real errors are reported. SystemExit (raised by the exit
        # hotkey) and KeyboardInterrupt must end the program instead of
        # printing a traceback and waiting for input.
        print(traceback.format_exc())
        # Keep the console window open so the traceback can be read.
        input()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment