美文网首页
将程序日志写入kafka中

将程序日志写入kafka中

作者: 过来摸摸头丶 | 来源:发表于2018-09-25 18:27 被阅读0次

注:原文链接已经找不到了,下面的代码是一位外国开发者贡献到 GitHub 项目中的。

1.加入依赖包

 <!-- Kafka core artifact; the "_2.12" suffix presumably denotes the Scala 2.12 build.
      NOTE(review): the appender shown in step 2 only appears to use producer-client
      classes; confirm whether this full artifact is actually required. -->
 <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- Kafka's log4j appender module.
         NOTE(review): step 3 configures org.apache.kafka.log4jappender.KafkaLog4jAppender,
         which this artifact presumably already provides; verify before copying the class
         from step 2 into the project. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- SLF4J binding that routes SLF4J calls to log4j 1.2. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
    </dependency>

2.创建核心类

/**
 * A log4j appender that produces log messages to Kafka
 */
public class KafkaLog4jAppender extends AppenderSkeleton {

    private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
    private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG;
    private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG;
    private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG;
    private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
    private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
    private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
    private static final String SSL_TRUSTSTORE_LOCATION = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
    private static final String SSL_TRUSTSTORE_PASSWORD = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
    private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
    private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
    private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;

    private String brokerList = null;
    private String topic = null;
    private String compressionType = null;
    private String securityProtocol = null;
    private String sslTruststoreLocation = null;
    private String sslTruststorePassword = null;
    private String sslKeystoreType = null;
    private String sslKeystoreLocation = null;
    private String sslKeystorePassword = null;

    private int retries = 0;
    private int requiredNumAcks = Integer.MAX_VALUE;
    private boolean syncSend = false;
    private Producer<byte[], byte[]> producer = null;

    public Producer<byte[], byte[]> getProducer() {
        return producer;
    }

    public String getBrokerList() {
        return brokerList;
    }

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public int getRequiredNumAcks() {
        return requiredNumAcks;
    }

    public void setRequiredNumAcks(int requiredNumAcks) {
        this.requiredNumAcks = requiredNumAcks;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    public String getCompressionType() {
        return compressionType;
    }

    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean getSyncSend() {
        return syncSend;
    }

    public void setSyncSend(boolean syncSend) {
        this.syncSend = syncSend;
    }

    public String getSslTruststorePassword() {
        return sslTruststorePassword;
    }

    public String getSslTruststoreLocation() {
        return sslTruststoreLocation;
    }

    public String getSecurityProtocol() {
        return securityProtocol;
    }

    public void setSecurityProtocol(String securityProtocol) {
        this.securityProtocol = securityProtocol;
    }

    public void setSslTruststoreLocation(String sslTruststoreLocation) {
        this.sslTruststoreLocation = sslTruststoreLocation;
    }

    public void setSslTruststorePassword(String sslTruststorePassword) {
        this.sslTruststorePassword = sslTruststorePassword;
    }

    public void setSslKeystorePassword(String sslKeystorePassword) {
        this.sslKeystorePassword = sslKeystorePassword;
    }

    public void setSslKeystoreType(String sslKeystoreType) {
        this.sslKeystoreType = sslKeystoreType;
    }

    public void setSslKeystoreLocation(String sslKeystoreLocation) {
        this.sslKeystoreLocation = sslKeystoreLocation;
    }

    public String getSslKeystoreLocation() {
        return sslKeystoreLocation;
    }

    public String getSslKeystoreType() {
        return sslKeystoreType;
    }

    public String getSslKeystorePassword() {
        return sslKeystorePassword;
    }

    @Override
    public void activateOptions() {
        // check for config parameter validity
        Properties props = new Properties();
        if (brokerList != null)
            props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
        if (props.isEmpty())
            throw new ConfigException("The bootstrap servers property should be specified");
        if (topic == null)
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        if (compressionType != null)
            props.put(COMPRESSION_TYPE_CONFIG, compressionType);
        if (requiredNumAcks != Integer.MAX_VALUE)
            props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
        if (retries > 0)
            props.put(RETRIES_CONFIG, retries);
        if (securityProtocol != null && sslTruststoreLocation != null &&
                sslTruststorePassword != null) {
            props.put(SECURITY_PROTOCOL, securityProtocol);
            props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation);
            props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword);

            if (sslKeystoreType != null && sslKeystoreLocation != null &&
                    sslKeystorePassword != null) {
                props.put(SSL_KEYSTORE_TYPE, sslKeystoreType);
                props.put(SSL_KEYSTORE_LOCATION, sslKeystoreLocation);
                props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword);
            }
        }

        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = getKafkaProducer(props);
        LogLog.debug("Kafka producer connected to " + brokerList);
        LogLog.debug("Logging for topic: " + topic);
    }

    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
        return new KafkaProducer<byte[], byte[]>(props);
    }

    @Override
    protected void append(LoggingEvent event) {
        String message = subAppend(event);
        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
        Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, message.getBytes()));
        if (syncSend) {
            try {
                response.get();
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            } catch (ExecutionException ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    private String subAppend(LoggingEvent event) {
        return (this.layout == null) ? event.getRenderedMessage() : this.layout.format(event);
    }

    public void close() {
        if (!this.closed) {
            this.closed = true;
            producer.close();
        }
    }

    public boolean requiresLayout() {
        return true;
    }
}

3.配置log4j.properties

## appender KAFKA
# Attach the KAFKA appender to the com.demo.kafka.Log4jToKafka logger at level INFO.
# NOTE(review): this only matches if the test class lives in package com.demo.kafka - confirm.
log4j.logger.com.demo.kafka.Log4jToKafka=INFO,KAFKA
# Appender implementation class.
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Required appender options: target topic and broker address (setTopic / setBrokerList).
log4j.appender.KAFKA.topic=test
log4j.appender.KAFKA.brokerList=172.16.92.237:9092

# Optional appender options: compression type, and syncSend=true so each append waits
# for the send to complete.
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
# NOTE(review): the appender class shown in this article has no ThresholdFilter property,
# so these three keys are presumably not applied by log4j - verify, or configure the
# filter via the log4j.appender.KAFKA.filter.* form instead.
log4j.appender.KAFKA.ThresholdFilter.level=INFO
log4j.appender.KAFKA.ThresholdFilter.onMatch=ACCEPT
log4j.appender.KAFKA.ThresholdFilter.onMismatch=DENY

# Pattern used to format each record sent to Kafka: timestamp, [category:line], [level], message.
log4j.appender.KAFKA.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n

## appender console
# NOTE(review): no logger in this snippet references CONSOLE (there is no rootLogger line),
# so this appender looks unused as shown - confirm.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d (%t) [%p - %l] %m%n

4.测试

/**
 * Small driver that prints a greeting and then emits nine INFO messages
 * (numbered 2 through 10) through log4j.
 */
public class Log4jToKafka {
    private static Logger logger = Logger.getLogger(Log4jToKafka.class);

    /**
     * Entry point.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        System.out.println("hello word!");
        // Message numbers run from 2 to 10 inclusive.
        for (int sequence = 2; sequence <= 10; sequence++) {
            logger.info(sequence + " hello Log4jToKafka test !");
        }
    }
}

5.先启动一个 Kafka 消费者监听 test 主题(例如 kafka-console-consumer),再运行上面的程序,即可在消费者端看到写入 Kafka 的日志。

相关文章

  • 将程序日志写入kafka中

    注:原文链接我给弄丢了,是外国友人在github项目中贡献的自己写的代码。 1.加入依赖包 2.创建核心类 3.配...

  • Flink:注册Table Kafka Sink报错处理

    实现 实现Flink Job:读取阿里云LogService日志,统计事件后将指标写入搭建的Kafka中。JAVA...

  • Kafka的日志管理模块--LogManager

    这里说的日志不是为了追踪程序运行而打的日志,指的是Kafka接受到消息后将消息写入磁盘或从磁盘读取的子系统; 它负...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • Kafka的日志清理-LogCleaner

    这里说的日志,是指Kafka保存写入消息的文件; Kafka日志清除策略包括中间:基于时间和大小的删除策略;Com...

  • Flink写入Hbase

    基本流程: 从Kafka中读取数据,再写入到Hbase。 写入Kafka代码 Flink写入Habse代码 pom...

  • 典型日志系统架构及其缺点

    典型日志系统架构 典型的日志架构如图所示,简单介绍下基本流程 日志通过filebeat或者api写入到kafka或...

  • 《Kafka》副本复制机制

    1. 实现原理 Kafka主题中的每个分区都有一个预写日志(write-ahead log),我们写入Kafka的...

  • AOF和RDB

    redis有两种持久化方案(AOF和RDB) 什么是AOF AOF将每条写入命令作为日志写入到日志文件中。 什么是...

  • 三、Kafka工作流程分析

    3.1 Kafka生产过程分析 参考Kafka架构 3.1.1 写入方式 producer采用推(push)模式将...

网友评论

      本文标题:将程序日志写入kafka中

      本文链接:https://www.haomeiwen.com/subject/zkgloftx.html