美文网首页
filebeat利用kafka进行日志实时传输

filebeat利用kafka进行日志实时传输

作者: 一ke大白菜 | 来源:发表于2018-12-20 18:13 被阅读285次

1 安装filebeat

2 安装过程

选择安装目录:例如安装在/usr/local/或者/opt/下都可以。

$ tar -zxvf filebeat-6.5.3-linux-x86_64.tar.gz -C /opt/
$ cd /opt

创建一个软链接:

$ ln -s filebeat-6.5.3-linux-x86_64 filebeat

filebeat的配置很简单,只需要指定input和output就可以了。

3 要求

由于kafka server高低版本的客户端API区别较大,因此推荐同时使用高版本的filebeat和kafka server。注意高版本的filebeat配置使用低版本的kafka server会出现kafka server接受不到消息的情况。这里我使用的kafka server版本是:2.12-0.11.0.3,可参考快速搭建kafka

4 配置

编辑filebeat安装目录下 filebeat.yml文件:

$ cd /opt/filebeat
$ vim filebeat.yml

配置Filebeat inputs:

#=========================== Filebeat inputs =============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

# Change to true to enable this input configuration.
enabled: true

# Paths that should be crawled and fetched. Glob based paths.
paths:
- /opt/test/*.log

上面/opt/test/*.log是我要传输的日志,根据实际情况改成你自己的值。
配置Filebeat outputs:

#================================ Outputs =====================================
output.kafka:
  enabled: true
  hosts: ["111.11.1.243:9092"]
  topic: test

"111.11.1.243:9092"是我的单机kafka broker,如果你是kafka集群,请用,分隔。test是kafka topic,请改成你自己实际情况的值。另外以下这段需要删除:

#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

因为我并没有用到Elasticsearch,所以有多个输出在启动filebeat时会报错。到这里filebeat配置kafka就完成了,是不是很简单,让我们启动它测试一下。

5 测试

启动,进入filebeat的安装目录:

$ nohup ./filebeat -c filebeat.yml &

查看是否启动:

$ ps -ef | grep filebeat
opmm     33500 22174  0 17:53 pts/0    00:00:00 ./filebeat -c filebeat.yml
opmm     33537 22174  0 17:53 pts/0    00:00:00 grep filebeat

很好,已经启动了。如果没有启动,请查看启动日志文件nohup.out。
停止:

$ ps -ef | grep filebeat
$ kill -9 ${PID}

随机生成日志脚本:

import pandas as pd
import numpy as np
import time
import random
import json

# ndarray
localIp = np.array(['11.12.11.11','11.13.11.11','11.14.11.11'])
remoteIp = np.array(['11.12.11.11','11.13.11.11','11.14.11.11'])
appkey = np.array(['mbp_ds','hhr','haoda','cashbox'])
userId = np.arange(1,100000)
bizType = np.array(['phone','bank','idcard'])
operateType = np.array(['decrypt','encrypt'])

a1=(2018,4,1,0,0,0,0,0,0)        #设置开始日期时间元组(1976-01-01 00:00:00)
a2=(2018,4,30,23,59,59,0,0,0)    #设置结束日期时间元组(1990-12-31 23:59:59)
start=time.mktime(a1)    #生成开始时间戳
end=time.mktime(a2)      #生成结束时间戳

#随机生成时间字符串
def gen_data():
    t=random.randint(start,end)    #在开始和结束时间戳中随机取出一个
    date_touple=time.localtime(t)  #将时间戳生成时间元组
    rdate=time.strftime('%Y-%m-%d' ,date_touple)  #将时间元组转成格式化字符串(1976-05-21)
    rtime=time.strftime('%H:%M:%S' ,date_touple)
    return rdate,rtime

for i in range(1000000000):
    if i % 100000 == 0:
        print(i)
    localIpStr = np.random.choice(localIp, 1)[0]
    remoteIpStr = np.random.choice(remoteIp, 1)[0]
    userIdStr = str(np.random.choice(userId, 1)[0])
    appkeyStr = np.random.choice(appkey, 1)[0]
    bizTypeStr = np.random.choice(bizType, 1)[0]
    operateTypeStr = np.random.choice(operateType, 1)[0]
    rdate,rtime = gen_data()
    j = {'client_ip':remoteIpStr,'server_ip':localIpStr,'appkey':appkeyStr,'user_id':userIdStr,'biz_type':bizTypeStr,'operate_time':rdate+' '+rtime,'operate_type':operateTypeStr}
    s = json.dumps(j) + '\n'
    filename='/opt/test/'+rdate+'.log'
    # 已追加的方式写文件
    with open(filename,'a') as f: 
        f.write(s)

执行这段python脚本,开启一个kafka消费者如果成功消费日志消息:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
{"@timestamp":"2018-12-20T10:04:23.746Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.5.3","topic":"test"},"prospector":{"type":"log"},"input":{"type":"log"},"beat":{"hostname":"SH-app-9-1","version":"6.5.3","name":"SH-app-9-1"},"host":{"name":"SH-app-9-1","architecture":"x86_64","os":{"codename":"Final","platform":"centos","version":"6.8 (Final)","family":"redhat"},"containerized":true},"source":"/opt/appl/test/2018-04-14.log","offset":0,"message":"{\"client_ip\": \"11.12.11.11\", \"server_ip\": \"11.12.11.11\", \"appkey\": \"hhr\", \"user_id\": \"96311\", \"biz_type\": \"idcard\", \"operate_time\": \"2018-04-14 12:30:50\", \"operate_type\": \"encrypt\"}"}

哈哈,大功告成。上面这段脚本要适时手动停止,因为它是个死循环,如果忘记手动停止那么就杯具了,我就是这样把机器写宕机的。

相关文章

网友评论

      本文标题:filebeat利用kafka进行日志实时传输

      本文链接:https://www.haomeiwen.com/subject/taglkqtx.html