Skip to content

Instantly share code, notes, and snippets.

@lrhazi
Last active August 15, 2024 18:37
Show Gist options
  • Save lrhazi/66d7b6d1650cf88596c29f6ccf9c46cb to your computer and use it in GitHub Desktop.
Save lrhazi/66d7b6d1650cf88596c29f6ccf9c46cb to your computer and use it in GitHub Desktop.
import pyshark
import json
from collections import defaultdict
def _packet_addresses(packet):
    """Return ``(src, dst)`` from the packet's IPv4 or IPv6 layer, else ``(None, None)``."""
    for layer_name in ('ip', 'ipv6'):
        layer = getattr(packet, layer_name, None)
        if layer is not None:
            return layer.src, layer.dst
    return None, None


def process_pcap(pcap_file, target_uri, ssl_keylog_file=None, tshark_path=None):
    """Extract completed HTTP transactions whose request URI contains *target_uri*.

    Args:
        pcap_file: Path of the capture file handed to ``pyshark.FileCapture``.
        target_uri: Substring that a request URI must contain to be reported.
        ssl_keylog_file: Optional TLS key log file; when given it is passed to
            tshark through the ``ssl.keylog_file`` preference so HTTPS traffic
            can be decrypted.
        tshark_path: Optional explicit path of the tshark executable. When
            omitted, pyshark is left to locate tshark itself.

    Returns:
        A list of ``{'request': {...}, 'response': {...}}`` dicts, one per
        request that was followed by a response on the same TCP stream, in the
        order the responses were seen.
    """
    # The URI is embedded in a double-quoted display-filter string, so
    # backslashes and quotes must be escaped to keep the filter well-formed.
    escaped_uri = target_uri.replace('\\', '\\\\').replace('"', '\\"')
    display_filter = f'http.request.uri contains "{escaped_uri}" or http.response'

    capture_options = {'display_filter': display_filter}
    if tshark_path:
        capture_options['tshark_path'] = tshark_path
    if ssl_keylog_file:
        capture_options['override_prefs'] = {'ssl.keylog_file': ssl_keylog_file}
    cap = pyshark.FileCapture(pcap_file, **capture_options)

    # Requests still waiting for their response, keyed by tshark's TCP stream
    # index. That index is the same for both directions of a connection; a key
    # built from "src:port-dst:port" differs between request and response and
    # would never pair them up.
    pending_requests = {}
    completed_transactions = []
    try:
        for packet in cap:
            if 'TCP' not in packet or 'HTTP' not in packet:
                continue
            http = packet.http
            stream_id = packet.tcp.stream

            if hasattr(http, 'request_uri'):
                if target_uri not in http.request_uri:
                    continue
                src_ip, dst_ip = _packet_addresses(packet)
                request = {
                    'uri': http.request_uri,
                    'method': http.request_method,
                    'src_ip': src_ip,
                    'dst_ip': dst_ip,
                    'time': packet.sniff_time,
                }
                if 'TLS' in packet:
                    request['tls_version'] = getattr(packet.tls, 'record_version', None)
                pending_requests[stream_id] = request
            elif hasattr(http, 'response_code'):
                # The filter lets every response through; ignore those that do
                # not follow a matching request on the same connection.
                # NOTE(review): with HTTP pipelining the next response on the
                # stream may belong to an earlier request - confirm if relevant.
                request = pending_requests.pop(stream_id, None)
                if request is None:
                    continue
                response = {
                    'status_code': http.response_code,
                    'content_type': getattr(http, 'content_type', None),
                    'time': packet.sniff_time,
                }
                if 'TLS' in packet:
                    response['tls_version'] = getattr(packet.tls, 'record_version', None)
                completed_transactions.append({'request': request, 'response': response})
    finally:
        # Always release the tshark subprocess, even if parsing a packet fails.
        cap.close()

    return completed_transactions
def main():
    """Analyse the sample capture and print the matching transactions as JSON."""
    capture_path = 'capture.pcap'
    uri_fragment = '/specific/path'
    # Use None instead of a path when the capture needs no TLS decryption.
    keylog_path = '/path/to/your/sslkeylog.txt'

    matches = process_pcap(capture_path, uri_fragment, keylog_path)

    # default=str renders values json cannot encode natively (the packet
    # timestamps) as plain strings.
    rendered = json.dumps(matches, indent=2, default=str)
    print(rendered)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment