Write the producer code that pushes one real-estate listing into Kafka as a JSON message:
{
  "id_code": 10,
  "date": "2020-03-20",
  "layout": "2室1厅1卫",
  "area": "50.5",
  "orientation": "南",
  "build_year": "1989",
  "floor_degree": "低楼层",
  "floor_all": "6",
  "renovation": "简装",
  "district": "未知",
  "locate": "南码头 · 距离地铁临沂新村307米",
  "place": "临沂六村",
  "ditie": "临沂新村",
  "count": "14",
  "release": "2020-03-13",
  "total_price": "280",
  "avg_price": "55446"
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerAnalysis {
    public static final String brokerList = "192.168.61.97:9092";
    public static final String topic = "test_gp";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // retry transient send failures up to 10 times
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        // Instantiate the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Build the message to send (the listing JSON shown above)
        String message = "{\"id_code\": 10, \"date\": \"2020-03-20\", \"layout\": \"2室1厅1卫\", \"area\": \"50.5\", \"orientation\": \"南\", \"build_year\": \"1989\", \"floor_degree\": \"低楼层\", \"floor_all\": \"6\", \"renovation\": \"简装\", \"district\": \"未知\", \"locate\": \"南码头 · 距离地铁临沂新村307米\", \"place\": \"临沂六村\", \"ditie\": \"临沂新村\", \"count\": \"14\", \"release\": \"2020-03-13\", \"total_price\": \"280\", \"avg_price\": \"55446\"}";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        // Send the same record repeatedly, roughly one message per second
        for (int i = 0; i <= 2000; i++) {
            try {
                Future<RecordMetadata> future = producer.send(record);
                // Block until the broker acknowledges the write
                RecordMetadata metadata = future.get();
                // System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
                Thread.sleep(1000);
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Close the producer
        producer.close();
    }
}
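Both the producer above and the Spark Streaming job below need the Kafka, Spark, HBase and fastjson client libraries on the classpath. The sketch below shows the dependencies as an sbt build fragment (equivalent Maven coordinates work the same); every version number here is an assumption and should be aligned with the cluster. Note that on HBase 2.x the TableOutputFormat class ships in the hbase-mapreduce module instead of hbase-server.

// build.sbt sketch -- all version numbers are assumptions, match them to your cluster
libraryDependencies ++= Seq(
  "org.apache.kafka"  %  "kafka-clients"              % "2.0.0",
  "org.apache.spark"  %% "spark-streaming"            % "2.4.0",
  "org.apache.spark"  %% "spark-streaming-kafka-0-10" % "2.4.0",
  "org.apache.hbase"  %  "hbase-client"               % "1.2.6",
  "org.apache.hbase"  %  "hbase-server"               % "1.2.6", // provides TableOutputFormat on HBase 1.x
  "com.alibaba"       %  "fastjson"                   % "1.2.62"
)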
Write the Spark Streaming consumer: parse each record with fastjson, write it to HBase through the saveAsNewAPIHadoopDataset API, and commit the Kafka offsets only after the insert has completed:
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWithKafka {
  private val brokers = "192.168.61.97:9092"
  private val topic = "test_gp"
  private val group = "group.demo"
  private val checkpointDir = "/opt/kafka/checkpoint"

  // Copy every field of the JSON object into the "info" column family,
  // using the JSON key as the column qualifier.
  def insert_hbase(jsonObject: JSONObject, onePut: Put): Unit = {
    val iterator = jsonObject.keySet.iterator
    while (iterator.hasNext) {
      val col = iterator.next()
      val value = jsonObject.get(col).toString
      onePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(col), Bytes.toBytes(value))
    }
  }

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWithKafka")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // ssc.checkpoint(checkpointDir)

    // Auto-commit is disabled: offsets are committed manually after the HBase write succeeds.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](List(topic), kafkaParams))

    // Target HBase table and ZooKeeper quorum for TableOutputFormat
    val tablename = "test:gp"
    ssc.sparkContext.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    ssc.sparkContext.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    ssc.sparkContext.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(ssc.sparkContext.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put]) // the RDD emits (ImmutableBytesWritable, Put) pairs
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Print the raw messages (runs on the executors; visible on the console with local[*])
        rdd.foreach(f => println(f.value()))

        // Parse each message with fastjson and turn it into a Put keyed by id_code
        val save_rdd = rdd.map(x => {
          val jsonObject: JSONObject = JSON.parseObject(x.value())
          val id_code = jsonObject.getString("id_code")
          val put = new Put(Bytes.toBytes(id_code))
          insert_hbase(jsonObject, put)
          (new ImmutableBytesWritable, put)
        })
        save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

        // Commit the offsets only after the batch has been written to HBase (at-least-once delivery)
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
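The streaming job assumes that the namespace test, the table test:gp and the column family info already exist. A minimal one-off setup sketch using the HBase Admin API follows (HBase 1.x-style descriptors, still usable on 2.x; the ZooKeeper quorum is the one used in the code above):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateGpTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.61.97")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    try {
      // Create the "test" namespace if it does not exist yet
      if (!admin.listNamespaceDescriptors().exists(_.getName == "test"))
        admin.createNamespace(NamespaceDescriptor.create("test").build())
      // Create "test:gp" with the single column family "info" used by the streaming job
      val table = TableName.valueOf("test:gp")
      if (!admin.tableExists(table)) {
        val desc = new HTableDescriptor(table)
        desc.addFamily(new HColumnDescriptor("info"))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}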
Check the data in HBase. Since the producer keeps sending the same id_code (10), every write lands on the same rowkey, so the get below just shows the latest cell versions; the \x.. escapes are the HBase shell's byte-level rendering of the UTF-8 encoded Chinese values.
hbase(main):036:0> get 'test:gp', '10'
COLUMN CELL
info:area timestamp=1594021236559, value=50.5
info:avg_price timestamp=1594021236559, value=55446
info:build_year timestamp=1594021236559, value=1989
info:count timestamp=1594021236559, value=14
info:date timestamp=1594021236559, value=2020-03-20
info:district timestamp=1594021236559, value=\xE6\x9C\xAA\xE7\x9F\xA5
info:ditie timestamp=1594021236559, value=\xE4\xB8\xB4\xE6\xB2\x82\xE6\x96\xB0\xE6\x9D\x91
info:floor_all timestamp=1594021236559, value=6
info:floor_degree timestamp=1594021236559, value=\xE4\xBD\x8E\xE6\xA5\xBC\xE5\xB1\x82
info:id_code timestamp=1594021236559, value=10
info:layout timestamp=1594021236559, value=2\xE5\xAE\xA41\xE5\x8E\x851\xE5\x8D\xAB
info:locate timestamp=1594021236559, value=\xE5\x8D\x97\xE7\xA0\x81\xE5\xA4\xB4 \xC2\xB7 \xE8\xB7\x9D\xE7\xA6\xBB\xE5\x9C\xB0\xE9\x93\x81\xE4\xB8\xB4\xE6\xB2\x82\xE6\x96\xB0\xE6\x9D\x91307\xE7\xB1\xB3
info:orientation timestamp=1594021236559, value=\xE5\x8D\x97
info:place timestamp=1594021236559, value=\xE4\xB8\xB4\xE6\xB2\x82\xE5\x85\xAD\xE6\x9D\x91
info:release timestamp=1594021236559, value=2020-03-13
info:renovation timestamp=1594021236559, value=\xE7\xAE\x80\xE8\xA3\x85
info:total_price timestamp=1594021236559, value=280
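The same row can also be read back from code, which decodes the UTF-8 values the shell prints as \x.. escapes. A small sketch with the HBase client API (the rowkey "10" and connection settings are taken from the examples above):

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

object ReadGpRow {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.61.97")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("test:gp"))
      val result = table.get(new Get(Bytes.toBytes("10")))
      // Each cell is printed as info:qualifier = value; Bytes.toString decodes the UTF-8 Chinese text
      val cells = Option(result.listCells()).map(_.asScala).getOrElse(Seq.empty)
      cells.foreach { cell =>
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value = Bytes.toString(CellUtil.cloneValue(cell))
        println(s"info:$qualifier = $value")
      }
      table.close()
    } finally {
      conn.close()
    }
  }
}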