Using kafka-node

Author: 小旎子_8327 | Published: 2020-04-02 15:37

    KafkaClient

    Concept: a client that connects directly to the Kafka brokers.
    Initialization: const client = new kafka.KafkaClient({kafkaHost: '10.3.100.196:9092'}); If kafkaHost is omitted, it defaults to localhost:9092.

    Producer

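    Initialization: Producer(KafkaClient, [options], [customPartitioner])
    Example:

    // Load kafka-node and create a producer on top of a client
    // (the client connects to localhost:9092 by default).
    const kafka = require('kafka-node');
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient();
    const producer = new Producer(client);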
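
A producer can only send once its connection is established. As a minimal sketch, assuming the `ready` and `error` events that kafka-node producers emit, the helper below (the name `whenReady` is hypothetical, not part of kafka-node) runs a callback once the producer is usable. It accepts any event emitter, so it can be tried without a broker.

```javascript
// Hypothetical helper: invoke onReady once the producer has connected,
// and route every emitted error to onError.
function whenReady(producer, onReady, onError) {
  producer.once('ready', () => onReady(producer));
  producer.on('error', (err) => onError(err));
  return producer;
}

// Intended wiring with kafka-node (needs the package and a reachable broker):
//   const kafka = require('kafka-node');
//   const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
//   whenReady(new kafka.Producer(client), () => console.log('ready'), console.error);
```

Registering the `error` listener matters in Node.js: an `error` event with no listener is thrown as an exception.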
    

    Sending messages:
    send(payloads, cb)

    payloads: an array whose items are objects of the following form:
    {
       topic: 'topicName',
       messages: ['message body'], // multiple messages should be an array; a single message can be a string or a KeyedMessage instance
       key: 'theKey', // string or buffer, only needed when using the keyed partitioner
       partition: 0, // default: 0
       attributes: 2, // compression: 0 = none (default), 1 = gzip, 2 = snappy
       timestamp: Date.now() // defaults to Date.now() (only available with Kafka v0.10+)
    }
    cb: callback invoked when the send succeeds or fails
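
To make the payload shape concrete, here is a small sketch of building payload items and passing them to `send`. The helper names `buildPayload` and `sendAll` are hypothetical. `sendAll` works with any object exposing `send(payloads, cb)`, which is the only thing it assumes about the producer.

```javascript
// Hypothetical helper: build one payload item in the shape described above.
function buildPayload(topic, messages, partition = 0) {
  return { topic, messages, partition };
}

// Send payloads through any object exposing send(payloads, cb).
function sendAll(producer, payloads, done) {
  producer.send(payloads, (err, result) => {
    if (err) {
      done(err); // the send failed
      return;
    }
    done(null, result); // whatever the producer reported back
  });
}

// Intended use with a kafka-node producer that has emitted 'ready':
//   sendAll(producer, [buildPayload('topicName', ['hello', 'world'])],
//     (err, result) => console.log(err || result));
```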
    

    Creating topics
    createTopics(topics, cb)
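
A short sketch of calling `createTopics`, assuming `topics` is an array of topic-name strings. The wrapper `ensureTopics` is a hypothetical name that only validates the input before delegating. As far as the kafka-node README describes it, this producer call relies on the broker having `auto.create.topics.enable` turned on, so treat that as an assumption to verify for your setup.

```javascript
// Hypothetical helper: ask the producer to create each topic name in `topics`.
function ensureTopics(producer, topics, done) {
  if (!Array.isArray(topics) || topics.length === 0) {
    done(new Error('topics must be a non-empty array of topic names'));
    return;
  }
  producer.createTopics(topics, done);
}

// Intended use with a kafka-node producer:
//   ensureTopics(producer, ['t', 't1'], (err, data) => console.log(err || data));
```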

    HighLevelProducer

    HighLevelProducer(KafkaClient, [options], [customPartitioner])
    send(payloads, cb)
    createTopics(topics, async, cb)

    ProducerStream

    ProducerStream (options)

    Example:
    Use a Transform stream to modify the data.
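
The Transform idea can be sketched with Node's built-in `stream` module alone. The factory below (`createMessageTransform` is a hypothetical name) assumes each incoming message is an object with a `value` field and that the downstream producer stream accepts objects shaped like `send` payloads. The uppercase conversion is only a stand-in for whatever update the data needs.

```javascript
const { Transform } = require('stream');

// Build an object-mode Transform that turns each consumed message
// ({ value: ... }) into a producer payload for `targetTopic`.
function createMessageTransform(targetTopic) {
  return new Transform({
    objectMode: true,
    transform(message, encoding, callback) {
      callback(null, {
        topic: targetTopic,
        messages: String(message.value).toUpperCase() // placeholder update
      });
    }
  });
}

// Intended wiring (assumes a consumer stream and a ProducerStream exist):
//   consumerStream.pipe(createMessageTransform('topicName')).pipe(producerStream);
```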

    Consumer

    Consumer(client, payloads, options)
    on('error', function (err) {})
    on('offsetOutOfRange', function (err) {})

    addTopics(topics, cb, fromOffset)
    removeTopics(topics, cb)
    commit(cb)
    setOffset(topic, partition, offset)
    pause()
    resume()
    pauseTopics(topics)
    resumeTopics(topics)
    close(force, cb)
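
As a rough sketch of how the events above fit together, the helper below attaches handlers for `message`, `error` and `offsetOutOfRange` to a consumer. The name `attachHandlers` and the shape of the `handlers` object are hypothetical. The `message` event is the one kafka-node consumers emit for each received message.

```javascript
// Hypothetical helper: attach handlers to a consumer-like event emitter.
function attachHandlers(consumer, handlers) {
  consumer.on('message', handlers.onMessage);
  consumer.on('error', handlers.onError);
  // Emitted when a requested offset is outside the range the broker holds.
  consumer.on('offsetOutOfRange', handlers.onOffsetOutOfRange);
  return consumer;
}

// Intended use with kafka-node (needs the package and a reachable broker):
//   const kafka = require('kafka-node');
//   const client = new kafka.KafkaClient();
//   const consumer = new kafka.Consumer(
//     client, [{ topic: 'topicName', partition: 0 }], { autoCommit: false });
//   attachHandlers(consumer, {
//     onMessage: (m) => console.log(m.value),
//     onError: console.error,
//     onOffsetOutOfRange: console.error
//   });
```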

    ConsumerStream

    ConsumerStream(client, payloads, options)

    ConsumerGroup

    ConsumerGroup(options, topics)
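
ConsumerGroup takes its connection settings through `options` rather than a client. The sketch below assembles a minimal options object. `buildGroupOptions` is a hypothetical helper, and the three fields shown are a small subset chosen for illustration, not the full option list.

```javascript
// Hypothetical helper: assemble a minimal options object for ConsumerGroup.
function buildGroupOptions(groupId, kafkaHost = 'localhost:9092') {
  return {
    kafkaHost,           // broker address, same format as for KafkaClient
    groupId,             // consumers sharing this id split the partitions
    fromOffset: 'latest' // where to start when the group has no saved offset
  };
}

// Intended use with kafka-node:
//   const kafka = require('kafka-node');
//   const group = new kafka.ConsumerGroup(buildGroupOptions('my-group'), ['topicName']);
//   group.on('message', (m) => console.log(m.value));
```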
