美文网首页
kafka-node 实现生产者和消费者

kafka-node 实现生产者和消费者

作者: 西葫芦炒胖子 | 来源:发表于2019-08-01 14:17 被阅读0次

    生产者 producer.js

    • 从代码我们可以看到 生产者每两秒生产一条数据
    • 具体操作流程:
    • 1.安装请参考其他文章
    • 2.启动zk 启动kofka
    • 3.安装第三方模块依赖 node install kafka-node
      1. 执行脚本 node producer.js node consumer.js
    /**
     * 生产者
     */
    
    const kafka = require('kafka-node');
    
    let conn = {'kafkaHost':'localhost:9092'};
    
    var MQ = function (){
        this.mq_producers = {};
    }
    
    MQ.prototype.AddProducer = function (conn, handler){
        console.log('增加生产者',conn, this);
        let client = new kafka.KafkaClient(conn);
        /**
         * Producer(client, [options])
         * client: 和kafka服务保持连接的client对象
         * options:producer的属性
         */
        let producer = new kafka.Producer(client);
    
        producer.on('ready', function(){
            if(!!handler){
                handler(producer);
            }
        });
    
        producer.on('error', function(err){
            console.error('producer error ',err.stack);
        });
    
        this.mq_producers['common'] = producer;
        return producer;
    }
    console.log(MQ);
    var mq = new MQ();
    
    mq.AddProducer(conn, function (producer){
        producer.createTopics(['broadcast'], function (){
            setInterval(function(){
                mq.mq_producers['common'].send([{topic:['broadcast'], 
                messages:[JSON.stringify({"cmd":"testRpc","value":"Hello World"})]}], function (){
                    console.log("..... ");
                })
            }, 2000);
        })
    });
    
    
    

    消费者 consumer.js

    /**
     * 消费者
     */
    
    const kafka = require('kafka-node');
    
    let conn = {'kafkaHost':'127.0.0.1:9092'};
    let consumers = [
        {
            'type': 'consumer',
            'options': {'autoCommit': true},
            'name':'common',
            'topic':[
                {'topic': 'broadcast', 'partition': 0}
            ]
        }
    ];
    
    let MQ = function(){
    
    }
    
    MQ.prototype.AddConsumer = function (conn, topics, options, handler){
        let client = new kafka.KafkaClient(conn);
        let consumer = new kafka.Consumer(client, topics, options);
    
        if(!!handler){
            consumer.on('message', handler);
        }
    
        consumer.on('error', function(err){
            console.error('consumer error ',err.stack);
        });
    }
    
    var mq = new MQ();
    
    
    mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
        console.log(message.value);
    });
    

    相关文章

      网友评论

          本文标题:kafka-node 实现生产者和消费者

          本文链接:https://www.haomeiwen.com/subject/fsvqdctx.html