准备工作
npm i kafka-node
- 拷贝 Logger.js 进 node 项目
查看kafka-node文档
文档地址https://www.npmjs.com/package/kafka-node
需求日志接入kafka存到elk上,各种要求都不高,找一段最基础的发送message
var kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient(),
producer = new Producer(client),
km = new KeyedMessage('key', 'message'),
payloads = [
{ topic: 'topic1', messages: 'hi', partition: 0 },
{ topic: 'topic2', messages: ['hello', 'world', km] }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
来观察一下这个乍看平平无奇的例子,我需要的调用方式为log.info('XXX')
,
而这里使用PubSub模式实现了分布式事件,并不是像axios那样可以直接用promise去处理异步逻辑。
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
为了让调用方有精神上的愉悦感(并不想:)),这块需要处理一下
-
PubSub模式
题外话,根据《JavaScript异步编程》,PubSub简单实现类似于
PubSub = {handlers: {}}
PubSub.on = function(eventType, handler) {
if (!(eventType in this.handlers)) {this.handlers[eventType] = []; }
this.handlers[eventType].push(handler);
return this;
}
// 接着,等到触发事件的时候,再循环遍历所有的事件处理器。
PubSub.emit = function(eventType) {
var handlerArgs = Array.prototype.slice.call(arguments, 1);
for (var i = 0; i < this.handlers[eventType].length; i++) {
this.handlers[eventType][i].apply(this, handlerArgs);
}
return this;
}
Logger代码如下
// Logger.js
var kafka = require('kafka-node'),
Producer = kafka.Producer;
const EVENT = ['DEBUG', 'INFO', 'WARN', 'ERROR'];
function Logger(options) {
let client = new kafka.KafkaClient({
kafkaHost: 'logkafkafat05.test.com:9092',
});
this.producer = new Producer(client);
this.ready = readyFunc(this.producer);
this.message = createMessage(options);
}
// 发日志
Logger.prototype.sendMessage = function ({ level, layoutMessage }) {
let wholeMes = Object.assign(this.message, {
level,
layoutMessage,
timeStamp: new Date().getTime(),
});
let payloads = [
{
topic: 'test.log',
messages: JSON.stringify(wholeMes),
},
];
return this.ready.then(() => {
return new Promise((resolve, reject) => {
this.producer.send(payloads, function (err, data) {
// console.log(data);
resolve(data);
});
});
});
};
// 支持debug、warn、info、error等方法
EVENT.map((method) => {
Logger.prototype[method.toLowerCase()] = function (mes) {
return this.sendMessage({ level: method, layoutMessage: mes });
};
});
function readyFunc(producer) {
return new Promise((resolve, reject) => {
return producer.on('ready', resolve);
});
}
function createMessage({ appId, logName }) {
return {
appId,
logName,
level: '',
layoutMessage: '',
tags: {
HOST_IP: getIPAdress(),
catTrace: '',
},
};
}
// 获取本机ip
function getIPAdress() {
var interfaces = require('os').networkInterfaces();
for (var devName in interfaces) {
var iface = interfaces[devName];
for (var i = 0; i < iface.length; i++) {
var alias = iface[i];
if (
alias.family === 'IPv4' &&
alias.address !== '127.0.0.1' &&
!alias.internal
) {
return alias.address;
}
}
}
}
module.exports = Logger;
使用
- 支持两类调用方式
var Logger = require('./Logger.js');
var log = new Logger({ appId: 'XXXXX', logName: 'htmlquicker' });
// 调用方式1类
log.warn('warn test');
log.info('info test');
log.debug('debug test');
log.error('error test');
// 调用方式2类
log.sendMessage({ level: 'INFO', layoutMessage: 'test' });
log.sendMessage({ level: 'WARN', layoutMessage: 'test' });
log.sendMessage({ level: 'DEBUG', layoutMessage: 'test' });
log.sendMessage({ level: 'ERROR', layoutMessage: 'test' });
- koa 示例
let resContent = { Content: { downloadPath: `/download/${encodeURIComponent( fileName )}.html`, fileName: fileName, }, }; ctx.body = response.success(resContent); log.info( JSON.stringify({ href: ctx.href, ...resContent, }) );
网友评论