使用技术:Filebeat + kafka + logstash + elasticsearch + kibana
(日志采集) (传输) (数据收集整理) (数据存储搜索分析) (数据分析可视化)
这里使用slf4j+logback的组合,首先需要注意是否与某些依赖继承的log4j冲突,需要把它exculusion,如果存在,slf4j会报找到多个binding的实现,然后会自己选取一种。
slf4J 是 简单日志门面(simple logger facade),并没有具体的日志实现,只提供一组接口,这样用户就无需,因为不同的底层实现而编写不同的代码。
首先需要依赖logback-classic和slf4j-api。如果使用的是log4j还需要slf4j-log4j12 作为连接。
对于logback其配置文件为resource下的logback.xml。
配置例如:
<logger name="ELKLogger" level="DEBUG" additivity="true">
<appender-ref ref="ELK" />
</logger>
<appender name="ELK" class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder charset="utf-8">
<pattern>[%d{yyyy-MM-dd HH:mm:ss,SSS}] [%-5level] [%logger{36}] [%thread] %m%n
</pattern>
</encoder>
<!--<filter class="ch.qos.logback.classic.filter.LevelFilter">-->
<!--<level>DEBUG</level>-->
<!--<onMatch>ACCEPT</onMatch>-->
<!--<onMismatch>DENY</onMismatch>-->
<!--</filter>-->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${ROOT}/AAA_QQQ_%d{}_%i.log</fileNamePattern>
<maxHistory>${MAXHISTORY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${FILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
private static Logger logger=LoggerFactory.getLogger("ELKLogger");//这里的name和<logger name="">相同说明使用这个logger的配置。这个名字可以是包名,这样包下类的所有logger就都用这个,代码里getLogger("classA.class")就行。<appender>代表一种输出配置,通过名字在配置文件中进行映射,在其中可以对输出目录,输出格式等进行设置
安装
linux:
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-linux-x86_64.tar.gz
tar xzvf filebeat-6.6.0-linux-x86_64.tar.gz
win:下载解压后,重命名文件夹名字为Filebeat,使用管理员权限打开PowerShell,输入:
PS > cd 'C:\Program Files\Filebeat'
PS C:\Program Files\Filebeat> .\install-service-filebeat.ps1
配置
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-configuration.html
默认配置文件为filebeat.yml还有一个全选项配置的参考文件filebeat.reference.yml
filebeat.inputs:
- type: log
enabled: true #默认为false 记得改为true
paths:
- /var/log/*.log #支持所有Glob的匹配形式,从服务器根目录开始的绝对路径,注意-后面有空格。如果没有-空格则为filebeat文件夹下的相对路径。
#- c:\programdata\elasticsearch\logs\*
#/var/log/*/*.log. 不匹配/var/log/本身下的.log文件
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
# message topic selection + partitioning
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
output.elasticsearch:
hosts: ["myEShost:9200"]
username: "filebeat_internal"
password: "YOUR_PASSWORD"
setup.kibana:
host: "mykibanahost:5601"
username: "my_kibana_user"
password: "YOUR_PASSWORD"
sudo ./filebeat -e -c filebeat.yml -d "publish" #启动filebeat
sudo rm data/registry # 重启前删除注册表的缓存信息,让其重新加载完整的信息。
用于对大量的消息日志进行缓冲
-
broker:一个kafka集群包含一个或多个服务器,每个服务器被称为broker,用于保存producer发送的消息。
-
controller leader:Kafka集群中有多个broker,当每个broker启动的时候,都会创建Kafka Controller对象,然后去zookeeper竞争一个Controller leader节点。leader会向zookeeper上注册Watcher,其他broker几乎不用监听zookeeper的状态变化。负责管理整个集群中分区和副本的状态.
-
producer:消息生产者,就是向kafka broker发消息的客户端。
-
topic :每条被发送到broker的消息都有一个逻辑上的类别,比如这个是服务a的日志,那个是服务b的日志,这个类别被称为topic。
-
partition:一个topic中的消息由多个分区存储的,每个分区被称为一个partition。每个partition中保证消息有序。
-
replica:即replication,由replication-factor设定,大于等于1,。这些备份必须在不同的broker上。即replica数要小于等于broker数。(注意,虽然叫做备份,但是并没有本体的概念,类似本体的概念是下面的leader!)
-
leader:一个partition的所有replica中只有一个leader,用于读取和写入消息。
-
follower: 一个partition的所有replica中除了leader被称为follower,只用于备份数据,实现高可用。
-
ISR:即in-sync Replica,partition的同步机制。每个leader维护一份与其基本保持同步的follower列表。如果一个follower中的消息比起leader落后太多,则会被从列表中被删除。当所有列表中的follower向leader发送ACK——表明已经从leader中主动拉取数据并备份完毕之后——leader才进行commit提交这次事务,表示已经从producer接收到数据。
-
ACK:通过request.required.acks参数设置级别。0表示producer发送消息过去就完事,不关心broker是否处理成功;1表示producer发送消息过去,leader成功接受到就完事。-1表示producer发送过去,leader及其ISR中所有follower成功才完事,即上条所述。
-
offset:偏移量。kafka为每条在partition的消息保存一个偏移量offset,表示现在已经消费到的位置。保存在一个名叫__consumeroffsets__ 的topic中。
-
consumer:消息读取客户端,通过订阅topic的消息从broker拉取消息。
-
consumer gruop:实际情况是以一个consumer gruop来订阅topic的,一个消费组里面可以有多个consumer。topic中每个partition只会对应的被consumer gruop中的某一个consumer所消费,用于保持消息的有序性。但是可以被多个consumer gruop同时订阅。
-
消息投递语义:At most once:最多一次,消息可能会丢失,但不会重复。先获取数据,再commit offset,最后进行业务处理。
At least once:最少一次,消息不会丢失,可能会重复。先获取数据,再进行业务处理,业务处理成功后commit offset(常用)
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)。
下载安装LogStash(?)
配置
主要有三部分组成: input、filte、output
# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}
在logstash目录下新建first-pipeline.conf中进行配置。
在input端将使用logstash-input-kafka 以此从kafka topic读入数据。 在input中填入kafka的设置例如:
kafka{
bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
client_id => "test"
group_id => "test"
auto_offset_reset => "latest" //从最新的偏移量开始消费
consumer_threads => 5
}
详细参数https://www.elastic.co/guide/en/logstash/6.6/plugins-inputs-kafka.html 注意版本!老版本使用zookeeper地址,新版本使用kafka实例地址。
验证配置:
bin/logstash -f first-pipeline.conf --config.test_and_exit
启动logstash:
bin/logstash -f first-pipeline.conf --config.reload.automatic
警告中的pipelines.yml用于一个实例中的multi-pipeline配置。
在filter中可以使用 grok filter plugin,例如设置:
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
更多关于grok的介绍https://www.elastic.co/guide/en/logstash/6.6/plugins-filters-grok.html
在output中使用elasticsearch例如设置:
elasticsearch {
hosts => [ "localhost:9200" ]
}
在三个部分中都可以配置多个插件 具体插件的选择可以见文档都是你xxx{}嵌在每个部分中。 样例
input {
kafka{
bootstrap_servers => ""
topics_pattern => "log-.*"
type => "log"
codec => json
}
}
filter {
if [fields][log_topic] == "log-trade" { //kafka中的topic名
grok {
match => {"message" => "(?<UID>(?<=UID><).*?(?=><IUID)).*(?<ReqTime>(?<=T1><).*?(?=><T4)).*(?<ResTime>(?<=T4><).*?(?=><RAP)) }
date{
match=>["ReqTime","yyyyMMddHHmmssSSS"]
target=>"@timestamp"
timezone=>"+08:00"
}
}
if [fields][log_topic] == "log-exception" {
grok {
match => {"message" => "(?<ExceptionID>(?<=id:).*)"}
}
}
}
output {
elasticsearch {
hosts => [""]
action => "index"
codec => line{format => "%{message}"}
index=>"%{[fields][log_topic]}-%{+YYYY-MM-dd}"
template_name => "log*" //随意
manage_template => true
template_overwrite => true
template => "/ifspt/logstash/logstash-6.6.0/template/log.json" //使用index模版文件,启动时会被自动安装到es.该文件中的template名字必须能匹配index名字。最简单的模版就仅仅是指定
名字。使用模版后,可以在高版本的kibana中 直接管理index!
}
stdout { codec => rubydebug }
}
cluster: 一个cluster由一个或多个node组成,由不同的名字所区别,默认为"elasticsearch"。一个cluster只有一个node也是有效的。
node: 一个node就是一个服务器,用于存储处理数据。,由不同的名字所区别,默认为随机的UUID。如果当前没有任何node在运行,启动一个node,它会自动称为一个名字为"elasticsearch"的single-node cluster。
index: 是有一些相似特征的ducument的集合。由不同的名字所区别,必须全部小写。
document: 是可以被索引的最小单位,例如单个的产品document,单个的客户document,以JSON格式表示。
shard: 当一个index数据量太大是,可以分为多个shard在不同node上,每个shard都将是独立且功能完整的。为了保证高可用,Elasticsearch可以对shard进行备份,称为replica shards。显然备份不会和原数据在同一个node上,这还增加了横向扩展的能力。这些都在index创建的时候进行定义,当然之后也可以动态修改。默认每个index有5个shard,每个shard有一份replica。
./elasticsearch -Ecluster.name=my_cluster_name -Enode.name=my_node_name
可以启动时设置名字。
curl -X GET "localhost:9200/_cat/health?v"
查看cluster情况,默认为9200端口。
curl -X GET "localhost:9200/_cat/nodes?v"
查看node情况。
curl -X GET "localhost:9200/_cat/indices?v"
查看index情况。
curl -X PUT "localhost:9200/customer?pretty"
创建一个名为customer的index pretty表示漂亮的打印json格式。
curl -X GET "localhost:9200/_cat/indices?v"
再查看一下。
curl -X PUT "localhost:9200/customer/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
"name": "John Doe"
}
在ID为customer的index中放入一个ID为1的ducument。如果没有ID为customer的index,则会自动创建。
curl -X GET "localhost:9200/customer/_doc/1?pretty"
查看该document。