美文网首页
logstash MQTT统一日志

logstash MQTT统一日志

作者: 我已不是少年郎 | 来源:发表于2022-04-25 18:41 被阅读0次

    1、背景

    目前有测试环境和生产环境,两个环境的日志本来是分开的,由于分布式锁的原因,导致了其中的MQ日志飘忽不定。但最根本的原因还在于,在云服务器上看日志特别占用内存和带宽,会触发带宽超限告警,所以需要做一个统一日志到本地服务器。
    至于为什么不用logstash的tcp配置,因为本地服务器的宽带不是固定IP,所以需要用云服务器上的固定IP来启动MQ服务,然后本地logstash服务连接MQ接收日志。

    2、选方案

    kafaka&RabbitMQ

    根据网上描述,kafaka是专门为统一消息处理开发的,很多人用,但是消息传输不如RabbitMQ可靠,二者内存占用都挺高,logstash里直接有插件可以使用,但是我都没有技术积累,遇到问题很难解决,所以弃用。

    本来在docker上已经把RabbitMQ部署并且程序都配置好全部跑起来了,但是当多起了一个客户端模拟logstash接收日志时发现,这个后来启动的消费者会接收到前面的日志,相当于服务器缓存了消息,这些消息肯定占用内存和性能,然后去搜索如何优化,最后发现门槛略高,直接放弃。

    本身服务器上已经有MQTT协议的MQ,对日志的准确性要求并不是很高,不需要服务器缓存消息,并且只需维护一套MQ。

    3、实践

    安装logstash mqtt插件

    这里用的是github上的开源项目,这个插件有个缺点,codec只能使用plain,功能比较简单,但是基本满足需求。
    https://github.com/jurek7/logstash-input-mqtt

    下载logstash-input-paho-mqtt-0.1.5.gem文件到本地,在docker里挂在该文件,然后进入,去安装插件。

    docker exec -it logstash /bin/bash
    
    bin/logstash-plugin install logstash-input-paho-mqtt-0.1.5.gem
    

    安装完毕后制作image。

    #查看原logstash的ID
    docker ps
    
    #提交当前修改并保存image
    docker commit e8a6976d799b logstash-mqtt:7.6.1
    
    docker images
    #这时会多出一个image
    

    最终的elk的yml容器编排文件

    version: "3"
    services:
    
      elasticsearch:
        image: elasticsearch:7.6.1
        environment:
          - discovery.type=single-node
          - "ES_JAVA_OPTS=-Xms256m -Xmx256m"
        volumes:
          - "/home/mgr/volumes/elasticsearch/data:/usr/share/elasticsearch/data"
        restart: always
        network_mode: "host"
        container_name: elasticsearch
    
      logstash:
        image: logstash-mqtt:7.6.1
        volumes:
          - "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
          - "./logstash.yml:/usr/share/logstash/config/logstash.yml:ro"
          - "./logstash-input-paho-mqtt-0.1.5.gem:/usr/share/logstash/logstash-input-paho-mqtt-0.1.5.gem:rw"
        restart: always
        network_mode: "host"
        container_name: logstash
        depends_on:
          - elasticsearch
    
      kibana:
        image: kibana:7.6.1
        volumes:
          - "./kibana.yml:/usr/share/kibana/config/kibana.yml:rw"
        restart: always
        network_mode: "host"
        container_name: kibana
        depends_on:
          - elasticsearch
    

    修改logstash.conf

    input {
      tcp {
        host => "0.0.0.0"
        port => 9250
        mode => "server"
        tags => ["tags"]
        codec => json_lines #可能需要更新logstash插件
      }
    
      mqtt {
         host => "129.***.**.*"
         port => 1883
         topic => "logs"
         qos => 0
         clean_session => true
         client_id => "***"
         username  => "***"
         password => "***"
      }
    }
    
    output {
     #stdout{codec =>rubydebug}
      elasticsearch {
        hosts => ["localhost:9200"]  #这块配置需要带端口号
        index => "anfasmart-%{+YYYY.MM.dd}"
      }
    }
    

    重启

    docker-compose -f elk.yml down
    docker-compose -f elk.yml up -d
    

    配置logback

    修改logback.xml文件

    <configuration scan="false" scanPeriod="60 seconds" debug="false">
        <jmxConfigurator />
        <property name="log_level" value="${logLevel}" />
    
        <appender name="MQLogs" class="com.lucien.commons.MqttAppender">
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    
        <!--配置druid的SQL日志输出-->
        <logger name="druid.sql.Statement" level="WARN" additivity="false">
            <appender-ref ref="MQLogs" />
        </logger>
    
        <root>
            <level value="${log_level}" />
            <appender-ref ref="MQLogs" />
        </root>
    </configuration>
    

    创建MqttAppender文件,其中MqttServer为自己创建的mqtt客户端,根据自己需要在start()里初始化创建即可,但我的项目在其他地方创建并且不希望有两个连接实例,因此用了静态变量的方式单例获取。

    package com.lucien.commons;
    
    import ch.qos.logback.classic.spi.LoggingEvent;
    import ch.qos.logback.core.UnsynchronizedAppenderBase;
    import com.lucien.gateway.impl.MqttServer;
    import com.lucien.utils.StringUtils;
    
    public class MqttAppender extends UnsynchronizedAppenderBase<LoggingEvent> {
        private MqttServer mqttServer;
        private final int DEFAULT_BUFFER_SIZE = 512;
        private boolean hasInstance = false;
        private String hostName;
    
        @Override
        public void start() {
            hostName = System.getenv("HOSTNAME");
            if (StringUtils.isEmpty(hostName)) {
                hostName = System.getenv("COMPUTERNAME");
                if (StringUtils.isEmpty(hostName)) {
                    hostName = "unKnow";
                }
            }
            super.start();
        }
    
        @Override
        protected void append(LoggingEvent event) {
            StringBuilder buf = new StringBuilder(DEFAULT_BUFFER_SIZE);
            buf.append(hostName);
            buf.append(' ');
            buf.append(event.getLevel().toString());
            buf.append(' ');
            buf.append(event.getLoggerName());
            buf.append(' ');
            buf.append(event.getFormattedMessage());
    
            if (!hasInstance) {
                if (MqttServer.getSingleton() != null) {
                    if (MqttServer.getSingleton().isClientConnect()) {
                        mqttServer = MqttServer.getSingleton();
                        hasInstance = true;
                    }
                }
            } else {
                mqttServer.pubLowMessage(buf.toString().getBytes(), "logs");
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:logstash MQTT统一日志

          本文链接:https://www.haomeiwen.com/subject/xiityrtx.html