Skip to content

Instantly share code, notes, and snippets.

@dhardtke
Created February 5, 2015 17:58
Show Gist options
  • Save dhardtke/4d89db4d2f4a1017622f to your computer and use it in GitHub Desktop.
Save dhardtke/4d89db4d2f4a1017622f to your computer and use it in GitHub Desktop.
Start Wireshark capturing on Fritz!Box devices and stream it using Python's subprocess-module to wireshark
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import requests
import http.client
from http.client import HTTPConnection
from xml.dom import minidom
import hashlib
import re
import sys
USER_AGENT="Mozilla/5.0 (U; Windows NT 5.1; rv:5.0) Gecko/20100101 Firefox/5.0"
def loginToServer(server,password,port=80):
conn = HTTPConnection(server+':'+str(port))
headers = { "Accept" : "application/xml",
"Content-Type" : "text/plain",
"User-Agent" : USER_AGENT}
initialPage="/login_sid.lua"
conn.request("GET", initialPage, "", headers)
response = conn.getresponse()
data = response.read()
if response.status != 200:
print("%s %s" % (response.status, response.reason))
print(data)
sys.exit(0)
else:
theXml = minidom.parseString(data)
sidInfo = theXml.getElementsByTagName('SID')
sid=sidInfo[0].firstChild.data
if sid == "0000000000000000":
challengeInfo = theXml.getElementsByTagName('Challenge')
challenge=challengeInfo[0].firstChild.data
challenge_bf = (challenge + '-' + password).encode("utf-16le") #.decode('iso-8859-1').encode('utf-16le')
m = hashlib.md5()
m.update(challenge_bf)
response_bf = challenge + '-' + m.hexdigest().lower()
else:
return sid
headers = { "Accept" : "text/html,application/xhtml+xml,application/xml",
"Content-Type" : "application/x-www-form-urlencoded",
"User-Agent" : USER_AGENT}
loginPage="/login_sid.lua?&username=" + username + "&response=" + response_bf
conn.request("GET", loginPage, '', headers)
response = conn.getresponse()
data = response.read().decode("utf-8")
if response.status != 200:
print("%s %s" % (response.status, response.reason))
print(data)
sys.exit(0)
else:
sid = re.search('<SID>(.*?)</SID>', data).group(1)
if sid == "0000000000000000":
print("ERROR - No SID received because of invalid password")
sys.exit(0)
return sid
server="fritz.box"
username="xxx"
password="xxx"
sid = loginToServer(server, password)
if not sid:
print("ERROR logging on")
sys.exit(0)
s = requests.Session()
def streaming(url, sid):
payload = {"capture": "Start", "snaplen": "1600", "ifaceorminor": "1-wlan", "sid": sid}
headers = {'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}
req = requests.Request("GET", url, headers=headers, params=payload).prepare()
resp = s.send(req, stream=True)
for data in resp.iter_content(decode_unicode=False, chunk_size=1600):
if data:
yield data
p = subprocess.Popen(["wireshark", "-k", "-i", "-"], stdin=subprocess.PIPE)
for data in streaming("http://192.168.178.1/cgi-bin/capture_notimeout", sid):
#data = data.decode("latin-1")
#p.communicate(data)
try:
p.stdin.write(data)
p.stdin.flush()
except:
break
p.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment