Created
November 17, 2017 17:28
-
-
Save mrocklin/62ec0d232fea6b9463a9eebc52582553 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
def parse(line):
    """Parse one line of tcpdump text output into a flat record.

    Expected shape for IP lines::

        <time> IP <src.ip.port> > <dst.ip.port>: ... (<length>)

    Parameters
    ----------
    line : str
        One line of tcpdump output.

    Returns
    -------
    dict or None
        Keys ``source_ip``, ``source_port``, ``destination_ip``,
        ``destination_port``, ``time`` (pd.Timestamp), ``protocol``,
        ``length`` — or None for blank/truncated/non-IP lines.
    """
    words = line.split()
    # Guard: blank or truncated lines would raise IndexError below
    # (the original crashed on them instead of skipping).
    if len(words) < 5:
        return None
    time = words[0]
    protocol = words[1]
    if protocol != 'IP':
        # Only IP traffic is understood; drop ARP and everything else.
        return None
    # Addresses look like "192.168.0.1.443": the final dotted field is the port.
    src_ip, src_port = words[2].rsplit('.', 1)
    dst_ip, dst_port = words[4].strip(':').rsplit('.', 1)
    src_port = int(src_port)
    dst_port = int(dst_port)
    try:
        length = int(words[-1].strip('()'))
    except ValueError:
        # Some lines carry no "(NNN)" length suffix; treat as zero length.
        length = 0
    return {'source_ip': src_ip,
            'source_port': src_port,
            'destination_ip': dst_ip,
            'destination_port': dst_port,
            'time': pd.Timestamp(time),
            'protocol': protocol,
            'length': length}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math
import sys
from functools import lru_cache

import pandas as pd
import toolz
from distributed.utils import log_errors
from streamz import Stream
from tornado.ioloop import IOLoop

import pcap
# Bootstrap an example DataFrame from a handful of captured packets so the
# streaming DataFrame below knows its column schema up front.
columns = ['time', 'source_ip', 'destination_ip', 'source_port',
           'destination_port', 'protocol', 'length']
with open('text.pcap') as f:
    sample = [pcap.parse(raw) for raw in toolz.take(5, f)]
example = pd.DataFrame(sample, columns=columns)
import socket | |
@lru_cache(maxsize=1024)
def hostname(ip):
    """Best-effort reverse-DNS lookup of *ip*; falls back to the input.

    Results are cached (bounded — ``toolz.memoize`` grew without limit on an
    endless packet stream) because the same addresses recur constantly and
    reverse DNS is slow.

    Parameters
    ----------
    ip : str
        Dotted-quad address string.

    Returns
    -------
    str
        The resolved hostname, or *ip* itself when resolution fails.
    """
    try:
        return socket.gethostbyaddr(ip)[0]
    except Exception:
        # Lookup failures (unknown host, bad input, no network) are routine
        # here; displaying the raw address is the intended fallback.
        return ip
def domain(addr):
    """Collapse a hostname down to its last two dot-separated labels.

    Values whose final label is numeric are treated as raw IP addresses
    (reverse DNS failed) and returned untouched.
    """
    labels = addr.split('.')
    if labels[-1].isnumeric():  # still a dotted-quad IP address
        return addr
    return '.'.join(labels[-2:])
# Live pipeline: stdin lines -> parsed records -> streaming DataFrame
# indexed by packet timestamp.
source = Stream.from_textfile(sys.stdin)
# NOTE(review): .filter(None) presumably drops the None records that
# pcap.parse returns for unparseable lines, mirroring builtin filter(None, …)
# — confirm the installed streamz version accepts None as a predicate.
records = source.rate_limit(0.01).map(pcap.parse).filter(None)
# Batch records into 200ms windows, build a DataFrame per batch (skipping
# empty windows), and feed them into a streaming DataFrame with the schema
# taken from `example`.
full_df = records.timed_window(0.200).filter(None).map(pd.DataFrame, columns=columns).to_dataframe(example=example).set_index('time')
# Derived columns: resolve each IP to a hostname, then collapse to a domain.
full_df['source_domain'] = full_df.source_ip.map(hostname).map(domain)
full_df['destination_domain'] = full_df.destination_ip.map(hostname).map(domain)
# One aggregation per chart; each takes a (windowed) frame and returns a
# two-column value_counts frame ('index' = category, count column).
histograms = {'source-domain': lambda df: df.source_domain.value_counts().reset_index(),
              'source-port': lambda df: df.source_port.value_counts().reset_index(),
              'destination-domain': lambda df: df.destination_domain.value_counts().reset_index(),
              'destination-port': lambda df: df.destination_port.value_counts().reset_index()}
from bokeh.server.server import Server | |
from bokeh.application.handlers.function import FunctionHandler | |
from bokeh.application import Application | |
from bokeh.plotting import ColumnDataSource, figure | |
from bokeh.models import DataTable, TableColumn | |
from bokeh.layouts import row, column | |
def stream(df, source=None, doc=None, backlog=100):
    """Append *df*'s columns to a Bokeh ColumnDataSource on the doc's next tick.

    Scheduling via ``add_next_tick_callback`` keeps the mutation on the
    document's own event loop.
    """
    payload = {name: df[name] for name in df.columns}
    doc.add_next_tick_callback(lambda: source.stream(payload, backlog))
def update(df, source=None, doc=None):
    """Replace a ColumnDataSource's data with *df*'s columns on the next tick."""
    payload = {name: df[name].values for name in df.columns}
    doc.add_next_tick_callback(lambda: source.data.update(payload))
def factor_update(df, source=None, doc=None, fig=None, n=20):
    """Refresh a categorical bar chart with the top-*n* rows of *df*.

    Sorts descending by the single value column, trims to *n* rows, and on
    the next tick resets the figure's x-axis factors together with the
    source data so bars and categories stay in sync.
    """
    with log_errors():
        value_col = [c for c in df.columns if c != 'index'][0]
        top = df.sort_values(value_col, ascending=False).iloc[:n]
        payload = {c: top[c].values for c in top.columns}
        # Bokeh factor ranges want string categories.
        payload['index'] = [str(v) for v in top['index']]

        def apply_update():
            fig.x_range.factors = payload['index']
            source.data.update(payload)

        doc.add_next_tick_callback(apply_update)
def on_selected(attr, old, new):
    """Debug hook: dump a selection change event to stdout."""
    print('-' * 20)
    for part in (attr, old, new):
        print(part)
def make_histograms(histograms, full, selections=None, on_change=None):
    """Build one tap-selectable bar chart per entry in *histograms*.

    Parameters
    ----------
    histograms : dict[str, callable]
        Maps a chart name to a function producing a value_counts frame
        (with an 'index' category column) from *full*.
    full : DataFrame-like
        Frame used only to discover each chart's column schema here.
    selections : dict or None
        If given, tap selections are written into it (column -> chosen
        categories) and *on_change* is invoked afterwards.
    on_change : callable or None
        Notification hook fired after a selection updates *selections*.

    Returns
    -------
    dict[str, dict]
        name -> {'source': ColumnDataSource, 'fig': figure}.
    """
    out = {}
    for name, func in histograms.items():
        df = func(full)
        # Empty source; real data arrives later via factor_update.
        source = ColumnDataSource({c: [] for c in df.columns})
        col = [c for c in df.columns if c != 'index'][0]
        fig = figure(title=col.replace('_', ' ').title(), x_range=[],
                     width=400, height=400, tools='tap')
        fig.xaxis.major_label_orientation = math.pi/4
        fig.vbar(source=source, x='index', top=col, width=0.9)
        if selections is not None:
            # Bind fig/col as leading args via partial below to avoid the
            # late-binding closure pitfall across loop iterations.
            def on_selected(fig, col, attr, old, new):
                with log_errors():
                    # Map selected bar indices back to their category labels.
                    selections[col] = [fig.x_range.factors[ind]
                                       for ind in new['1d']['indices']]
                    if on_change:
                        on_change()
            # NOTE(review): 'selected' as a property event and the
            # new['1d']['indices'] shape are old Bokeh APIs — verify against
            # the installed Bokeh version.
            source.on_change('selected', toolz.partial(on_selected, fig, col))
        # updater = df.stream.map(factor_update, source=source, doc=doc, fig=fig)
        # doc._update_streams.add(updater)
        out[name] = {'source': source, 'fig': fig}
    # NOTE(review): `fig` leaks from the loop, so every chart's y_range is
    # linked to the LAST figure built — presumably intentional (shared
    # y-scale across all charts), but worth confirming.
    for d in out.values():
        d['fig'].y_range = fig.y_range
    return out
def stringify(x):
    """Wrap strings in double quotes for use inside a pandas query string.

    Non-string values (ports, counts) are returned unchanged so they render
    as bare literals.
    """
    if not isinstance(x, str):
        return x
    return '"' + x + '"'
def main_page(doc):
    """Bokeh session entry point: build the dashboard for one browser tab.

    Layout: a row of "full" histograms (all traffic), a live packet table,
    and a row of "query" histograms restricted by whatever bars the user
    has tap-selected. Selection changes rebuild the query pipeline.
    """
    # Streams created for this session; kept on the doc so they share its lifetime.
    doc._update_streams = set()
    figs = []  # NOTE(review): appears unused — candidate for removal.
    # column name -> list of selected category labels (filled by tap events).
    selections = dict()
    # Data table
    table_source = ColumnDataSource({c: [] for c in full_df.example.reset_index().columns})
    columns = [TableColumn(field=c, title=c.title()) for c in full_df.columns]
    table = DataTable(source=table_source, columns=columns, width=1000, height=200)
    # One-element list so on_change() can rebind the previous table updater.
    last_updater = [None]
    query_updaters = set()
    query_source_figs = dict()  # populated below, read inside on_change
    def on_change():
        """Tear down and rebuild the selection-filtered ("query") pipeline."""
        if last_updater[0]:
            # Stop the previous table updater before wiring a new one.
            last_updater[0].destroy()
            # doc._update_streams.remove(last_updater[0])
        df = full_df
        if selections:
            # AND across columns, OR across the values chosen within a column,
            # e.g. (source_port == 80 or source_port == 443) and (...).
            cond = ' and '.join('(' + ' or '.join('%s == %s' % (column, stringify(value)) for value in v) + ')'
                                for column, v in selections.items()
                                if v)
            if cond.strip():
                df = df.query(cond, engine='python')
        query_df = df
        # Feed filtered rows into the data table, keeping only the latest 20.
        updater = query_df.reset_index().stream.map(stream, source=table_source, doc=doc, backlog=20)
        doc._update_streams.add(updater)
        last_updater[0] = updater
        # Replace all histogram updaters for the query charts.
        for s in query_updaters:
            s.destroy()
        query_updaters.clear()
        # Aggregate over a rolling 20-second window of filtered traffic.
        windowed_query_df = query_df.window(value='20s')
        for name, func in histograms.items():
            d = query_source_figs[name]
            df = histograms[name](windowed_query_df)
            updater = df.stream.map(factor_update, source=d['source'], doc=doc, fig=d['fig'])
            query_updaters.add(updater)
    # Full Histograms
    source_figs = make_histograms(histograms, full_df.example, selections, on_change)
    # Query Histograms
    query_source_figs.update(make_histograms(histograms, full_df.example, selections, on_change))
    # Initial build with no selections: query charts mirror the full stream.
    on_change()
    # The "full" charts aggregate over a rolling 20s window of ALL traffic.
    windowed_df = full_df.window(value='20s')
    for name, d in source_figs.items():
        df = histograms[name](windowed_df)
        updater = df.stream.map(factor_update, source=d['source'], doc=doc, fig=d['fig'])
        doc._update_streams.add(updater)
    full_figs = [v['fig'] for v in source_figs.values()]
    query_figs = [v['fig'] for v in query_source_figs.values()]
    doc.add_root(column(row(*full_figs), table, row(*query_figs)))
# Serve the dashboard at /main, then start consuming stdin once the
# tornado event loop is running (source.start must run on the loop).
server = Server({'/main': main_page})
server.start()
IOLoop.current().add_callback(source.start)
IOLoop.current().start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment