美文网首页
基于Scala实现Kafka生产者API

基于Scala实现Kafka生产者API

作者: 写scala的老刘 | 来源:发表于2019-06-12 14:42 被阅读0次

    Kafka 生产者发送数据一共有3种方式

    (1) 发送并忘记(fire-and-forget),把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息

    (2)使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待(会返回元数据或者抛出异常), 就可以知道消息是否发送成功

    (3)异步发送, 大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。 不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。这也是生产环境下最常用的一种方式,相关Scala代码如下:

    package cn.com.bonc.Face
    import java.util.Properties
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.serialization.StringSerializer
    import scala.io.Source
    /**
      * Create by liuyancheng 2019-06-12 11:13
      */
    object ScalaKafkaProducer {
      def main(args: Array[String]): Unit = {
        val kafkaProp = new Properties()
        kafkaProp.put("bootstrap.servers", "kafkahost:port")
        kafkaProp.put("acks", "1")
        kafkaProp.put("retries", "3")
        //kafkaProp.put("batch.size", 16384)//16k
        kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
        kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
        val producer = new KafkaProducer[String, String](kafkaProp)
        val lines = Source.fromFile("E:\\CodeProject\\IdeaProjects\\HeNan\\FactRecognition\\src\\main\\resources\\data.txt").getLines()
        while (lines.hasNext) {
          val line = lines.next()
          val record = new ProducerRecord[String, String]("facetest", line)
          producer.send(record, new Callback {
            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
              if (metadata != null) {
                println("发送成功")
              }
              if (exception != null) {
                println("消息发送失败")
              }
            }
          })
        }
        producer.close()
      }
    }
    
    

    相关文章

      网友评论

          本文标题:基于Scala实现Kafka生产者API

          本文链接:https://www.haomeiwen.com/subject/chllfctx.html