美文网首页
使用 Elastic Beats 搜集日志到 Pulsa

使用 Elastic Beats 搜集日志到 Pulsa

作者: StreamNative | 来源:发表于2019-08-22 19:33 被阅读0次

    阅读本文需要约 5 分钟。

    前言

    本文主要涉及 Beats 和 Pulsar 服务的搭建、测试、数据的发送及消费。通过本篇文章,能够对 Beats 如何发送数据到 Pulsar,以及如何使用 Pulsar 对数据进行消费有一个比较全面的了解。

    关于 Apache Pulsar 和 Elastic Beats

    Apache Pulsar 是一个分布式消息发布订阅系统。Elastic Beats 集合了多种单一用途数据采集器。本文针对各种 Beats 的使用做了简要介绍。

    Elastic Beats 包括 Filebeat、Metricbeat、Functionbeat、Winlogbeat、Journalbeat 和 Auditbeat,本项目开发的 Output 适用于以上所有 Beat。具体使用方法,后面会逐步说明。本次试验以 Filebeat 的使用为例。

    环境依赖

    在进行本次试验之前,你需要在电脑上搭建以下环境依赖,本次试验测试是在 Mac 系统上进行的。

    由于 Golang 的包在本地 build 会出现一些网络问题,故本次试验(包括 Build 和 Install 等)均在 Docker 中完成。

    初始化网络

    在测试中 Beats 到 Pulsar 的网络环境必须是相通的,因此需要提前初始化网络。

    
    docker network create pulsar-beat
    
    

    该命令会创建一个名为 pulsar-beat 的网络,之后 Pulsar 服务和 Beats 都会连接到该网络。

    使用 Docker 镜像编译本项目

    下载代码到本地

    
    git clone https://github.com/streamnative/pulsar-beat-output
    
    

    由于多方面的原因,本项目把一些依赖如 Apache Pulsar Go ClientElastic Beats 都放在了 vendor 文件夹下,可直接在 Docker 中进行 Build。

    下载 Docker 镜像并安装依赖

    
    docker pull golang:1.12.4
    
    docker run -it -d --network pulsar-beat --name pulsar-beat-golang golang:1.12.4 /bin/bash
    
    mkdir -p src/github.com/streamnative/pulsar-beat-output
    
    mv pulsar-beat-output/* src/github.com/streamnative/pulsar-beat-output/
    
    docker cp src pulsar-beat-golang:/go/
    
    

    以上,首先拉取 golang 1.12.4 版本的镜像,然后使用 docker run 命令启动该镜像并命名为 pulsar-beat-golang-d 参数使该服务以后台模式运行,-it 以交互模式运行容器,并为容器分配一个伪输入终端。 --network 代表该容器使用 pulsar-beat 这个网络,之后 Pulsar 服务也会加入该网络。

    进入容器,安装依赖

    
    docker exec -it pulsar-beat-golang /bin/bash
    
    wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/DEB/apache-pulsar-client.deb
    
    wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/DEB/apache-pulsar-client-dev.deb
    
    dpkg -i apache-pulsar-client.deb
    
    dpkg -i apache-pulsar-client-dev.deb
    
    

    由于本项目使用到了 Pulsar 的 Golang Client ,该 Client 需要依赖 deb 包,因此在编译之前需要提前装好这些包。其他 Linux 发行版可根据需要在这里进行下载安装。

    开始编译

    继续上面的步骤,使用如下命令编译本项目:

    
    cd src/github.com/streamnative/pulsar-beat-output/
    
    go build -o filebeat main.go
    
    

    执行上述命令后,当前目录下会生成一个名为 filebeat 的可执行文件,之后可以使用这个文件进行数据搜集。

    安装测试

    单机模式下安装 Pulsar

    为方便测试,本次使用单机模式安装 Pulsar,并使用 Docker 镜像 apachepulsar/pulsar:2.3.0 启动服务。新开一个窗口,拉取 Pulsar 的镜像并启动服务。这里测试使用的是 2.3.0 的镜像,当然也可以使用最新版 2.4.0 的镜像。

    
    docker pull apachepulsar/pulsar:2.3.0
    
    docker run -d -it --network pulsar-beat -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-beat-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone
    
    

    该服务暴露出 6650 和 8080 端口供外部服务使用,这个容器将 Pulsar 服务的 data 目录挂载在当前目录下,并且将其命名为 pulsar-beat-standalone-d 参数使该服务以后台模式运行,-it 是以交互模式运行该容器,并为容器分配一个伪输入终端。

    
    docker logs -f pulsar-flume-standalone
    
    01:55:55.005 [pulsar-web-55-1] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
    
    01:55:55.013 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x10003995a81000a local:/127.0.0.1:54508 remoteserver:localhost/127.0.0.1:2181 lastZxid:156 xid:41 sent:41 recv:43 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
    
    01:55:55.014 [pulsar-web-55-1] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
    
    01:55:55.016 [pulsar-web-55-1] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.3 - - [08/May/2019:01:55:55 +0000] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Jersey/2.27 (HttpUrlConnection 1.8.0_181)" 15
    
    

    使用上述命令查看 Docker 日志,若出现以上日志信息,说明 Pulsar 在单机模式下启动成功了。

    启动消费者客户端

    测试脚本文件放在这里,内容如下:

    
    import pulsar
    
    client = pulsar.Client('pulsar://localhost:6650')
    
    consumer = client.subscribe('my_topic', subscription_name='test123')
    
    while True:
    
        msg = consumer.receive()
    
        print("Received message: '%s'" % msg.data())
    
        consumer.acknowledge(msg)
    
    client.close()
    
    

    该脚本会连接到本地 Pulsar 服务的 6650 端口,因为是在 Pulsar 服务所在的容器中启动,因此服务地址是 localhost: 6650。该脚本会在 my_topic 这一 Topic 上进行消费,该 Topic 是 Beats 中配置的 Topic ,并且指定订阅名称为 test123

    
    docker cp pulsar-client.py pulsar-beat-standalone:/pulsar
    
    docker exec -it pulsar-beat-standalone /bin/bash
    
    python pulsar-client.py
    
    

    以上命令把消费者脚本复制到 pulsar-beat-standalone 容器中,然后进入容器,使用 python 命令启动该脚本。出现如下信息时,说明该消费者成功启动。

    
    root@1cda587d2694:/pulsar# python pulsar-client.py
    
    2019-05-08 02:02:01.823 INFO  Client:88 | Subscribing on Topic :my_topic
    
    2019-05-08 02:02:01.824 INFO  ConnectionPool:72 | Created connection for pulsar://localhost:6650
    
    2019-05-08 02:02:01.825 INFO  ClientConnection:300 | [127.0.0.1:47872 -> 127.0.0.1:6650] Connected to broker
    
    2019-05-08 02:02:01.845 INFO  HandlerBase:52 | [persistent://public/default/my_topic, test123, 0] Getting connection from pool
    
    2019-05-08 02:02:01.870 INFO  ConnectionPool:72 | Created connection for pulsar://1cda587d2694:6650
    
    2019-05-08 02:02:01.871 INFO  ClientConnection:302 | [127.0.0.1:47874 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://1cda587d2694:6650
    
    2019-05-08 02:02:01.872 INFO  ConsumerImpl:169 | [persistent://public/default/my_topic, test123, 0] Created consumer on broker [127.0.0.1:47874 -> 127.0.0.1:6650]
    
    

    配置并启动 Filebeat

    本次试验使用 Filebeat 进行测试,对于其他 Beat ,除最开始的编译不同外,配置方式是类似的,后面会介绍其他 Beat 的编译方式。

    配置 Filebeat

    回到最开始编译的地方,继续向下进行。首先需对 Filebeat 进行配置,增加如下内容到 filebeat.yml 文件。

    
    output.pulsar:
    
      url: "pulsar://pulsar-beat-standalone:6650"
    
      topic: my_topic
    
      name: test123
    
    

    该配置的各项含义:

    • url: Pulsar 服务的地址。因为 Filebeat 和 Pulsar 都在同一个网络 pulsar-beat 中,因此可直接使用 hostname 进行连接。

    • topic: Pulsar 中接收数据的主题。

    • name: 生产者的名称。

    有了上面几项配置,可成功进行本次试验。关于更多配置,参考这里

    修改目录权限

    
    chown -R root:root filebeat.yml test_module/modules.d/system.yml test_module/module/system
    
    cp test_module/module/system/auth/test/test.log /var/log/messages.log
    
    cp filebeat filebeat.yml test_module
    
    

    本次测试使用 Filebeat 搜集系统日志,发送到 Pulsar 服务。其中,应用了 Filebeat 中的 system 模块,修改权限后,复制测试日志到 /var/log 下,因为在 system 模块的配置文件中默认搜集该目录下日志的数据。最后,复制 Filebeat 的可执行文件 filebeat 和配置文件 filebeat.yml 到测试目录下。

    
    cd test_module
    
    ./filebeat modules enable system
    
    ./filebeat -c filebeat.yml -e
    
    

    进入测试目录,打开 system 模块,启动 filebeat。-c 用来指定配置文件, -e 开启日志到控制台。

    这时会输出如下类似信息:

    
    2019-05-08T02:38:58.991Z  INFO  add_cloud_metadata/add_cloud_metadata.go:340  add_cloud_metadata: hosting provider type not detected.
    
    2019-05-08T02:39:05.976Z  INFO  log/input.go:138  Configured paths: [/var/log/auth.log* /var/log/secure*]
    
    2019-05-08T02:39:05.976Z  INFO  log/input.go:138  Configured paths: [/var/log/messages* /var/log/syslog*]
    
    2019-05-08T02:39:05.977Z  INFO  input/input.go:114  Starting input of type: log; ID: 967574352861831335
    
    2019-05-08T02:39:05.977Z  INFO  input/input.go:114  Starting input of type: log; ID: 16982476733437093780
    
    2019-05-08T02:39:05.978Z  INFO  log/harvester.go:254  Harvester started for file: /var/log/messages.log
    
    2019-05-08T02:39:06.979Z  INFO  pipeline/output.go:95  Connecting to file(pulsar://pulsar-beat-standalone:6650)
    
    2019-05-08T02:39:06.980Z  INFO  pulsar/client.go:64  start create pulsar client
    
    2019-05-08T02:39:06.980Z  INFO  pulsar/client.go:69  start create pulsar producer
    
    2019/05/08 02:39:06.982 c_client.go:68: [info] INFO  | ConnectionPool:72 | Created connection for pulsar://pulsar-beat-standalone:6650
    
    2019/05/08 02:39:06.986 c_client.go:68: [info] INFO  | ClientConnection:300 | [172.24.0.2:59696 -> 172.24.0.3:6650] Connected to broker
    
    2019/05/08 02:39:07.003 c_client.go:68: [info] INFO  | BatchMessageContainer:43 | { BatchContainer [size = 0] [batchSizeInBytes_ = 0] [maxAllowedMessageBatchSizeInBytes_ = 131072] [maxAllowedNumMessagesInBatch_ = 1000] [topicName = persistent://public/default/my_topic] [producerName_ = test123] [batchSizeInBytes_ = 0] [numberOfBatchesSent = 0] [averageBatchSize = 0]} BatchMessageContainer constructed
    
    2019/05/08 02:39:07.004 c_client.go:68: [info] INFO  | HandlerBase:52 | [persistent://public/default/my_topic, test123] Getting connection from pool
    
    2019/05/08 02:39:07.005 c_client.go:68: [info] INFO  | ConnectionPool:72 | Created connection for pulsar://c37002ed8073:6650
    
    2019/05/08 02:39:07.007 c_client.go:68: [info] INFO  | ClientConnection:302 | [172.24.0.2:59698 -> 172.24.0.3:6650] Connected to broker through proxy. Logical broker: pulsar://c37002ed8073:6650
    
    2019/05/08 02:39:07.021 c_client.go:68: [info] INFO  | ProducerImpl:155 | [persistent://public/default/my_topic, test123] Created producer on broker [172.24.0.2:59698 -> 172.24.0.3:6650]
    
    

    同时,在刚才开启的消费者端可看到如下信息:

    
    Received message: '{"@timestamp":"2019-05-08T02:39:05.978Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.0.0","pipeline":"filebeat-7.0.0-system-syslog-pipeline"},"input":{"type":"log"},"host":{"os":{"kernel":"4.9.125-linuxkit","codename":"stretch","platform":"debian","version":"9 (stretch)","family":"debian","name":"Debian GNU/Linux"},"name":"50dd65646ea7","id":"8b6f0bc66c81d2b9ef76a3fc9b52b452","containerized":true,"hostname":"50dd65646ea7","architecture":"x86_64"},"agent":{"id":"e74b6d2c-5ec3-40bb-a23c-d3610d8b40fa","version":"7.0.0","type":"filebeat","ephemeral_id":"3f7daa1b-2f74-4550-b19a-88bf75367c6a","hostname":"50dd65646ea7"},"log":{"offset":0,"file":{"path":"/var/log/messages.log"}},"service":{"type":"system"},"fileset":{"name":"syslog"},"ecs":{"version":"1.0.0"},"message":"Feb 21 21:54:44 localhost sshd[3402]: Accepted publickey for vagrant from 10.0.2.2 port 63673 ssh2: RSA 39:33:99:e9:a0:dc:f2:33:a3:e5:72:3b:7c:3a:56:84","event":{"module":"system","dataset":"system.syslog"}}'
    
    Received message: '{"@timestamp":"2019-05-08T02:39:05.978Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.0.0","pipeline":"filebeat-7.0.0-system-syslog-pipeline"},"event":{"module":"system","dataset":"system.syslog"},"agent":{"ephemeral_id":"3f7daa1b-2f74-4550-b19a-88bf75367c6a","hostname":"50dd65646ea7","id":"e74b6d2c-5ec3-40bb-a23c-d3610d8b40fa","version":"7.0.0","type":"filebeat"},"ecs":{"version":"1.0.0"},"service":{"type":"system"},"input":{"type":"log"},"host":{"name":"50dd65646ea7","os":{"name":"Debian GNU/Linux","kernel":"4.9.125-linuxkit","codename":"stretch","platform":"debian","version":"9 (stretch)","family":"debian"},"id":"8b6f0bc66c81d2b9ef76a3fc9b52b452","containerized":true,"hostname":"50dd65646ea7","architecture":"x86_64"},"log":{"file":{"path":"/var/log/messages.log"},"offset":152},"message":"Feb 23 00:13:35 localhost sshd[7483]: Accepted password for vagrant from 192.168.33.1 port 58803 ssh2","fileset":{"name":"syslog"}}'
    
    

    至此,通过 Filebeat 搜集 system 的日志信息并发送到 Pulsar 的整个流程就完成了。

    编译其他 Beat

    
    go build -o metricbeat metricbeat.go
    
    go build -o filebeat filebeat.go
    
    go build -o functionbeat functionbeat.go
    
    go build -o journalbeat journalbeat.go
    
    go build -o auditbeat auditbeat.go
    
    go build -o winlogbeat winlogbeat.go
    
    go build -o packetbeat packetbeat.go
    
    

    其他 Beat 的使用方式同 Filebeat,具体可以参考官方文档

    问题

    使用中出现其他问题,可以参考 FAQ。**

    相关文章

      网友评论

          本文标题:使用 Elastic Beats 搜集日志到 Pulsa

          本文链接:https://www.haomeiwen.com/subject/xymosctx.html