阅读本文需要约 5 分钟。
前言
本文主要涉及 Beats 和 Pulsar 服务的搭建、测试、数据的发送及消费。通过本篇文章,能够对 Beats 如何发送数据到 Pulsar,以及如何使用 Pulsar 对数据进行消费有一个比较全面的了解。
关于 Apache Pulsar 和 Elastic Beats
Apache Pulsar 是一个分布式消息发布订阅系统。Elastic Beats 集合了多种单一用途数据采集器。本文针对各种 Beats 的使用做了简要介绍。
Elastic Beats 包括 Filebeat、Metricbeat、Functionbeat、Winlogbeat、Journalbeat 和 Auditbeat,本项目开发的 Output 适用于以上所有 Beat。具体使用方法,后面会逐步说明。本次试验以 Filebeat 的使用为例。
环境依赖
在进行本次试验之前,你需要在电脑上搭建以下环境依赖,本次试验测试是在 Mac 系统上进行的。
由于 Golang 的包在本地 build 会出现一些网络问题,故本次试验(包括 Build 和 Install 等)均在 Docker 中完成。
初始化网络
在测试中 Beats 到 Pulsar 的网络环境必须是相通的,因此需要提前初始化网络。
docker network create pulsar-beat
该命令会创建一个名为 pulsar-beat 的网络,之后 Pulsar 服务和 Beats 都会连接到该网络。
使用 Docker 镜像编译本项目
下载代码到本地
git clone https://github.com/streamnative/pulsar-beat-output
由于多方面的原因,本项目把一些依赖如 Apache Pulsar Go Client 和 Elastic Beats 都放在了 vendor
文件夹下,可直接在 Docker 中进行 Build。
下载 Docker 镜像并安装依赖
docker pull golang:1.12.4
docker run -it -d --network pulsar-beat --name pulsar-beat-golang golang:1.12.4 /bin/bash
mkdir -p src/github.com/streamnative/pulsar-beat-output
mv pulsar-beat-output/* src/github.com/streamnative/pulsar-beat-output/
docker cp src pulsar-beat-golang:/go/
以上,首先拉取 golang 1.12.4 版本的镜像,然后使用 docker run
命令启动该镜像并命名为 pulsar-beat-golang
。-d
参数使该服务以后台模式运行,-it
以交互模式运行容器,并为容器分配一个伪输入终端。 --network
代表该容器使用 pulsar-beat 这个网络,之后 Pulsar 服务也会加入该网络。
进入容器,安装依赖
docker exec -it pulsar-beat-golang /bin/bash
wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/DEB/apache-pulsar-client-dev.deb
dpkg -i apache-pulsar-client.deb
dpkg -i apache-pulsar-client-dev.deb
由于本项目使用到了 Pulsar 的 Golang Client ,该 Client 需要依赖 deb 包,因此在编译之前需要提前装好这些包。其他 Linux 发行版可根据需要在这里进行下载安装。
开始编译
继续上面的步骤,使用如下命令编译本项目:
cd src/github.com/streamnative/pulsar-beat-output/
go build -o filebeat main.go
执行上述命令后,当前目录下会生成一个名为 filebeat 的可执行文件,之后可以使用这个文件进行数据搜集。
安装测试
单机模式下安装 Pulsar
为方便测试,本次使用单机模式安装 Pulsar,并使用 Docker 镜像 apachepulsar/pulsar:2.3.0
启动服务。新开一个窗口,拉取 Pulsar 的镜像并启动服务。这里测试使用的是 2.3.0 的镜像,当然也可以使用最新版 2.4.0 的镜像。
docker pull apachepulsar/pulsar:2.3.0
docker run -d -it --network pulsar-beat -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-beat-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone
该服务暴露出 6650 和 8080 端口供外部服务使用,这个容器将 Pulsar 服务的 data
目录挂载在当前目录下,并且将其命名为 pulsar-beat-standalone
,-d
参数使该服务以后台模式运行,-it
是以交互模式运行该容器,并为容器分配一个伪输入终端。
docker logs -f pulsar-flume-standalone
01:55:55.005 [pulsar-web-55-1] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
01:55:55.013 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x10003995a81000a local:/127.0.0.1:54508 remoteserver:localhost/127.0.0.1:2181 lastZxid:156 xid:41 sent:41 recv:43 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
01:55:55.014 [pulsar-web-55-1] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
01:55:55.016 [pulsar-web-55-1] INFO org.eclipse.jetty.server.RequestLog - 172.17.0.3 - - [08/May/2019:01:55:55 +0000] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Jersey/2.27 (HttpUrlConnection 1.8.0_181)" 15
使用上述命令查看 Docker 日志,若出现以上日志信息,说明 Pulsar 在单机模式下启动成功了。
启动消费者客户端
测试脚本文件放在这里,内容如下:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my_topic', subscription_name='test123')
while True:
msg = consumer.receive()
print("Received message: '%s'" % msg.data())
consumer.acknowledge(msg)
client.close()
该脚本会连接到本地 Pulsar 服务的 6650 端口,因为是在 Pulsar 服务所在的容器中启动,因此服务地址是 localhost: 6650
。该脚本会在 my_topic
这一 Topic 上进行消费,该 Topic 是 Beats 中配置的 Topic ,并且指定订阅名称为 test123
。
docker cp pulsar-client.py pulsar-beat-standalone:/pulsar
docker exec -it pulsar-beat-standalone /bin/bash
python pulsar-client.py
以上命令把消费者脚本复制到 pulsar-beat-standalone 容器中,然后进入容器,使用 python 命令启动该脚本。出现如下信息时,说明该消费者成功启动。
root@1cda587d2694:/pulsar# python pulsar-client.py
2019-05-08 02:02:01.823 INFO Client:88 | Subscribing on Topic :my_topic
2019-05-08 02:02:01.824 INFO ConnectionPool:72 | Created connection for pulsar://localhost:6650
2019-05-08 02:02:01.825 INFO ClientConnection:300 | [127.0.0.1:47872 -> 127.0.0.1:6650] Connected to broker
2019-05-08 02:02:01.845 INFO HandlerBase:52 | [persistent://public/default/my_topic, test123, 0] Getting connection from pool
2019-05-08 02:02:01.870 INFO ConnectionPool:72 | Created connection for pulsar://1cda587d2694:6650
2019-05-08 02:02:01.871 INFO ClientConnection:302 | [127.0.0.1:47874 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://1cda587d2694:6650
2019-05-08 02:02:01.872 INFO ConsumerImpl:169 | [persistent://public/default/my_topic, test123, 0] Created consumer on broker [127.0.0.1:47874 -> 127.0.0.1:6650]
配置并启动 Filebeat
本次试验使用 Filebeat 进行测试,对于其他 Beat ,除最开始的编译不同外,配置方式是类似的,后面会介绍其他 Beat 的编译方式。
配置 Filebeat
回到最开始编译的地方,继续向下进行。首先需对 Filebeat 进行配置,增加如下内容到 filebeat.yml 文件。
output.pulsar:
url: "pulsar://pulsar-beat-standalone:6650"
topic: my_topic
name: test123
该配置的各项含义:
-
url
: Pulsar 服务的地址。因为 Filebeat 和 Pulsar 都在同一个网络 pulsar-beat 中,因此可直接使用 hostname 进行连接。 -
topic
: Pulsar 中接收数据的主题。 -
name
: 生产者的名称。
有了上面几项配置,可成功进行本次试验。关于更多配置,参考这里。
修改目录权限
chown -R root:root filebeat.yml test_module/modules.d/system.yml test_module/module/system
cp test_module/module/system/auth/test/test.log /var/log/messages.log
cp filebeat filebeat.yml test_module
本次测试使用 Filebeat 搜集系统日志,发送到 Pulsar 服务。其中,应用了 Filebeat 中的 system 模块,修改权限后,复制测试日志到 /var/log
下,因为在 system 模块的配置文件中默认搜集该目录下日志的数据。最后,复制 Filebeat 的可执行文件 filebeat 和配置文件 filebeat.yml
到测试目录下。
cd test_module
./filebeat modules enable system
./filebeat -c filebeat.yml -e
进入测试目录,打开 system 模块,启动 filebeat。-c
用来指定配置文件, -e
开启日志到控制台。
这时会输出如下类似信息:
2019-05-08T02:38:58.991Z INFO add_cloud_metadata/add_cloud_metadata.go:340 add_cloud_metadata: hosting provider type not detected.
2019-05-08T02:39:05.976Z INFO log/input.go:138 Configured paths: [/var/log/auth.log* /var/log/secure*]
2019-05-08T02:39:05.976Z INFO log/input.go:138 Configured paths: [/var/log/messages* /var/log/syslog*]
2019-05-08T02:39:05.977Z INFO input/input.go:114 Starting input of type: log; ID: 967574352861831335
2019-05-08T02:39:05.977Z INFO input/input.go:114 Starting input of type: log; ID: 16982476733437093780
2019-05-08T02:39:05.978Z INFO log/harvester.go:254 Harvester started for file: /var/log/messages.log
2019-05-08T02:39:06.979Z INFO pipeline/output.go:95 Connecting to file(pulsar://pulsar-beat-standalone:6650)
2019-05-08T02:39:06.980Z INFO pulsar/client.go:64 start create pulsar client
2019-05-08T02:39:06.980Z INFO pulsar/client.go:69 start create pulsar producer
2019/05/08 02:39:06.982 c_client.go:68: [info] INFO | ConnectionPool:72 | Created connection for pulsar://pulsar-beat-standalone:6650
2019/05/08 02:39:06.986 c_client.go:68: [info] INFO | ClientConnection:300 | [172.24.0.2:59696 -> 172.24.0.3:6650] Connected to broker
2019/05/08 02:39:07.003 c_client.go:68: [info] INFO | BatchMessageContainer:43 | { BatchContainer [size = 0] [batchSizeInBytes_ = 0] [maxAllowedMessageBatchSizeInBytes_ = 131072] [maxAllowedNumMessagesInBatch_ = 1000] [topicName = persistent://public/default/my_topic] [producerName_ = test123] [batchSizeInBytes_ = 0] [numberOfBatchesSent = 0] [averageBatchSize = 0]} BatchMessageContainer constructed
2019/05/08 02:39:07.004 c_client.go:68: [info] INFO | HandlerBase:52 | [persistent://public/default/my_topic, test123] Getting connection from pool
2019/05/08 02:39:07.005 c_client.go:68: [info] INFO | ConnectionPool:72 | Created connection for pulsar://c37002ed8073:6650
2019/05/08 02:39:07.007 c_client.go:68: [info] INFO | ClientConnection:302 | [172.24.0.2:59698 -> 172.24.0.3:6650] Connected to broker through proxy. Logical broker: pulsar://c37002ed8073:6650
2019/05/08 02:39:07.021 c_client.go:68: [info] INFO | ProducerImpl:155 | [persistent://public/default/my_topic, test123] Created producer on broker [172.24.0.2:59698 -> 172.24.0.3:6650]
同时,在刚才开启的消费者端可看到如下信息:
Received message: '{"@timestamp":"2019-05-08T02:39:05.978Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.0.0","pipeline":"filebeat-7.0.0-system-syslog-pipeline"},"input":{"type":"log"},"host":{"os":{"kernel":"4.9.125-linuxkit","codename":"stretch","platform":"debian","version":"9 (stretch)","family":"debian","name":"Debian GNU/Linux"},"name":"50dd65646ea7","id":"8b6f0bc66c81d2b9ef76a3fc9b52b452","containerized":true,"hostname":"50dd65646ea7","architecture":"x86_64"},"agent":{"id":"e74b6d2c-5ec3-40bb-a23c-d3610d8b40fa","version":"7.0.0","type":"filebeat","ephemeral_id":"3f7daa1b-2f74-4550-b19a-88bf75367c6a","hostname":"50dd65646ea7"},"log":{"offset":0,"file":{"path":"/var/log/messages.log"}},"service":{"type":"system"},"fileset":{"name":"syslog"},"ecs":{"version":"1.0.0"},"message":"Feb 21 21:54:44 localhost sshd[3402]: Accepted publickey for vagrant from 10.0.2.2 port 63673 ssh2: RSA 39:33:99:e9:a0:dc:f2:33:a3:e5:72:3b:7c:3a:56:84","event":{"module":"system","dataset":"system.syslog"}}'
Received message: '{"@timestamp":"2019-05-08T02:39:05.978Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.0.0","pipeline":"filebeat-7.0.0-system-syslog-pipeline"},"event":{"module":"system","dataset":"system.syslog"},"agent":{"ephemeral_id":"3f7daa1b-2f74-4550-b19a-88bf75367c6a","hostname":"50dd65646ea7","id":"e74b6d2c-5ec3-40bb-a23c-d3610d8b40fa","version":"7.0.0","type":"filebeat"},"ecs":{"version":"1.0.0"},"service":{"type":"system"},"input":{"type":"log"},"host":{"name":"50dd65646ea7","os":{"name":"Debian GNU/Linux","kernel":"4.9.125-linuxkit","codename":"stretch","platform":"debian","version":"9 (stretch)","family":"debian"},"id":"8b6f0bc66c81d2b9ef76a3fc9b52b452","containerized":true,"hostname":"50dd65646ea7","architecture":"x86_64"},"log":{"file":{"path":"/var/log/messages.log"},"offset":152},"message":"Feb 23 00:13:35 localhost sshd[7483]: Accepted password for vagrant from 192.168.33.1 port 58803 ssh2","fileset":{"name":"syslog"}}'
至此,通过 Filebeat 搜集 system 的日志信息并发送到 Pulsar 的整个流程就完成了。
编译其他 Beat
go build -o metricbeat metricbeat.go
go build -o filebeat filebeat.go
go build -o functionbeat functionbeat.go
go build -o journalbeat journalbeat.go
go build -o auditbeat auditbeat.go
go build -o winlogbeat winlogbeat.go
go build -o packetbeat packetbeat.go
其他 Beat 的使用方式同 Filebeat,具体可以参考官方文档。
问题
使用中出现其他问题,可以参考 FAQ。**
网友评论