Spark Streaming: Consuming Kafka and Writing to HBase

Author: xiaogp | Published 2020-07-06 15:42

First, write the producer code to send a real-estate listing to Kafka as a JSON-formatted message:

{
    "id_code": 10,
    "date": "2020-03-20",
    "layout": "2室1厅1卫",
    "area": "50.5",
    "orientation": "南",
    "build_year": "1989",
    "floor_degree": "低楼层",
    "floor_all": "6",
    "renovation": "简装",
    "district": "未知",
    "locate": "南码头  · 距离地铁临沂新村307米",
    "place": "临沂六村",
    "ditie": "临沂新村",
    "count": "14",
    "release": "2020-03-13",
    "total_price": "280",
    "avg_price": "55446"
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerAnalysis {
    public static final String brokerList = "192.168.61.97:9092";
    public static final String topic = "test_gp";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        // Instantiate the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Build the message to send: the JSON listing above, serialized as a single line
        String mes = "{\"id_code\": 10, \"date\": \"2020-03-20\", \"layout\": \"2室1厅1卫\", \"area\": \"50.5\", \"orientation\": \"南\", \"build_year\": \"1989\", \"floor_degree\": \"低楼层\", \"floor_all\": \"6\", \"renovation\": \"简装\", \"district\": \"未知\", \"locate\": \"南码头  · 距离地铁临沂新村307米\", \"place\": \"临沂六村\", \"ditie\": \"临沂新村\", \"count\": \"14\", \"release\": \"2020-03-13\", \"total_price\": \"280\", \"avg_price\": \"55446\"}";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, mes);

        // Send the same record once per second, blocking on the returned Future so send errors surface immediately
        for (int i = 0; i <= 2000; i++) {
            try {
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get();
                // System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
                Thread.sleep(1000);

            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Close the producer
        producer.close();
    }
}
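
The consumer below writes into the HBase table test:gp (column family info), so that table has to exist before the streaming job starts. A minimal creation sketch, assuming the HBase 2.x client API (with a 1.x client you would use HTableDescriptor/HColumnDescriptor instead, or simply create the table from the HBase shell):

import org.apache.hadoop.hbase.{HBaseConfiguration, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateHBaseTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.61.97")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      // Create the "test" namespace first if it does not exist yet
      if (!admin.listNamespaceDescriptors().exists(_.getName == "test")) {
        admin.createNamespace(NamespaceDescriptor.create("test").build())
      }
      // Create test:gp with a single "info" column family
      val table = TableName.valueOf("test:gp")
      if (!admin.tableExists(table)) {
        val descriptor = TableDescriptorBuilder.newBuilder(table)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
          .build()
        admin.createTable(descriptor)
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}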

Next, write the Spark Streaming consumer: parse each message with fastjson, insert the rows into HBase through the saveAsNewAPIHadoopDataset API, and commit the Kafka offsets only after the insert has completed.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.log4j.{Level, Logger}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.streaming.kafka010._
import org.apache.hadoop.mapreduce.Job

object StreamingWithKafka {

  private val brokers = "192.168.61.97:9092"
  private val topic = "test_gp"
  private val group = "group.demo"
  private val checkpointDir = "/opt/kafka/checkpoint"

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWithKafka")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](List(topic), kafkaParams))

    // HBase output configuration: ZooKeeper quorum and the target table
    val tablename = "test:gp"
    ssc.sparkContext.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    ssc.sparkContext.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    ssc.sparkContext.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(ssc.sparkContext.hadoopConfiguration)
    // TableOutputFormat writes (ImmutableBytesWritable, Mutation) pairs, so the output value class is Put rather than Result
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Debug output: print the raw JSON payload of every record in this batch
        rdd.foreach(f => println(f.value()))

        // Convert each record into an HBase Put keyed by id_code, copying every JSON field into the "info" family
        val save_rdd = rdd.map(x => {
          val jsonObject: JSONObject = JSON.parseObject(x.value())
          val id_code = jsonObject.getString("id_code")
          val put = new Put(Bytes.toBytes(id_code))
          insert_hbase(jsonObject, put)
          (new ImmutableBytesWritable, put)
        })

        save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

        // Commit the Kafka offsets only after the HBase write has succeeded;
        // a replayed batch simply overwrites the same row keys, so the pipeline is effectively at-least-once with idempotent writes
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Copy every field of the JSON object into the "info" column family of the given Put
    def insert_hbase(jsonObject: JSONObject, onePut: Put): Unit = {
      val keys = jsonObject.keySet
      val iterator = keys.iterator
      while (iterator.hasNext) {
        val col = iterator.next()
        val value = jsonObject.get(col).toString
        onePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(col), Bytes.toBytes(value))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
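
For reference, the job needs the Kafka 0.10 integration for Spark Streaming, the HBase client artifacts (TableOutputFormat ships in hbase-mapreduce on HBase 2.x and in hbase-server on 1.x) and fastjson on the classpath. A build.sbt sketch; the version numbers below are placeholders and should be matched to your cluster:

scalaVersion := "2.11.12"   // assumption: Spark built for Scala 2.11

val sparkVersion = "2.4.5"  // assumption: match your Spark installation
val hbaseVersion = "2.1.0"  // assumption: match your HBase installation

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.hbase" %  "hbase-client"               % hbaseVersion,
  "org.apache.hbase" %  "hbase-mapreduce"            % hbaseVersion,  // provides TableOutputFormat on HBase 2.x
  "com.alibaba"      %  "fastjson"                   % "1.2.62"
)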

Check the data in HBase:

hbase(main):036:0> get 'test:gp', '10'
COLUMN                                           CELL                                                                                                                                         
 info:area                                       timestamp=1594021236559, value=50.5                                                                                                          
 info:avg_price                                  timestamp=1594021236559, value=55446                                                                                                         
 info:build_year                                 timestamp=1594021236559, value=1989                                                                                                          
 info:count                                      timestamp=1594021236559, value=14                                                                                                            
 info:date                                       timestamp=1594021236559, value=2020-03-20                                                                                                    
 info:district                                   timestamp=1594021236559, value=\xE6\x9C\xAA\xE7\x9F\xA5                                                                                      
 info:ditie                                      timestamp=1594021236559, value=\xE4\xB8\xB4\xE6\xB2\x82\xE6\x96\xB0\xE6\x9D\x91                                                              
 info:floor_all                                  timestamp=1594021236559, value=6                                                                                                             
 info:floor_degree                               timestamp=1594021236559, value=\xE4\xBD\x8E\xE6\xA5\xBC\xE5\xB1\x82                                                                          
 info:id_code                                    timestamp=1594021236559, value=10                                                                                                            
 info:layout                                     timestamp=1594021236559, value=2\xE5\xAE\xA41\xE5\x8E\x851\xE5\x8D\xAB                                                                       
 info:locate                                     timestamp=1594021236559, value=\xE5\x8D\x97\xE7\xA0\x81\xE5\xA4\xB4  \xC2\xB7 \xE8\xB7\x9D\xE7\xA6\xBB\xE5\x9C\xB0\xE9\x93\x81\xE4\xB8\xB4\xE
                                                 6\xB2\x82\xE6\x96\xB0\xE6\x9D\x91307\xE7\xB1\xB3                                                                                             
 info:orientation                                timestamp=1594021236559, value=\xE5\x8D\x97                                                                                                  
 info:place                                      timestamp=1594021236559, value=\xE4\xB8\xB4\xE6\xB2\x82\xE5\x85\xAD\xE6\x9D\x91                                                              
 info:release                                    timestamp=1594021236559, value=2020-03-13                                                                                                    
 info:renovation                                 timestamp=1594021236559, value=\xE7\xAE\x80\xE8\xA3\x85                                                                                      
 info:total_price                                timestamp=1594021236559, value=280 
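
The \xE5... escapes are just the HBase shell printing raw UTF-8 bytes; the Chinese values round-trip correctly. A quick read-back sketch with the HBase client (same ZooKeeper settings as above) that decodes every cell of the info family back to a UTF-8 string:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

object ReadBackRow {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.61.97")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("test:gp"))
    try {
      val result = table.get(new Get(Bytes.toBytes("10")))
      // Decode each qualifier/value pair of the "info" family as UTF-8 text
      result.getFamilyMap(Bytes.toBytes("info")).asScala.foreach { case (qualifier, value) =>
        println(s"info:${Bytes.toString(qualifier)} = ${Bytes.toString(value)}")
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}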
