Skip to content

Instantly share code, notes, and snippets.

Created March 19, 2015 18:51
Show Gist options
  • Save jasisk/c17583dd2dfd99200f95 to your computer and use it in GitHub Desktop.
Save jasisk/c17583dd2dfd99200f95 to your computer and use it in GitHub Desktop.
Heka aggregation question
# NOTE(review): the [section] headers of this Heka TOML config appear to be
# missing -- keys such as `type`, `script_type`, `filename`, `ticker_interval`
# and `message_matcher` repeat, which a single TOML table cannot hold. The
# groupings described below are inferred from the keys; confirm them against
# the original config before use.

# Presumably the HTTP input section: it points at the local test server on
# port 8000 and hands its messages to the decoder named below.
url = "http://localhost:8000"
ticker_interval = 2
success_severity = 6
error_severity = 1
decoder = "KappaStatusDecoder"
# Presumably the section for the decoder referenced above (a Lua sandbox decoder).
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/kappa_status.lua"
# Presumably the decoder's config sub-table: the decoder script reads this
# value with read_config("type") and uses it as the Type of emitted messages.
type = "KappaStatus"
# Presumably the aggregation filter section (a Lua sandbox filter) that
# consumes the messages whose Type is 'KappaStatus'.
type = "SandboxFilter"
script_type = "lua"
filename = "lua_filters/kappa_aggregate.lua"
message_matcher = "Type == 'KappaStatus'"
ticker_interval = 5
# Presumably the filter's config sub-table: the filter script reads these four
# values with read_config("sec_per_row"), ("rows"), ("alert_throttle") and
# ("anomaly_config").
sec_per_row = 6
rows = 40
# NOTE(review): the filter's own default for this value is 5 * 60 * 1e9, which
# suggests nanoseconds; a value of 1 would then be 1 ns -- confirm the unit.
alert_throttle = 1
anomaly_config = 'mww_nonparametric("4xx", 4, 2, 5, 0.55)'
# Presumably an output section: it selects sandbox output whose payload_type
# is 'alert' and formats it with the RstEncoder.
encoder = "RstEncoder"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert'"
local annotation = require "annotation"
local anomaly = require "anomaly"
local alert = require "alert"
require "circular_buffer"
require "string"
-- Per-host bookkeeping. `hosts` maps a hostname to {last_update, index};
-- `index` is the shared key into `cbufs` (circular buffers) and `titles`
-- (payload names), both of which are dense arrays of size `hosts_size`.
local hosts = {}
local cbufs = {}
local titles = {}
local hosts_size = 0

-- Plugin configuration, with defaults used when a key is absent.
local rows = read_config("rows") or 288 -- 24 hours @ 5 mins
local sec_per_row = read_config("sec_per_row") or 300 -- 5 mins
local alert_throttle = read_config("alert_throttle") or 5 * 60 * 1e9 -- 5 mins, in nanoseconds
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))

-- Apply the configured throttle; without this call the value read above was
-- never used and the alert module kept its own default.
alert.set_throttle(alert_throttle)
--- Create the circular buffer used to track one host's HTTP statuses.
-- The buffer has `rows` rows of `sec_per_row` seconds each and six columns:
-- columns 1-5 are labelled 100, 200, 300, 400 and 500 (one per status class)
-- and column 6 is labelled "Unknown".
-- @return the newly created circular buffer
function init_cb()
    local cb = circular_buffer.new(rows, 6, sec_per_row)
    for i = 1, 5 do
        cb:set_header(i, i * 100)
    end
    cb:set_header(6, "Unknown")
    return cb
end
--- Record one decoded status message in the sending host's circular buffer.
-- Reads Timestamp, Hostname, Fields[status] and Fields[count] from the
-- current message. The first message from a host registers it (title,
-- annotation pruning window and circular buffer are created lazily).
-- @return 0 on success, -1 when Fields[count] is missing or not numeric
function process_message ()
    local ts = read_message("Timestamp")
    local hostname = read_message("Hostname")
    local status = tonumber(read_message("Fields[status]"))
    local count = tonumber(read_message("Fields[count]"))

    -- Without a numeric count there is nothing to add; reject the message
    -- instead of raising an error inside the sandbox.
    if not count then
        return -1
    end

    local host = hosts[hostname]
    if not host then
        hosts_size = hosts_size + 1
        host = {last_update = ts, index = hosts_size}
        hosts[hostname] = host
        titles[hosts_size] = string.format("%s http status", hostname)
        -- keep annotations only as long as the buffer itself spans
        annotation.set_prune(titles[hosts_size], rows * sec_per_row * 1e9)
    end

    local statuses = cbufs[host.index]
    if not statuses then
        statuses = init_cb()
        cbufs[host.index] = statuses
    end

    -- Map the status code to its class column (1xx -> 1 ... 5xx -> 5) with an
    -- explicit floor; anything else, including a missing or non-numeric
    -- status, is counted in column 6 ("Unknown").
    local col = status and (status - status % 100) / 100
    if col and col >= 1 and col < 6 then
        statuses:add(ts, col, count)
    else
        statuses:add(ts, 6, count)
    end
    return 0
end
--- Periodic callback: run anomaly detection for every known host and publish
-- each host's circular buffer.
-- When an anomaly configuration is present and alerts are not throttled, a
-- detected "4xx" anomaly annotates the host's graph and queues an alert with
-- the sum of column 4 over the last minute. The buffer is then injected with
-- its pruned annotations; without an anomaly configuration the bare buffer
-- is injected.
-- @param ns timestamp passed in by the host; treated as nanoseconds
function timer_event(ns)
    for host, meta in pairs(hosts) do
        local cbuf = cbufs[meta.index]
        local title = titles[meta.index]

        if anomaly_config then
            if not alert.throttled(ns) then
                local msg4xx, annos4xx = anomaly.detect(ns, "4xx", cbuf, anomaly_config)
                if msg4xx then
                    annotation.concat(title, annos4xx)
                    -- Only the first result is needed; the original also bound
                    -- a second value to `rows`, shadowing the module-level
                    -- setting. `sum` falls back to 0 so a missing result
                    -- cannot break the "%d" format below.
                    local sum = cbuf:compute("sum", 4, ns - (1 * 60 * 1e9))
                    alert.queue(ns, string.format("*%s* has seen *%d* 4xx statuses in the last *1* minute", host, sum or 0))
                end
            end
            inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
        else
            inject_payload("cbuf", title, cbuf)
        end
    end

    -- Flush everything queued above as a single alert; queued messages are
    -- never emitted unless the queue is sent.
    alert.send_queue(ns)
end
local alert = require("alert")
require "string"
require "cjson"
require "table"
-- Message Type assigned to every message this decoder emits (from config).
-- Named `msg_type` so the builtin `type` function is not shadowed.
local msg_type = read_config("type")
-- True when the previously decoded message was an HTTP input error.
local previously_error = false

--- Decode one HTTP input message.
-- Error messages (Type 'heka.httpinput.error') are re-emitted with every
-- occurrence of the Logger value replaced by "[REDACTED]" in the payload and
-- a `previous_error` field telling whether the preceding message was also an
-- error. Other messages are parsed as JSON; each row of `json[1].points`
-- becomes a new message carrying the row's count, host and status.
-- @return 0 on success, -1 when the payload cannot be parsed, has an
--         unexpected shape, or a message cannot be injected
function process_message()
    local raw_message = read_message("Payload")
    local original_type = read_message("Type")

    if original_type == 'heka.httpinput.error' then
        -- get the query out of the payload because it might contain sensitive
        -- information like credentials
        -- NOTE(review): assumes Logger holds the requested URL -- confirm.
        local logger = read_message("Logger")
        local new_payload = raw_message
        -- An empty pattern would match between every character, so only
        -- redact when there is something to look for.
        if logger and logger ~= "" and raw_message then
            -- escape pattern magic characters so Logger is matched literally
            local pattern = string.gsub(logger, "%p", "%%%1")
            new_payload = string.gsub(raw_message, pattern, "[REDACTED]")
        end
        local m = {
            Payload = new_payload,
            Fields = {
                previous_error = previously_error
            }
        }
        inject_message(m) -- change the original payload ...
        previously_error = true
        return 0
    end
    previously_error = false

    local ok, json = pcall(cjson.decode, raw_message)
    if not ok then
        return -1 -- if we can't parse the body, that's a problem
    end
    -- Valid JSON that is not an array/object (e.g. a bare number) cannot be
    -- indexed; treat it like an unparsable body.
    if type(json) ~= "table" then
        return -1
    end

    local first = json[1]
    if first ~= nil then
        local points = type(first) == "table" and first["points"]
        if type(points) ~= "table" then
            return -1
        end
        for _, row in ipairs(points) do
            local count, host, status = row[2], row[3], row[4]
            -- Payload is the row re-encoded as JSON, or its values joined
            -- with ':' when encoding fails.
            local encoded_ok, payload = pcall(cjson.encode, row)
            if not encoded_ok then
                payload = table.concat(row, ':')
            end
            local msg = {
                Type = msg_type,
                Payload = payload,
                Hostname = host,
                Fields = {
                    count = count,
                    status = status
                }
            }
            -- inject a new message into the pipeline
            if not pcall(inject_message, msg) then return -1 end
        end
    end
    return 0
end
// Minimal test server: answers every request with a single data point
// [1, x, 'A', 404], where x is 1 except on every 43rd request, when it is
// count + 43 (a spike).
var http = require('http');
var server = http.createServer();
var count = 1; // number of the request being served, starting at 1

server.on('request', function (req, res) {
  var x = count % 43 ? 1 : count + 43;
  res.writeHead(200, {'content-type': 'application/json'});
  res.end(JSON.stringify([{points: [[1, x, 'A', 404]]}]));
  console.log('Req', count++, x);
});

server.listen(8000, function () {
  console.log('Listening on %d ...', server.address().port);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment