生产数据,消费者消费代码。
const Kafka = require('node-rdkafka');
const createReadStream = Kafka.createReadStream;
// 生产
const producer = new Kafka.Producer({
'client.id': 'kafka',
'metadata.broker.list': 'localhost:9092'
});
const stream = Kafka.Producer.createWriteStream({
'metadata.broker.list': 'localhost:9092'
}, {}, {
topic: 'topic-name',
});
setInterval(() => {
const queuedSuccess = stream.write(Buffer.from(`Awesome message_${Math.random()}`));
if (queuedSuccess) {
console.log('We queued our message!');
} else {
console.log('queue is full');
}
}, 1000);
stream.on('error', (err) => {
console.error('Error in our kafka stream');
console.error(err);
})
// 消费
const readStream = createReadStream({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
}, {}, {topics: ['topic-name']});
readStream.on('data', (message) => {
console.log('got message');
console.log(message);
})
网友评论