Filebeat > Logstash > Kafka Setup
Filebeat collects logs and ships them to Logstash, which in turn produces the data into Kafka. If the Kafka cluster does not use Kerberos authentication, Filebeat can also ship directly to Kafka.
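In that unauthenticated case, Filebeat's built-in Kafka output would make Logstash optional. A minimal sketch, reusing the broker addresses and topic from the Logstash configuration shown later:

output.kafka:
  hosts: ["t2.demo.com:9092", "t3.demo.com:9092", "t4.demo.com:9092"]
  topic: "test_hello"
  compression: snappy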
Usage
PS: As of this writing (2018-09-02), the current Logstash release, 6.4.0, has an Integer-to-Long conversion bug. Elastic says a fix is expected this month, so we downgrade Logstash and use 6.2.2 for the time being.
Download Logstash
cd /usr/share
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
tar -xzf logstash-6.2.2.tar.gz
mv logstash-6.2.2 logstash
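To confirm the unpacked copy works, print its version:

/usr/share/logstash/bin/logstash --version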
Filebeat
Install Filebeat
yum install filebeat
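yum can only resolve the filebeat package if the Elastic repository is configured. If it is not, a repo file such as /etc/yum.repos.d/elastic.repo is needed first; the contents below follow Elastic's documented 6.x repository definition:

[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md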
Filebeat configuration file: /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  publish_async: true
  paths:
    ## Log file locations
    - /etc/filebeat/logs/*.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

output.logstash:
  hosts: ["127.0.0.1:5044"]

processors:
- drop_fields:
    fields: []
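Filebeat can check both the file syntax and the connection to Logstash before being started as a service (the output test only succeeds once Logstash is already listening on port 5044):

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml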
Logstash
Install the Kafka output plugin
/usr/share/logstash/bin/logstash-plugin install logstash-output-kafka
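To verify the plugin landed:

/usr/share/logstash/bin/logstash-plugin list --verbose | grep kafka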
Configure Logstash
mkdir /etc/logstash
Logstash pipelines file: /etc/logstash/pipelines.yml
- pipeline.id: mylogs
  queue.type: persisted
  path.config: "/etc/logstash/conf.d/*.conf"
  pipeline.workers: 8
  pipeline.batch.size: 1000
  pipeline.batch.delay: 50
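Because queue.type is persisted, in-flight events are paged to disk under path.data, so the queue can be inspected directly on disk (the path below assumes the path.data value set in logstash.yml):

ls /var/lib/logstash/queue/mylogs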
Logstash settings file: /etc/logstash/logstash.yml
path.data: /var/lib/logstash
path.logs: /var/log/logstash
pipeline.workers: 8
pipeline.batch.size: 1000
pipeline.batch.delay: 5
Note that the per-pipeline settings in pipelines.yml override these global defaults, which is why the startup log below reports a batch delay of 50 for the mylogs pipeline.
JVM options: /etc/logstash/jvm.options
-Xms10g
-Xmx10g
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djruby.compile.invokedynamic=true
-Djruby.jit.threshold=0
-XX:+HeapDumpOnOutOfMemoryError
-Djava.security.egd=file:/dev/urandom
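Once Logstash is running, actual heap usage can be checked against these limits through the monitoring API it exposes on port 9600:

curl -s 'http://localhost:9600/_node/stats/jvm?pretty'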
Kafka JAAS file: /etc/logstash/conf.d/kafka/kafka_jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=true
  principal="admin/admin@demo.com"
  useKeyTab=true
  serviceName="kafka"
  keyTab="/etc/security/keytabs/admin.keytab"
  client=true;
};
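Before wiring this into Logstash, it is worth confirming that the keytab and principal actually authenticate:

kinit -kt /etc/security/keytabs/admin.keytab admin/admin@demo.com
klist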
Logstash pipeline configuration: /etc/logstash/conf.d/logstash.conf
input {
  beats {
    port => 5044
  }
}

output {
  kafka {
    topic_id => "test_hello"
    bootstrap_servers => "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092"
    compression_type => "snappy"
    jaas_path => "/etc/logstash/conf.d/kafka/kafka_jaas.conf"
    kerberos_config => "/etc/krb5.conf"
    ## Must match the serviceName in kafka_jaas.conf
    sasl_kerberos_service_name => "kafka"
    security_protocol => "SASL_PLAINTEXT"
    client_id => "log.demo.com"
  }
}
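The pipeline definition can be syntax-checked without starting the full service:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t -f /etc/logstash/conf.d/logstash.conf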
Run the services
Run Logstash
/usr/share/logstash/bin/logstash --path.settings /etc/logstash
Run Filebeat
systemctl start filebeat
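To confirm events actually reach Kafka, a console consumer can tail the topic. A sketch, assuming the Kafka CLI scripts are on the PATH and the same JAAS file and keytab work for a consumer:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/logstash/conf.d/kafka/kafka_jaas.conf"
cat > /tmp/consumer.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
EOF
kafka-console-consumer.sh --bootstrap-server t2.demo.com:9092 \
  --topic test_hello --from-beginning \
  --consumer.config /tmp/consumer.properties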
Logs
Logstash log output from a normal startup
[root@log logstash]# /usr/share/logstash/bin/logstash --path.settings /etc/logstash
Could not find log4j2 configuration at path /etc/logstash/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2018-09-02 23:33:27.758 [main] scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[INFO ] 2018-09-02 23:33:27.772 [main] scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[INFO ] 2018-09-02 23:33:28.657 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.2.2"}
[INFO ] 2018-09-02 23:33:29.170 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2018-09-02 23:33:31.075 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] pipeline - Starting pipeline {:pipeline_id=>"mylogs", "pipeline.workers"=>8, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>50}
[INFO ] 2018-09-02 23:33:31.199 [[mylogs]-pipeline-manager] ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [t2.demo.com:9092, t3.demo.com:9092, t4.demo.com:9092]
buffer.memory = 33554432
client.id = log.demo.com
compression.type = snappy
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 10
reconnect.backoff.ms = 10
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[INFO ] 2018-09-02 23:33:31.311 [[mylogs]-pipeline-manager] AbstractLogin - Successfully logged in.
[INFO ] 2018-09-02 23:33:31.315 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT refresh thread started.
[INFO ] 2018-09-02 23:33:31.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT valid starting at: 2018-09-02T23:33:31.000+0800
[INFO ] 2018-09-02 23:33:31.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT expires: 2018-09-03T23:33:31.000+0800
[INFO ] 2018-09-02 23:33:31.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT refresh sleeping until: 2018-09-03T19:41:17.691+0800
[INFO ] 2018-09-02 23:33:31.338 [[mylogs]-pipeline-manager] AppInfoParser - Kafka version : 1.0.0
[INFO ] 2018-09-02 23:33:31.338 [[mylogs]-pipeline-manager] AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
[INFO ] 2018-09-02 23:33:31.653 [[mylogs]-pipeline-manager] beats - Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[INFO ] 2018-09-02 23:33:31.705 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] pipeline - Pipeline started succesfully {:pipeline_id=>"mylogs", :thread=>"#<Thread:0x49c5d932 run>"}
[INFO ] 2018-09-02 23:33:31.752 [[mylogs]<beats] Server - Starting server on port: 5044
[INFO ] 2018-09-02 23:33:31.780 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :pipelines=>["mylogs"]}
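Once Filebeat starts shipping, the per-pipeline event counters from the monitoring API confirm that events are flowing in from Beats and out to Kafka:

curl -s 'http://localhost:9600/_node/stats/pipelines/mylogs?pretty'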