kafka 安装
还没有安装 Kafka 的小伙伴可以先看我的这篇文章：
kafka安装
安装依赖
这里使用的 Node.js 客户端库是 kafkajs：
npm install kafkajs
# 或者使用 yarn：yarn add kafkajs
示例代码:
const { Kafka } = require('kafkajs')
// Shared KafkaJS client for this demo; the producer and consumer below are
// both created from it.
// NOTE(review): the timeout values are presumably milliseconds, as in the
// KafkaJS client configuration docs — confirm against the installed version.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['71.24.89.191:9092'],

  // Timeouts
  connectionTimeout: 30000,
  authenticationTimeout: 30000,
  requestTimeout: 25000,

  // Retry policy: `retries` is set to 0 here.
  retry: {
    initialRetryTime: 3000,
    retries: 0,
  },
})

// Producer used to publish the test messages.
const producer = kafka.producer()

// Consumer that joins the 'test-group' consumer group.
const consumer = kafka.consumer({ groupId: 'test-group' })
/**
 * Starts the consumer: connects, subscribes to the 'first' topic with
 * `fromBeginning: true`, and logs the partition, offset and stringified
 * value of every message handed to the handler.
 *
 * @returns {Promise<void>} settles once `consumer.run(...)` has settled.
 */
const run = async () => {
  // Per-message handler: prints a compact summary of the record.
  const logMessage = async ({ partition, message }) => {
    const { offset, value } = message
    console.log({
      partition,
      offset,
      value: value.toString(),
    })
  }

  await consumer.connect()
  await consumer.subscribe({ topic: 'first', fromBeginning: true })
  await consumer.run({ eachMessage: logMessage })
}
// Kick off the consumer. Any rejection from run() is reported on stderr;
// previously the handler only contained a `debugger` statement, so failures
// vanished silently whenever no debugger was attached.
run().catch((e) => {
  console.error('Kafka consumer failed:', e)
})
// Publish a test message to the 'first' topic once per second.
// connect() is still awaited on every tick, as before.
// NOTE(review): presumably this is cheap once the producer is connected —
// confirm against the KafkaJS producer docs.
// Errors are caught and logged here because nothing awaits the promise
// returned by an async timer callback; without the try/catch a failed
// connect/send would surface as an unhandled promise rejection.
setInterval(async () => {
  try {
    await producer.connect()
    await producer.send({
      topic: 'first',
      messages: [
        { value: 'Hello KafkaJS user!' },
      ],
    })
  } catch (err) {
    console.error('Failed to send Kafka message:', err)
  }
}, 1000)
网友评论