Kafka生产者类不能实例化,需要包装成一个可实例化的类
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class KafkaSink[K,V](createProducer:()=> KafkaProducer[K,V]) extends Serializable{
lazy val producer= createProducer()
def send(topic:String, key:K, value:V):Future[RecordMetadata] =
producer.send(new ProducerRecord[K,V](topic,key,value))
def send(topic:String,value:V):Future[RecordMetadata] =
producer.send(new ProducerRecord[K,V](topic,value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
val createProducerFunc = () => {
val producer = new KafkaProducer[K,V](config)
sys.addShutdownHook{
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K,V] = apply(config.toMap)
}
注册为广播对象
"spark write kafka" should "be fine" in {
val kafkaProducer:Broadcast[KafkaSink[String,String]]={
val kafkaProducerConfig = {
val p = new Properties();
p.setProperty("bootstrap.servers","kafka1-c1:9092,kafka2-c1:9092,kafka3-c1:9092")
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
p
}
sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
}
val seqList = for(i<- 0 until 100) yield (i.toString,(i*2).toString)
val rdd = sc.makeRDD(seqList)
rdd.foreachPartition(f=>{
f.foreach(record => {
kafkaProducer.value.send("topic",record._1,record._2)
})
})
网友评论