美文网首页
node日志接入kafka(封装kafka-node)

node日志接入kafka(封装kafka-node)

作者: 景阳冈大虫在此 | 来源:发表于2020-04-24 16:11 被阅读0次

准备工作

  • npm i kafka-node
  • 拷贝 Logger.js 进 node 项目

查看kafka-node文档

文档地址:https://www.npmjs.com/package/kafka-node
需求是把日志接入 kafka 并存到 ELK 上,各方面要求都不高,所以先从文档里找一段最基础的发送 message 的示例:

// Minimal kafka-node producer example: create a client and a producer,
// wait for the producer's 'ready' event, then send two payloads.
const kafka = require('kafka-node');
const Producer = kafka.Producer;
const KeyedMessage = kafka.KeyedMessage;
// No options are passed, so the client falls back to the library defaults.
const client = new kafka.KafkaClient();
const producer = new Producer(client);
const km = new KeyedMessage('key', 'message');
const payloads = [
    { topic: 'topic1', messages: 'hi', partition: 0 },
    { topic: 'topic2', messages: ['hello', 'world', km] }
];

// send() is issued only after the producer has emitted 'ready'.
producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        if (err) {
            // Report the failed send instead of logging an undefined result.
            console.error(err);
            return;
        }
        console.log(data);
    });
});

// Surface producer errors instead of silently discarding them.
producer.on('error', function (err) {
    console.error(err);
});

来观察一下这个乍看平平无奇的例子,我需要的调用方式为log.info('XXX'),
而这里使用PubSub模式实现了分布式事件,并不是像axios那样可以直接用promise去处理异步逻辑。

// Event-style API: the send happens inside the 'ready' listener and its
// outcome is delivered to a node-style (err, data) callback.
producer.on('ready', function () {
    producer.send(payloads, function (err, data) {
        if (err) {
            // Report the failed send instead of logging an undefined result.
            console.error(err);
            return;
        }
        console.log(data);
    });
});

为了让调用方有精神上的愉悦感(并不想:)),这块需要处理一下

  • PubSub模式
    题外话,根据《JavaScript异步编程》,PubSub简单实现类似于
/**
 * Minimal publish/subscribe hub. Handlers are stored per event type and are
 * invoked in registration order when that event type is emitted.
 */
const PubSub = { handlers: {} };

/**
 * Registers a handler for an event type.
 * @param {string} eventType - Name of the event to listen for.
 * @param {Function} handler - Called with the extra arguments given to emit().
 * @returns {object} The receiver, so calls can be chained.
 */
PubSub.on = function (eventType, handler) {
  // Own-property check: `in` would also match inherited keys such as
  // "toString", which are not handler arrays.
  if (!Object.prototype.hasOwnProperty.call(this.handlers, eventType)) {
    this.handlers[eventType] = [];
  }
  this.handlers[eventType].push(handler);
  return this;
};

/**
 * Calls every handler registered for an event type, passing along any
 * arguments that follow the event type. Emitting an event type that has no
 * handlers is a no-op.
 * @param {string} eventType - Name of the event to trigger.
 * @param {...*} handlerArgs - Arguments forwarded to each handler.
 * @returns {object} The receiver, so calls can be chained.
 */
PubSub.emit = function (eventType, ...handlerArgs) {
  const registered = Object.prototype.hasOwnProperty.call(this.handlers, eventType)
    ? this.handlers[eventType]
    : [];
  for (const handler of registered) {
    handler.apply(this, handlerArgs);
  }
  return this;
};

Logger代码如下

// Logger.js
const kafka = require('kafka-node');
const { Producer } = kafka;
// Supported log levels.
const EVENT = ['DEBUG', 'INFO', 'WARN', 'ERROR'];

/**
 * Logger that sends log entries to Kafka through a kafka-node producer.
 *
 * @param {object} options
 * @param {string} options.appId - Application id placed in every message.
 * @param {string} options.logName - Log name placed in every message.
 * @param {string} [options.kafkaHost='logkafkafat05.test.com:9092'] - Broker
 *     address handed to the Kafka client.
 */
function Logger(options) {
    // Build the message template first so that invalid options throw before
    // a Kafka client is created.
    this.message = createMessage(options);
    const client = new kafka.KafkaClient({
        kafkaHost: options.kafkaHost || 'logkafkafat05.test.com:9092',
    });
    this.producer = new Producer(client);
    // An 'error' event with no listener is thrown by Node's EventEmitter;
    // report it instead so a broker problem does not take the process down.
    this.producer.on('error', function (err) {
        console.error(err);
    });
    // Promise that resolves once the producer has emitted 'ready'.
    this.ready = readyFunc(this.producer);
}
/**
 * Sends one log entry to the 'test.log' topic.
 *
 * The entry is the logger's message template merged with the given level,
 * the message text and the current timestamp, serialized as JSON.
 *
 * @param {object} entry
 * @param {string} entry.level - Log level stored in the entry.
 * @param {*} entry.layoutMessage - Log content stored in the entry.
 * @returns {Promise<*>} Resolves with the data passed to the producer's send
 *     callback. A send error is reported on stderr and the promise still
 *     resolves, so callers that do not attach a catch() are unaffected.
 */
Logger.prototype.sendMessage = function ({ level, layoutMessage }) {
    // Merge into a fresh object so the shared template in this.message is
    // not overwritten by every call.
    const wholeMes = Object.assign({}, this.message, {
        level,
        layoutMessage,
        timeStamp: Date.now(),
    });
    const payloads = [
        {
            topic: 'test.log',
            messages: JSON.stringify(wholeMes),
        },
    ];

    // Wait for the producer to be ready before sending.
    return this.ready.then(() => {
        return new Promise((resolve) => {
            this.producer.send(payloads, function (err, data) {
                if (err) {
                    // Logging is best-effort: report the failure rather than
                    // dropping it silently.
                    console.error(err);
                }
                resolve(data);
            });
        });
    });
};

// Expose debug(), info(), warn() and error() on Logger instances. Each one
// forwards its argument to sendMessage with the matching upper-case level.
// forEach (not map): the callback runs only for its side effect.
EVENT.forEach((method) => {
    Logger.prototype[method.toLowerCase()] = function (mes) {
        return this.sendMessage({ level: method, layoutMessage: mes });
    };
});
/**
 * Wraps a producer's 'ready' event in a promise.
 *
 * @param {object} producer - Event emitter exposing once(event, listener).
 * @returns {Promise<*>} Resolves the first time 'ready' is emitted; it never
 *     rejects.
 */
function readyFunc(producer) {
    return new Promise((resolve) => {
        // once(): a promise settles a single time, so the listener is removed
        // after the first 'ready' instead of staying registered.
        producer.once('ready', resolve);
    });
}
/**
 * Builds the message template for a logger: the caller's appId and logName,
 * empty level and layoutMessage fields, and a tags object holding the
 * result of getIPAdress() and an empty catTrace.
 *
 * @param {object} options
 * @param {string} options.appId - Application id copied into the template.
 * @param {string} options.logName - Log name copied into the template.
 * @returns {object} The message template.
 */
function createMessage({ appId, logName }) {
    const tags = { HOST_IP: getIPAdress(), catTrace: '' };
    const template = { appId, logName, level: '', layoutMessage: '', tags };
    return template;
}
/**
 * Returns the first non-internal IPv4 address of this machine.
 *
 * @param {object} [interfaces] - Map of interface name to address entries,
 *     in the shape returned by os.networkInterfaces(). Defaults to the live
 *     value from the os module.
 * @returns {string|undefined} The address, or undefined when no interface
 *     has an external IPv4 address.
 */
function getIPAdress(interfaces = require('os').networkInterfaces()) {
    for (const addresses of Object.values(interfaces)) {
        for (const alias of addresses) {
            // Node.js 18.0-18.3 report the family as the number 4 rather
            // than the string 'IPv4'; accept both.
            const isIPv4 = alias.family === 'IPv4' || alias.family === 4;
            if (isIPv4 && alias.address !== '127.0.0.1' && !alias.internal) {
                return alias.address;
            }
        }
    }
    return undefined;
}
// The Logger constructor is this module's only export.
module.exports = Logger;

使用

  • 支持两类调用方式
// Load the Logger constructor exported by Logger.js.
var Logger = require('./Logger.js');

// Create a logger for one application / log name.
var log = new Logger({ appId: 'XXXXX', logName: 'htmlquicker' });

// Calling style 1: one method per log level
log.warn('warn test');
log.info('info test');
log.debug('debug test');
log.error('error test');

// Calling style 2: sendMessage with an explicit level
log.sendMessage({ level: 'INFO', layoutMessage: 'test' });
log.sendMessage({ level: 'WARN', layoutMessage: 'test' });
log.sendMessage({ level: 'DEBUG', layoutMessage: 'test' });
log.sendMessage({ level: 'ERROR', layoutMessage: 'test' });
  • koa 示例
    // Response payload: the download path (file name URL-encoded) and the
    // raw file name.
    let resContent = {
        Content: {
            downloadPath: `/download/${encodeURIComponent(
                fileName
            )}.html`,
            fileName: fileName,
        },
    };
    ctx.body = response.success(resContent);
    // Log ctx.href together with the response payload as one JSON string.
    log.info(
        JSON.stringify({
            href: ctx.href,
            ...resContent,
        })
    );
    

相关文章

网友评论

      本文标题:node日志接入kafka(封装kafka-node)

      本文链接:https://www.haomeiwen.com/subject/xsufwhtx.html