How to aggregate search results over specific fields (Buckets Aggregations), calculate their properties (Metrics Aggregation) and filter buckets on their properties (Pipeline Aggregation).
This is the search scenario (what we would like to catch):
Find out a potential web sweep (an attacker looking for listening HTTP servers in the network). If a single IP try to connect on too amby hosts on the same port, it may indicates a suspicous activity.
Search for all documents with a dest_port
field matching the value 80
over the past 3 days.
"size": 0,
"query": {
"bool": {
"must": [
{
"bool": {
"must": [
{ "match": {"dest_port": 80 } }
]
}
}
],
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-3d/d",
"lte": "now-1d/d"
}
}
}
]
}
},
Aggregate using a Term Aggregation on the fields src_ip.keyword
and dest_ip.keyword
. The src_ip
bucket will be named attacker_ip and the dest_ip
bucket will be known as target_ip.
We obtain the set of target_ip
by attacker_ip
.
"aggs": {
"attacker_ip": {
"terms": {
"field": "src_ip.keyword"
},
"aggs": {
"target_ip": {
"terms": {
"field": "dest_ip.keyword"
}
}
This calculation is made using a Cardinality Aggregation. The aggregation is performed under the attacker_ip
one.
We obtain the number of target_ip
by attacker_ip
.
"target_ip_count": {
"cardinality": {
"field": "dest_ip.keyword"
}
}
This filter is made using a Pipeline Aggregation. The aggregation is performed under the attacker_ip
one, after the target_ip
and target_ip_count
ones.
We keep only the attacker_ip
buckets which have a target_ip_count
value upper than 10.
"target_ip_bucket_filter": {
"bucket_selector": {
"buckets_path": {
"totalTargetIP": "target_ip_count"
},
"script": "params.totalTargetIP > 10"
}
}
{
"size": 0,
"query": {
"bool": {
"must": [
{
"bool": {
"must": [
{ "match": {"dest_port": 80 } }
]
}
}
],
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-3d/d",
"lte": "now-1d/d"
}
}
}
]
}
},
"aggs": {
"attacker_ip": {
"terms": {
"field": "src_ip.keyword"
},
"aggs": {
"target_ip": {
"terms": {
"field": "dest_ip.keyword"
}
},
"target_ip_count": {
"cardinality": {
"field": "dest_ip.keyword"
}
},
"target_ip_bucket_filter": {
"bucket_selector": {
"buckets_path": {
"totalTargetIP": "target_ip_count"
},
"script": "params.totalTargetIP > 10"
}
}
}
}
}
}
Merged while Gist into a single Markdown document.
Updated code description.