# Last active June 24, 2021 18:08
# Logstash configuration file.
# #####################################################################
# DESC: Logstash configuration file. Typically forwarding logs to
# Elasticsearch instance.
# #####################################################################
# Where to get input
input {
  # Read events typed on the standard input device/interface.
  stdin {
    type => "stdin-type"
  }

  # Tail the Apache access log, starting from the beginning of the file.
  file {
    type => "apache-access"
    path => [ "/var/log/apache/access.log" ]
    start_position => "beginning"
  }

  # Tail the Nginx access log.
  file {
    type => "nginx-access"
    path => [ "/var/log/nginx/access.log" ]
  }

  # Tail the Tomcat log. The multiline codec appends any line that looks
  # like part of a Java stack trace (error line, "...Exception: ...",
  # "at ...", "... N more", "Caused by: ...") to the previous event.
  file {
    type => "tomcat"
    path => [ "/var/log/tomcat/catalina.out" ]
    codec => multiline {
      # The dots in "... N more" are escaped so they match a literal
      # ellipsis rather than any three characters.
      pattern => "(^\d+\serror)|(^.+Exception: .+)|(^\s+at .+)|(^\s+\.\.\. \d+ more)|(^\s*Caused by:.+)"
      what => "previous"
    }
  }

  # Consume the "logstash" topic from Kafka as consumer group "logs",
  # locating the brokers through ZooKeeper.
  kafka {
    zk_connect => "kafka1.mmlac.local:2181"
    group_id => "logs"
    topic_id => "logstash"
    reset_beginning => false
    consumer_threads => 1
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    decorate_events => true
  }

  # Accept newline-delimited JSON over TCP port 5000.
  tcp {
    type => "json"
    port => 5000
    codec => json_lines
  }

  # Tail the local syslog files, starting from the beginning of each file.
  file {
    type => "syslog"
    path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
    start_position => "beginning"
  }

  # Accept raw syslog lines on port 5010 over both TCP and UDP; they are
  # parsed by the "syslog" branch of the filter section.
  tcp {
    port => 5010
    type => "syslog"
  }
  udp {
    port => 5010
    type => "syslog"
  }

  # Accept syslog (RFC3164) on port 5015 via the dedicated syslog input.
  syslog {
    port => 5015
  }

  # Accept Lumberjack (logstash-forwarder) connections on TCP port 5020,
  # using the certificate/key pair below.
  lumberjack {
    port => 5020
    ssl_certificate => "/mnt/logstash-forwarder/keys/logstash-forwarder.crt"
    ssl_key => "/mnt/logstash-forwarder/keys/logstash-forwarder.key"
    type => "syslog"
  }

  # Accept application logs sent by log4j over TCP port 5025.
  log4j {
    port => 5025
  }
}
# Some Filtering
filter {
  # SYSLOG filter: split the raw line into timestamp, host, program, pid
  # and message, and record when and from where the event was received.
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }

    syslog_pri { }

    # Take the event timestamp from the syslog line. Syslog pads
    # single-digit days with a space ("Jun  4 12:00:00"), so the first
    # pattern needs two spaces between the month and the day.
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }

    # Only when grok succeeded: promote the parsed payload to "message"
    # and drop the now-redundant copy.
    if !("_grokparsefailure" in [tags]) {
      mutate {
        replace => [ "message", "%{syslog_message}" ]
      }

      mutate {
        remove_field => [ "syslog_message" ]
      }
    }

    # Remove spurious fields that have names changed or been aggregated
    mutate {
      remove_field => [ "syslog_hostname", "syslog_timestamp" ]
    }
  }

  # Apache Access Log filter
  if [type] == "apache-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }

  # Nginx Access Log filter
  # NOTE(review): NGINXACESS does not appear to be a stock grok pattern; it
  # presumably comes from a custom patterns file (and may be a misspelling
  # of NGINXACCESS) - confirm the name matches the pattern definition.
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{NGINXACESS}" }
    }
  }

  # Tomcat filter: discard Tomcat events whose message is empty.
  if [type] == "tomcat" and [message] !~ /(.+)/ {
    drop { }
  }

  # Docker filter: the message is a JSON document; decode it, use its
  # "log" field as the message and its "time" field as the timestamp.
  # NOTE(review): no input visible in this file sets type "docker" -
  # presumably it is set by the sender; confirm.
  if [type] == "docker" {
    json {
      source => "message"
    }

    mutate {
      rename => [ "log", "message" ]
    }

    date {
      match => [ "time", "ISO8601" ]
    }
  }
}
# Where to send output
output {
  # Print every event to standard output in rubydebug form.
  stdout {
    codec => rubydebug
  }

  # Write syslog events that failed grok parsing to a dated file.
  if [type] == "syslog" and "_grokparsefailure" in [tags] {
    file { path => "/var/log/failed_syslog_events-%{+YYYY-MM-dd}" }
  }

  # Send output to Elasticsearch over HTTP interface.
  # NOTE(review): ES_HOST, ES_PORT and ES_CLUSTER look like placeholders to
  # be substituted before deployment; as written they are plain barewords,
  # not environment-variable references - confirm how they get replaced.
  elasticsearch {
    protocol => 'http'
    host => ES_HOST
    port => ES_PORT
    cluster => ES_CLUSTER
  }

  # Send only the message text of each event to Kafka.
  # NOTE(review): no topic_id is set here; the kafka output presumably
  # requires one. Pick a topic other than the one the kafka input reads
  # ("logstash") to avoid feeding events back into this pipeline.
  kafka {
    codec => plain {
      format => "%{message}"
    }
  }

  # Send output metrics to statsd for statistics aggregation.
  # NOTE(review): none of the statsd outputs below is guarded by a [type]
  # conditional, so each one runs for every event - confirm intended.
  statsd {
    # Count one hit every event by response
    increment => "apache.response.%{response}"
    # Use the 'bytes' field from the apache log as the count value.
    count => [ "apache.bytes", "%{bytes}" ]
  }

  # NOTE(review): host is an empty string in the three outputs below -
  # presumably a placeholder for the statsd server address; fill in.
  statsd {
    host => ''
    count => [ "tomcat.bytes", "%{bytes}" ]
  }

  statsd {
    host => ''
    increment => "tomcat.response.%{response}"
  }

  statsd {
    host => ''
    timing => [ "tomcat.indextime", "%{indextime}" ]
  }
}
