Skip to content

Instantly share code, notes, and snippets.

@bndabbs
Last active January 4, 2018 22:28
Show Gist options
  • Save bndabbs/1a073529d43079578864d139c8a3ea4b to your computer and use it in GitHub Desktop.
Save bndabbs/1a073529d43079578864d139c8a3ea4b to your computer and use it in GitHub Desktop.
# Read raw Bro logs from Kafka. Every event produced here is tagged with
# [@metadata][stage] = "broraw_kafka" so the filter section can recognise it.
input {
  kafka {
    topics => ["bro-raw"]
    add_field => { "[@metadata][stage]" => "broraw_kafka" }
    # Set this to one per kafka partition to scale up
    #consumer_threads => 4
    group_id => "bro_logstash"
    # TODO: point this at the real broker list ("host1:9092,host2:9092").
    # The original value was missing, which left the file unparseable;
    # "localhost:9092" is the plugin's documented default.
    bootstrap_servers => "localhost:9092"
    codec => json
    auto_offset_reset => "earliest"
  }
}
# Normalise raw Bro events:
#   * drop events the json codec failed to parse,
#   * move the @stream/@system/@proc fields under [@meta],
#   * classify each event into [@meta][event_type] based on [@meta][stream],
#   * for network events, flag non-routable endpoints, geo-locate routable
#     ones and move the connection 4-tuple under [@meta],
#   * collect related record ids into [@meta][related_ids].
filter {
  if "_jsonparsefailure" in [tags] {
    drop { }
  }

  if [@metadata][stage] == "broraw_kafka" {
    # Set the timestamp
    date { match => [ "ts", "ISO8601" ] }

    # move metadata to new field
    mutate {
      rename => {
        "@stream" => "[@meta][stream]"
        "@system" => "[@meta][system]"
        "@proc" => "[@meta][proc]"
      }
    }

    if [@meta][stream] in ["pe", "x509", "files"] {
      # Rename ID field from file analyzer logs
      mutate { rename => { "id" => "fuid" } }
      # Separate mutate so that "fuid" already exists when it is interpolated.
      mutate {
        add_field => {
          "[@meta][event_type]" => "file"
          "[@meta][id]" => "%{fuid}"
        }
      }
    } else if [@meta][stream] in ["intel", "notice", "notice_alarm", "signatures", "traceroute"] {
      mutate { add_field => { "[@meta][event_type]" => "detection" } }
    } else if [@meta][stream] in [ "capture_loss", "cluster", "communication", "loaded_scripts", "packet_filter", "prof", "reporter", "stats", "stderr", "stdout" ] {
      mutate { add_field => { "[@meta][event_type]" => "diagnostic" } }
    } else if [@meta][stream] in ["netcontrol", "netcontrol_drop", "netcontrol_shunt", "netcontrol_catch_release", "openflow"] {
      mutate { add_field => { "[@meta][event_type]" => "netcontrol" } }
    } else if [@meta][stream] in ["known_certs", "known_devices", "known_hosts", "known_modbus", "known_services", "software"] {
      mutate { add_field => { "[@meta][event_type]" => "observations" } }
    } else if [@meta][stream] in ["barnyard2", "dpd", "unified2", "weird"] {
      mutate { add_field => { "[@meta][event_type]" => "miscellaneous" } }
    } else {
      # Network type: every stream not listed above.
      mutate {
        add_field => {
          "[@meta][event_type]" => "network"
        }
      }

      # Mark an endpoint as non-routable when its address falls in one of the
      # listed ranges. add_field is only applied when the cidr filter matches.
      cidr {
        address => [ "%{[id_orig_h]}" ]
        network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
        add_field => { "[@meta][orig_host_routable]" => "false" }
      }
      cidr {
        address => [ "%{[id_resp_h]}" ]
        network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
        add_field => { "[@meta][resp_host_routable]" => "false" }
      }

      # No flag was set above, so the address matched none of the ranges.
      if ![@meta][orig_host_routable] {
        mutate {
          add_field => {
            "[@meta][orig_host_routable]" => "true"
          }
        }
      }
      if ![@meta][resp_host_routable] {
        mutate {
          add_field => {
            "[@meta][resp_host_routable]" => "true"
          }
        }
      }

      # Only look up GeoIP data for routable addresses.
      if [@meta][orig_host_routable] == "true" {
        geoip {
          source => "id_orig_h"
          target => "[@meta][geoip_orig]"
        }
      }
      if [@meta][resp_host_routable] == "true" {
        geoip {
          source => "id_resp_h"
          target => "[@meta][geoip_resp]"
        }
      }

      # Convert the ports BEFORE renaming them. Inside a single mutate block
      # the plugin runs rename ahead of convert regardless of the order in
      # the config, so a combined block would try to convert fields that have
      # already been moved and silently do nothing.
      mutate {
        convert => {
          "id_orig_p" => "integer"
          "id_resp_p" => "integer"
        }
      }
      mutate {
        rename => {
          "uid" => "[@meta][id]"
          "id_orig_h" => "[@meta][orig_host]"
          "id_orig_p" => "[@meta][orig_port]"
          "id_resp_h" => "[@meta][resp_host]"
          "id_resp_p" => "[@meta][resp_port]"
        }
      }
    }

    # Tie related records
    # NOTE(review): confirm that add_field with an empty array really creates
    # the field on your Logstash version; if it does not, the merges below may
    # have no destination to merge into.
    mutate { add_field => { "[@meta][related_ids]" => [] }}
    # NOTE(review): for network events "uid" was renamed to [@meta][id] above,
    # so this condition is false for them - confirm that is intended.
    if [uid] {
      mutate { merge => {"[@meta][related_ids]" => "uid" }}
    }
    if [fuid] {
      mutate { merge => {"[@meta][related_ids]" => "fuid" }}
    }
    if [related_fuids] {
      mutate { merge => { "[@meta][related_ids]" => "related_fuids" }}
    }
    if [orig_fuids] {
      mutate { merge => { "[@meta][related_ids]" => "orig_fuids" }}
    }
    if [resp_fuids] {
      mutate { merge => { "[@meta][related_ids]" => "resp_fuids" }}
    }
    if [conn_uids] {
      mutate { merge => { "[@meta][related_ids]" => "conn_uids" }}
    }
    if [cert_chain_fuids] {
      mutate { merge => { "[@meta][related_ids]" => "cert_chain_fuids" }}
    }

    # NOTE(review): [@metadata][stage] already equals "broraw_kafka" inside
    # this conditional - confirm whether a different stage name was intended.
    mutate { add_field => {"[@metadata][stage]" => "broraw_kafka" } }
  }
}
# Ship every event to the "bro-clean" Kafka topic, and additionally index it
# in Elasticsearch under a daily index chosen by [@meta][event_type].
# Events whose event_type matches none of the branches go to Kafka only.
#
# TODO: the broker and Elasticsearch addresses were missing in the original
# (which left the file unparseable); the values below are local placeholders
# and must be replaced with the real endpoints.
output {
  kafka {
    codec => json
    topic_id => "bro-clean"
    bootstrap_servers => "localhost:9092"
  }

  if [@meta][event_type] == "network" {
    # Network events get one index per Bro stream (conn, dns, http, ...).
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-network-%{[@meta][stream]}-%{+YYYY.MM.dd}'
    }
  } else if [@meta][event_type] == "netcontrol" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-netcontrol-%{+YYYY.MM.dd}'
    }
  } else if [@meta][event_type] == "observations" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-observations-%{+YYYY.MM.dd}'
    }
  } else if [@meta][event_type] == "miscellaneous" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-miscellaneous-%{+YYYY.MM.dd}'
    }
  } else if [@meta][event_type] == "detection" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-detection-%{+YYYY.MM.dd}'
    }
  } else if [@meta][event_type] == "file" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-file-%{+YYYY.MM.dd}'
    }
  } else if [@meta][event_type] == "diagnostic" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      index => 'bro-diag-%{+YYYY.MM.dd}'
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment