Last active
August 17, 2018 08:39
-
-
Save vdboor/91457eb38bbfc62368c875cae4c6cb58 to your computer and use it in GitHub Desktop.
Show dmesg IPTABLES messages with DNS name resolving (needs apt install python3-dnspython or pip3 install dnspython)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# This runs dmesg to show iptable logs, and replaces the IP addresses with | |
# actual DNS names. Resolved names are cached for speedup. | |
# | |
# Uses dnspython for proper resolver timeout handling: | |
# pip3 install dnspython (or apt install python3-dnspython) | |
from collections import defaultdict | |
import dns.exception | |
import dns.resolver | |
import dns.reversename | |
import re | |
import subprocess | |
RE_SRC = re.compile(' SRC=([0-9\.aA-F]+)') | |
RE_DST = re.compile(' DST=([0-9\.aA-F]+)') | |
def main(): | |
lookup = DnsLookup() | |
for line in execute(['dmesg', '--human', '--color=always', '--follow']): | |
if '[IPTABLES]' in line: | |
src = RE_SRC.search(line) | |
dst = RE_DST.search(line) | |
src_host = lookup[src.group(1)] | |
dst_host = lookup[dst.group(1)] | |
line = line.replace(src.group(0), ' SRC=' + src_host) | |
line = line.replace(dst.group(0), ' DST=' + dst_host) | |
print(line, end='') | |
def execute(cmd): | |
popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True) | |
aborted = False | |
try: | |
for stdout_line in iter(popen.stdout.readline, ""): | |
yield stdout_line | |
except KeyboardInterrupt: | |
aborted = True | |
popen.stdout.close() | |
return_code = popen.wait() | |
if return_code and not aborted: | |
raise subprocess.CalledProcessError(return_code, cmd) | |
class DnsLookup(defaultdict): | |
def __init__(self): | |
# Using dnspython to handle timeouts properly | |
self.resolver = dns.resolver.Resolver() | |
self.resolver.timeout = 1 | |
self.resolver.lifetime = 1 | |
def __missing__(self, ip): | |
try: | |
reverse_name = dns.reversename.from_address(ip) | |
addr = self.resolver.query(reverse_name, "PTR")[0].to_text()[:-1] | |
except dns.exception.DNSException: | |
addr = ip | |
self[ip] = addr | |
return addr | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment