1 安装filebeat
- 系统:CentOS 6.x-7.x
- filebeat版本:filebeat-6.5.3-linux-x86_64
- 下载链接:https://www.elastic.co/downloads/past-releases
2 安装过程
选择安装目录:例如安装在/usr/local/或者/opt/下都可以。
$ tar -zxvf filebeat-6.5.3-linux-x86_64.tar.gz -C /opt/
$ cd /opt
创建一个软链接:
$ ln -s filebeat-6.5.3-linux-x86_64 filebeat
filebeat的配置很简单,只需要指定input和output就可以了。
3 要求
由于kafka server高低版本的客户端API区别较大,因此推荐同时使用高版本的filebeat和kafka server。注意
高版本的filebeat配置使用低版本的kafka server会出现kafka server接受不到消息的情况。这里我使用的kafka server版本是:2.12-0.11.0.3,可参考快速搭建kafka
4 配置
编辑filebeat安装目录下 filebeat.yml
文件:
$ cd /opt/filebeat
$ vim filebeat.yml
配置Filebeat inputs:
#=========================== Filebeat inputs =============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /opt/test/*.log
上面/opt/test/*.log
是我要传输的日志,根据实际情况改成你自己的值。
配置Filebeat outputs:
#================================ Outputs =====================================
output.kafka:
enabled: true
hosts: ["111.11.1.243:9092"]
topic: test
"111.11.1.243:9092"
是我的单机kafka broker,如果你是kafka集群,请用,
分隔。test
是kafka topic,请改成你自己实际情况的值。另外以下这段需要删除:
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"
因为我并没有用到Elasticsearch,所以有多个输出在启动filebeat时会报错。到这里filebeat配置kafka就完成了,是不是很简单,让我们启动它测试一下。
5 测试
启动,进入filebeat的安装目录:
$ nohup ./filebeat -c filebeat.yml &
查看是否启动:
$ ps -ef | grep filebeat
opmm 33500 22174 0 17:53 pts/0 00:00:00 ./filebeat -c filebeat.yml
opmm 33537 22174 0 17:53 pts/0 00:00:00 grep filebeat
很好,已经启动了。如果没有启动,请查看启动日志文件nohup.out。
停止:
$ ps -ef | grep filebeat
$ kill -9 ${PID}
随机生成日志脚本:
import pandas as pd
import numpy as np
import time
import random
import json
# ndarray
localIp = np.array(['11.12.11.11','11.13.11.11','11.14.11.11'])
remoteIp = np.array(['11.12.11.11','11.13.11.11','11.14.11.11'])
appkey = np.array(['mbp_ds','hhr','haoda','cashbox'])
userId = np.arange(1,100000)
bizType = np.array(['phone','bank','idcard'])
operateType = np.array(['decrypt','encrypt'])
a1=(2018,4,1,0,0,0,0,0,0) #设置开始日期时间元组(1976-01-01 00:00:00)
a2=(2018,4,30,23,59,59,0,0,0) #设置结束日期时间元组(1990-12-31 23:59:59)
start=time.mktime(a1) #生成开始时间戳
end=time.mktime(a2) #生成结束时间戳
#随机生成时间字符串
def gen_data():
t=random.randint(start,end) #在开始和结束时间戳中随机取出一个
date_touple=time.localtime(t) #将时间戳生成时间元组
rdate=time.strftime('%Y-%m-%d' ,date_touple) #将时间元组转成格式化字符串(1976-05-21)
rtime=time.strftime('%H:%M:%S' ,date_touple)
return rdate,rtime
for i in range(1000000000):
if i % 100000 == 0:
print(i)
localIpStr = np.random.choice(localIp, 1)[0]
remoteIpStr = np.random.choice(remoteIp, 1)[0]
userIdStr = str(np.random.choice(userId, 1)[0])
appkeyStr = np.random.choice(appkey, 1)[0]
bizTypeStr = np.random.choice(bizType, 1)[0]
operateTypeStr = np.random.choice(operateType, 1)[0]
rdate,rtime = gen_data()
j = {'client_ip':remoteIpStr,'server_ip':localIpStr,'appkey':appkeyStr,'user_id':userIdStr,'biz_type':bizTypeStr,'operate_time':rdate+' '+rtime,'operate_type':operateTypeStr}
s = json.dumps(j) + '\n'
filename='/opt/test/'+rdate+'.log'
# 已追加的方式写文件
with open(filename,'a') as f:
f.write(s)
执行这段python脚本,开启一个kafka消费者如果成功消费日志消息:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
{"@timestamp":"2018-12-20T10:04:23.746Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.5.3","topic":"test"},"prospector":{"type":"log"},"input":{"type":"log"},"beat":{"hostname":"SH-app-9-1","version":"6.5.3","name":"SH-app-9-1"},"host":{"name":"SH-app-9-1","architecture":"x86_64","os":{"codename":"Final","platform":"centos","version":"6.8 (Final)","family":"redhat"},"containerized":true},"source":"/opt/appl/test/2018-04-14.log","offset":0,"message":"{\"client_ip\": \"11.12.11.11\", \"server_ip\": \"11.12.11.11\", \"appkey\": \"hhr\", \"user_id\": \"96311\", \"biz_type\": \"idcard\", \"operate_time\": \"2018-04-14 12:30:50\", \"operate_type\": \"encrypt\"}"}
哈哈,大功告成。注
上面这段脚本要适时手动停止,因为它是个死循环,如果忘记手动停止那么就杯具了,我就是这样把机器写宕机的。
网友评论