美文网首页
巧用广播,Spark向Kafka写入数据

巧用广播,Spark向Kafka写入数据

作者: DeepMine | 来源:发表于2017-11-21 22:50 被阅读0次

    Kafka 生产者类（KafkaProducer）不可序列化，无法直接广播到各个 executor，因此需要把它包装成一个可序列化的类：只序列化“创建生产者的函数”，生产者本身在 executor 端首次使用时才创建。

    import java.util.concurrent.Future
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
    
    /**
     * Serializable wrapper around a Kafka producer so it can be shipped to Spark executors
     * (for example inside a broadcast variable).
     *
     * `KafkaProducer` itself is not serializable, so this class carries a factory function
     * instead of a producer. The producer is created lazily, on first use, in whichever JVM
     * the (deserialized) instance lives in.
     *
     * @param createProducer factory used to build the underlying producer on first access;
     *                       it is kept as a field, so it must itself be serializable
     * @tparam K record key type
     * @tparam V record value type
     */
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

      /**
       * The underlying producer, created on first access.
       *
       * `@transient` keeps an already-initialised (non-serializable) producer out of the
       * serialized form; after deserialization the lazy val is re-initialised by calling
       * `createProducer` again.
       */
      @transient lazy val producer: KafkaProducer[K, V] = createProducer()

      /** Sends `value` under `key` to `topic`; returns the future produced by `KafkaProducer.send`. */
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))

      /** Sends `value` to `topic` without a key; returns the future produced by `KafkaProducer.send`. */
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    
    /** Factory methods for building a [[KafkaSink]] from producer configuration. */
    object KafkaSink {
      // Explicit `.asJava` / `.asScala` converters. The implicit JavaConversions used
      // previously are deprecated and were removed in Scala 2.13.
      import scala.collection.JavaConverters._

      /**
       * Builds a [[KafkaSink]] whose producer is created lazily from `config`.
       *
       * Each time the factory runs, it creates a new `KafkaProducer` and registers a JVM
       * shutdown hook that closes it.
       *
       * @param config Kafka producer configuration (e.g. `bootstrap.servers`, serializers)
       * @return a serializable sink that has not yet created its producer
       */
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc: () => KafkaProducer[K, V] = () => {
          val producer = new KafkaProducer[K, V](config.asJava)
          // Nothing else in the sink closes the producer, so close it when the JVM exits.
          sys.addShutdownHook {
            producer.close()
          }
          producer
        }

        new KafkaSink[K, V](createProducerFunc)
      }

      /**
       * Convenience overload taking `java.util.Properties`.
       *
       * The entries are copied into an immutable Scala map before delegating.
       * NOTE(review): like the original conversion, this reads the Properties' own entries
       * and expects String keys and values.
       */
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] =
        apply[K, V](config.asScala.toMap)
    }
    
    

    在 driver 端将 KafkaSink 注册为广播变量，各 executor 通过 `kafkaProducer.value` 取得并复用它（生产者在 executor 上首次发送时才创建）：

    // Test (ScalaTest FlatSpec-style syntax) that broadcasts a KafkaSink and writes
    // 100 string key/value pairs to the Kafka topic "topic".
    // NOTE(review): relies on `sc` (a SparkContext), `Broadcast` and `java.util.Properties`
    // being in scope elsewhere; the closing `}` of this `in { ... }` block is not shown in
    // the snippet — confirm against the full test source.
    "spark write kafka" should "be fine" in {
    
        // Built once on the driver. KafkaSink.apply only wraps a factory, so what gets
        // broadcast is the serializable sink, not a live producer.
        val kafkaProducer:Broadcast[KafkaSink[String,String]]={
          val kafkaProducerConfig = {
            val p = new Properties();
            p.setProperty("bootstrap.servers","kafka1-c1:9092,kafka2-c1:9092,kafka3-c1:9092")
            p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            p
          }
          sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
        }
    
        // 100 pairs (i, 2*i) rendered as strings, for i = 0 until 100.
        val seqList = for(i<- 0 until 100) yield (i.toString,(i*2).toString)
    
        val rdd = sc.makeRDD(seqList)
        // `kafkaProducer.value` is read inside the partition closure, so the sink (and its
        // lazily created producer) is used where the partition is processed.
        // NOTE(review): the Future returned by send() is discarded and nothing is asserted,
        // so failed sends are not observed by this test.
        rdd.foreachPartition(f=>{
          f.foreach(record => {
            kafkaProducer.value.send("topic",record._1,record._2)
          })
        })
    

    相关文章

      网友评论

          本文标题:巧用广播,Spark向Kafka写入数据

          本文链接:https://www.haomeiwen.com/subject/jsugvxtx.html