Created
October 11, 2018 21:52
-
-
Save JustinAzoff/4086a0a0a84f2e5b85d15dafcc83cb45 to your computer and use it in GitHub Desktop.
bro zeromq examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
event log_one (n:count) | |
{ | |
Reporter::info(fmt("Hello %d", n)); | |
if(n != 0) { | |
schedule 1sec { log_one(n-1) }; | |
} | |
} | |
event bro_init() | |
{ | |
event log_one(10000000); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@load NCSA/ZeroMQWriter | |
redef LogZeroMQ::endpoint = "tcp://127.0.0.1:9999"; | |
redef Log::default_scope_sep="_"; | |
event bro_init() | |
{ | |
for ( stream_id in Log::active_streams ) | |
Log::remove_default_filter(stream_id); | |
} | |
type Extension: record { | |
write_ts: time &log; | |
#stream: string &log; | |
system_name: string &log; | |
}; | |
function add_extension(path: string): Extension | |
{ | |
return Extension($write_ts = network_time(), | |
#$stream = path, | |
$system_name = peer_description); | |
} | |
redef Log::default_ext_func = add_extension; | |
event new_connection(c: connection) { | |
print c$id; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import json | |
import sys | |
import zmq | |
def sub(bind="tcp://*:9999"): | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.bind(bind) | |
socket.setsockopt_string(zmq.SUBSCRIBE, u"http") | |
socket.setsockopt_string(zmq.SUBSCRIBE, u"ssl") | |
while True: | |
stream = socket.recv_string() | |
entry = socket.recv_string() | |
rec = json.loads(entry) | |
try: | |
if stream == "http": | |
print("{id_orig_h} {method} http://{host}:{id_resp_p}{uri}".format(**rec)) | |
elif stream == "ssl": | |
print("{id_orig_h} unknown https://{server_name}:{id_resp_p} unknown".format(**rec)) | |
except: | |
print(rec) | |
if __name__ == "__main__": | |
try: | |
bind = sys.argv[1] | |
except IndexError: | |
bind = "tcp://*:9999" | |
sub(bind=bind) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import psycopg2 | |
import sys | |
import zmq | |
def sub(bind="tcp://*:9999", topics=None): | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.bind(bind) | |
if not topics: | |
topics = u"" | |
for topic in topics.split(u","): | |
socket.setsockopt_string(zmq.SUBSCRIBE, topic) | |
while True: | |
stream = socket.recv_string() | |
entry = socket.recv_string() | |
yield stream, entry | |
def insert(conn, records): | |
cur = conn.cursor() | |
cur.execute(''' | |
create table if not exists logs ( | |
stream varchar(30), | |
rec jsonb | |
) | |
''') | |
cur.execute(' create index if not exists logs_stream on logs(stream) ') | |
conn.commit() | |
cur.execute(''' | |
prepare insert as | |
insert into logs (stream, rec) values ($1, $2) | |
''') | |
for i, (stream, rec) in enumerate(records): | |
cur.execute("execute insert (%s, %s)", (stream, rec)) | |
if i % 10 == 0: | |
conn.commit() | |
print("Inserted", i) | |
if __name__ == "__main__": | |
try: | |
bind = sys.argv[1] | |
except IndexError: | |
bind = "tcp://*:9999" | |
try: | |
pg = sys.argv[2] | |
except IndexError: | |
pg = "dbname='bro' user='postgres' host='192.168.99.100'" | |
try: | |
topics = sys.argv[3] | |
except IndexError: | |
topics = "" | |
conn = psycopg2.connect(pg) | |
records = sub(bind=bind, topics=topics) | |
insert(conn, records) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import psycopg2 | |
import sys | |
import zmq | |
def sub(bind="tcp://*:9999", topics=None): | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.bind(bind) | |
if not topics: | |
topics = u"" | |
for topic in topics.split(u","): | |
socket.setsockopt_string(zmq.SUBSCRIBE, topic) | |
while True: | |
stream = socket.recv_string() | |
entry = socket.recv_string() | |
yield stream, entry | |
def insert(conn, records): | |
cur = conn.cursor() | |
cur.execute(''' | |
create stream if not exists logs ( | |
log varchar(30), | |
rec jsonb | |
) | |
''') | |
#cur.execute(' create index if not exists logs_log on logs(log) ') | |
#conn.commit() | |
cur.execute(''' | |
prepare insert as | |
insert into logs (log, rec) values ($1, $2) | |
''') | |
for i, (stream, rec) in enumerate(records): | |
cur.execute("execute insert (%s, %s)", (stream, rec)) | |
if i % 10 == 0: | |
conn.commit() | |
print("Inserted", i) | |
if __name__ == "__main__": | |
try: | |
bind = sys.argv[1] | |
except IndexError: | |
bind = "tcp://*:9999" | |
try: | |
pg = sys.argv[2] | |
except IndexError: | |
pg = "dbname='pipeline' user='pipeline' password='pipeline' host='192.168.99.100'" | |
try: | |
topics = sys.argv[3] | |
except IndexError: | |
topics = "" | |
conn = psycopg2.connect(pg) | |
records = sub(bind=bind, topics=topics) | |
insert(conn, records) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DROP CONTINUOUS VIEW top_dns; | |
CREATE CONTINUOUS VIEW top_dns WITH (sw = '2 minute') AS | |
SELECT rec->'query' as query, | |
count(*) as c, | |
count ( DISTINCT rec->'id_orig_h') as sources | |
FROM logs | |
WHERE log='dns' | |
GROUP BY rec->'query'; | |
DROP CONTINUOUS VIEW top_dns_clients; | |
CREATE CONTINUOUS VIEW top_dns_clients WITH (sw = '2 minute') AS | |
SELECT rec->'id_orig_h' as query, | |
count(*) as c, | |
count ( DISTINCT rec->'query') as queries | |
FROM logs | |
WHERE log='dns' | |
GROUP BY rec->'id_orig_h'; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"flag" | |
"log" | |
"net/http" | |
"os" | |
"sync" | |
"time" | |
"github.com/go-zeromq/zmq4" | |
) | |
var ( | |
zmqBind = flag.String("zmq", "tcp://127.0.0.1:9999", "endpoint to bind the zmq sub socket to") | |
) | |
type Conn struct { | |
Orig string `json:"id_orig_h"` | |
Resp string `json:"id_resp_h"` | |
} | |
type Data struct { | |
conns map[string][]string | |
sync.Mutex | |
} | |
func recv(data *Data) { | |
sub := zmq4.NewSub(context.Background()) | |
defer sub.Close() | |
log.Printf("Binding zmq SUB socket to %q", *zmqBind) | |
sub.Listen(*zmqBind) | |
err := sub.SetOption(zmq4.OptionSubscribe, "conn") | |
if err != nil { | |
log.Fatalf("could not subscribe: %v", err) | |
} | |
for { | |
// Read envelope with address | |
msg, err := sub.Recv() | |
if err != nil { | |
log.Fatal(err) | |
} | |
contents := msg.Frames[1] | |
var record Conn | |
if err := json.Unmarshal(contents, &record); err != nil { | |
log.Printf("Can't decode %q", contents) | |
continue | |
} | |
if record.Orig == "" { | |
continue | |
} | |
log.Printf("%v", record) | |
data.Lock() | |
data.conns[record.Orig] = append(data.conns[record.Orig], record.Resp) | |
if len(data.conns[record.Orig]) > 10 { | |
data.conns[record.Orig] = data.conns[record.Orig][1:] | |
} | |
data.Unlock() | |
} | |
} | |
func web(data *Data) { | |
http.HandleFunc("/recent", func(w http.ResponseWriter, r *http.Request) { | |
query := r.FormValue("q") | |
if query == "" { | |
http.Error(w, "Missing parameter: q", http.StatusBadRequest) | |
return | |
} | |
data.Lock() | |
conns := data.conns[query] | |
json.NewEncoder(w).Encode(conns) | |
data.Unlock() | |
}) | |
log.Fatal(http.ListenAndServe(":8080", nil)) | |
} | |
func main() { | |
flag.Usage = func() { | |
flag.PrintDefaults() | |
} | |
flag.Parse() | |
conns := make(map[string][]string) | |
data := Data{conns: conns} | |
go recv(&data) | |
go web(&data) | |
for { | |
data.Lock() | |
log.Printf("Tracking %d sources", len(data.conns)) | |
data.Unlock() | |
time.Sleep(2 * time.Second) | |
} | |
os.Exit(0) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"log" | |
"os" | |
"github.com/go-zeromq/zmq4" | |
) | |
var ( | |
zmqBind = flag.String("zmq", "tcp://127.0.0.1:9999", "endpoint to bind the zmq sub socket to") | |
) | |
func main() { | |
flag.Usage = func() { | |
flag.PrintDefaults() | |
} | |
flag.Parse() | |
sub := zmq4.NewSub(context.Background()) | |
defer sub.Close() | |
log.Printf("Binding zmq SUB socket to %q", *zmqBind) | |
sub.Listen(*zmqBind) | |
err := sub.SetOption(zmq4.OptionSubscribe, "conn") | |
if err != nil { | |
log.Fatalf("could not subscribe: %v", err) | |
} | |
for { | |
msg, err := sub.Recv() | |
if err != nil { | |
log.Fatal(err) | |
} | |
log.Printf("%s %s", msg.Frames[0], msg.Frames[1]) | |
} | |
os.Exit(0) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import json | |
import pprint | |
import sys | |
import time | |
import zmq | |
import pprint | |
def sub(bind="tcp://*:9999", topics=None): | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.bind(bind) | |
if not topics: | |
topics = u"" | |
for topic in topics.split(u","): | |
socket.setsockopt_string(zmq.SUBSCRIBE, topic) | |
while True: | |
stream = socket.recv_string() | |
entry = socket.recv_string() | |
rec = json.loads(entry) | |
#pprint.pprint((stream, rec)) | |
print(entry) | |
if __name__ == "__main__": | |
try: | |
bind = sys.argv[1] | |
except IndexError: | |
bind = "tcp://*:9999" | |
try: | |
topics = sys.argv[2] | |
except IndexError: | |
topics = "" | |
sub(bind=bind, topics=topics) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import time | |
import zmq | |
port = "9999" | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.bind("tcp://127.0.0.1:%s" % port) | |
socket.setsockopt(zmq.SUBSCRIBE, "") | |
x = bytes = lines = 0 | |
last = time.time() | |
while True: | |
stream, entry = socket.recv_multipart() | |
lines += 1 | |
x += 1 | |
bytes += len(entry) | |
if x % 20 == 0: | |
x = 0 | |
now = time.time() | |
dur = now - last | |
if dur >= 1: | |
last = now | |
print("{}s lines={} bytes={} mbytes={}".format(dur, lines, bytes, bytes/1024/1024)) | |
lines = bytes = 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
psql -h $(docker-machine ip) -U pipeline pipeline |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
docker run -t -i --rm -p 5432:5432 pipelinedb/pipelinedb |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment