Skip to content

Instantly share code, notes, and snippets.

@johanek
Created February 21, 2019 19:13
Show Gist options
  • Save johanek/a2ae48ac61b834371c1a7834c264edad to your computer and use it in GitHub Desktop.
Save johanek/a2ae48ac61b834371c1a7834c264edad to your computer and use it in GitHub Desktop.
Sensu Kafka Extension
{
"kafka": {
"servers": [
"kafka1:9092",
"kafka2:9092"
],
"topic": "sensu"
}
}
require 'sensu-plugin/check/cli'
require 'json'
require 'kafka'
module Sensu
  module Extension
    # Sensu handler extension that publishes handled events to a Kafka topic.
    #
    # Configuration is read from the "kafka" settings scope and must provide
    # +servers+ (list of "host:port" brokers) and +topic+.
    #
    # Before publishing, events go through an occurrences/refresh filter (see
    # #filter_occurrences) so that a long-running incident does not produce a
    # Kafka message on every single check execution.
    class KafkaExtension < Handler
      # Fallbacks used when a check attribute is absent or not an integer.
      DEFAULT_OCCURRENCES = 1
      DEFAULT_REFRESH = 1800
      DEFAULT_INTERVAL = 60

      # Event actions that are subject to refresh-based throttling.
      REPEATABLE_ACTIONS = %w[create flapping].freeze

      # @return [String] the extension name
      def name
        'kafka'
      end

      # @return [String] a short human-readable description
      def description
        'Send handled events to kafka'
      end

      # Reads the Kafka settings and creates the asynchronous producer.
      #
      # The producer is configured with delivery_threshold 10 and
      # delivery_interval 10; see the ruby-kafka async producer documentation
      # for the exact semantics of these options.
      def post_init
        @kafka_config = @settings['kafka']
        logger.info("Kafka extension initialized with servers #{@kafka_config[:servers]} and topic #{@kafka_config[:topic]}")
        @kafka = Kafka.new(@kafka_config[:servers])
        @producer = @kafka.async_producer(
          delivery_threshold: 10,
          delivery_interval: 10
        )
      end

      # Shuts the async producer down so buffered messages are delivered
      # rather than dropped when the process exits.
      #
      # NOTE(review): relies on Sensu invoking the extension's `stop` hook at
      # shutdown -- confirm against the sensu-extension version in use.
      #
      # @return [true]
      def stop
        @producer.shutdown if @producer
        true
      end

      # Decides whether an event should be published.
      #
      # Rules, in order:
      # * a 'resolve' event whose occurrences watermark reached the check's
      #   +occurrences+ threshold is always published;
      # * an event with fewer occurrences than the threshold is dropped;
      # * a 'create'/'flapping' event past the threshold is published only
      #   every (refresh / interval) occurrences;
      # * anything else is published.
      #
      # @param event [String, Hash] the event, either as a JSON document or as
      #   an already-parsed Hash (String or Symbol keys)
      # @return [Boolean] true when the event should be published
      # @raise [JSON::ParserError] if +event+ is a String that is not valid JSON
      def filter_occurrences(event)
        event = normalize_event(event)
        check = event[:check] || {}
        check_occurrences = str2int(check[:occurrences]) || DEFAULT_OCCURRENCES
        event_occurrences = str2int(event[:occurrences]) || DEFAULT_OCCURRENCES
        refresh = str2int(check[:refresh]) || DEFAULT_REFRESH
        action = event[:action].to_s

        if action == 'resolve'
          # A missing watermark simply falls through to the regular
          # occurrence check below instead of raising on nil.
          watermark = str2int(event[:occurrences_watermark])
          return true if watermark && watermark >= check_occurrences
        end

        return false if event_occurrences < check_occurrences

        if event_occurrences > check_occurrences && REPEATABLE_ACTIONS.include?(action)
          interval = str2int(check[:interval]) || DEFAULT_INTERVAL
          # A non-positive interval would make fdiv yield Infinity/NaN and
          # to_i raise FloatDomainError; treat it as "no throttling".
          count = interval > 0 ? refresh.fdiv(interval).to_i : 0
          return false unless count.zero? || ((event_occurrences - check_occurrences) % count).zero?
        end

        true
      end

      # Coerces a value to an Integer.
      #
      # Strings are parsed strictly as base-10 so that a value such as "010"
      # is read as 10 rather than as octal 8.
      #
      # @param x [Object] the value to coerce
      # @return [Integer, nil] +x+ itself when it is an Integer, the parsed
      #   value when it is a decimal String, nil otherwise
      def str2int(x)
        case x
        when Integer
          x
        when String
          begin
            Integer(x, 10)
          rescue ArgumentError
            nil
          end
        end
      end

      # Publishes the event to the configured topic when it passes
      # #filter_occurrences, then reports empty output with status 0 to the
      # caller-supplied block.
      #
      # @param event [String, Hash] the event; a String is forwarded verbatim
      #   (it is not JSON-encoded a second time), anything else is serialized
      #   with +to_json+
      # @yield [output, status] invoked once with ('', 0)
      def run(event)
        payload = event.is_a?(String) ? event : event.to_json
        @producer.produce(payload, topic: @kafka_config[:topic]) if filter_occurrences(payload)
        yield('', 0)
      end

      # @return [Object] the logger returned by Sensu::Logger.get
      def logger
        Sensu::Logger.get
      end

      private

      # Returns the event as a Hash with Symbol keys, whatever form it arrived
      # in. Non-String input is round-tripped through JSON so that String keys
      # and Symbol values (e.g. an action given as :resolve) are normalized.
      # JSON.parse is used rather than JSON.load because the latter is meant
      # for trusted input only.
      def normalize_event(event)
        raw = event.is_a?(String) ? event : JSON.generate(event)
        JSON.parse(raw, symbolize_names: true)
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment