Kafka + Redis + Spark Streaming Integration

Author: 不愿透露姓名的李某某 | Published on 2019-08-05 09:22

The raw data looks like this (partial):

A 202.106.196.115 手机 iPhone8 8000
B 202.106.0.20 服装 布莱奥尼西服 199
C 202.102.152.3 家具 婴儿床 2000
D 202.96.96.68 家电 电饭锅 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 奶粉 600

For data in this format, we first map each IP address to its region, then compute the total transaction amount, the transaction amount per product category, and the transaction amount per region, and finally write the results to Redis.
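Each record is a single space-separated line; the streaming code below only uses the IP address (arr(1)), the product category (arr(2)) and the order amount (arr(4)). A quick sketch of how one sample line is split:

// How one sample line maps to the fields used later
val line = "A 202.106.196.115 手机 iPhone8 8000"
val arr = line.split(" ")
// arr(1) = "202.106.196.115"  -> client IP, later mapped to a region
// arr(2) = "手机"             -> product category
// arr(4) = "8000"             -> order amount, parsed with toDouble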

The code is as follows:

package day10

import day09.IpUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderCount {

  def main(args: Array[String]): Unit = {

    // Consumer group name
    val group = "g001"

    // Create the SparkConf
    val conf = new SparkConf().setMaster("local[4]").setAppName("OrderCount")

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Duration(5000))

    // Broadcast the IP rules to the executors
    val broadcastRef = IpUtils.broadcastIpRules(ssc, "D:\\data\\ip\\ip.txt")

    // Topic to consume (the direct approach connects Spark tasks straight to the
    // Kafka partitions and uses the lower-level API, which is more efficient)
    val topic = "orders"
    val brokerList = "pro01:9092,pro02:9092,pro03:9092,pro04:9092"

    // ZooKeeper quorum, used later to store the consumed offsets
    // (Redis or MySQL could also be used to record offsets)
    val zkQuorum = "pro01:2181,pro02:2181,pro03:2181,pro04:2181"

    // Set of topics used when creating the stream; Spark Streaming can consume several topics at once
    val topics = Set(topic)

    // ZKGroupTopicDirs points at the ZooKeeper directory where the offsets are saved
    val topicDirs = new ZKGroupTopicDirs(group, topic)

    // ZooKeeper path, e.g. "/g001/offsets/orders"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    // Kafka parameters
    val kafkaParams = Map(
      // "key.deserializer" -> classOf[StringDeserializer],
      // "value.deserializer" -> classOf[StringDeserializer],
      // "deserializer.encoding" -> "GB2312", // encoding used when reading data from Kafka
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // Read from the earliest offset when no offset has been stored yet
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // ZooKeeper client built from the quorum; it reads the stored offsets and updates them later
    val zkClient = new ZkClient(zkQuorum)

    // Check whether the path already has children (one child per partition, written by us), e.g.
    //   /g001/offsets/orders/0/10001
    //   /g001/offsets/orders/1/30001
    //   /g001/offsets/orders/2/10001
    val children = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null

    // If ZooKeeper already holds offsets, they become the starting position of the stream
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // Offsets were saved before
    if (children > 0) {
      for (i <- 0 until children) {
        // e.g. /g001/offsets/orders/0 -> 10001
        val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/${i}")
        val tp = TopicAndPartition(topic, i)
        // Add the offset of every partition to fromOffsets
        fromOffsets += (tp -> partitionOffset.toLong)
      }

      // Transform every Kafka message into a (key, message) tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

      // Create the direct DStream via KafkaUtils (fromOffsets makes consumption resume
      // from the previously saved offsets)
      // Type parameters: key, value, key decoder, value decoder, output type
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // No saved offsets: start from the latest (largest) or earliest (smallest) offset,
      // depending on kafkaParams
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    // Offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()

    // With the direct approach the offsets are only available on the KafkaRDD itself,
    // so we cannot apply DStream transformations first; instead we call foreachRDD on
    // kafkaStream, read the offsets from each RDD, and then operate on the RDD
    kafkaStream.foreachRDD { kafkaRDD =>
      // Only process non-empty batches
      if (!kafkaRDD.isEmpty()) {
        // Only a KafkaRDD can be cast to HasOffsetRanges to obtain the offsets
        offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        val lines: RDD[String] = kafkaRDD.map(_._2)

        // Split each line into fields
        val fields = lines.map(_.split(" "))

        // Total transaction amount
        Cacal.calculateIncome(fields)

        // Transaction amount per product category
        Cacal.calculateItem(fields)

        // Transaction amount per region
        Cacal.calculateZone(fields, broadcastRef)

        lines.foreachPartition(partition =>
          partition.foreach(x => {
            println(x)
          })
        )

        for (o <- offsetRanges) {
          // e.g. /g001/offsets/orders/2
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          // Save the partition's offset to ZooKeeper, e.g. /g001/offsets/orders/2/10001
          ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
        }
      }
    }

    // Start the streaming application
    ssc.start()

    // Wait for a graceful shutdown (finish the current batch before exiting)
    ssc.awaitTermination()
  }
}
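The day09.IpUtils helper used above is not shown in this post. A minimal sketch of what broadcastIpRules could look like, assuming it simply reads the rules on the driver with TestIp.readRules (defined further below) and broadcasts them:

package day09

import day01.ip.TestIp
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext

object IpUtils {
  // Read the IP rules file on the driver and broadcast them to the executors
  def broadcastIpRules(ssc: StreamingContext, path: String): Broadcast[Array[(Long, Long, String)]] = {
    val rules = TestIp.readRules(path)
    ssc.sparkContext.broadcast(rules)
  }
}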

The Conset class (holds the Redis key constant):

package day10

object Conset {
  // Redis key under which the accumulated total income is stored
  val TOTAL_INCOME = "TOTAL_INCOME"
}

The Cacal class:

package day10

import day01.ip.TestIp
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object Cacal {

  // Total transaction amount
  def calculateIncome(fields: RDD[Array[String]]) = {
    // Extract the amount of every order
    val priceRDD = fields.map(arr => {
      val price = arr(4).toDouble
      price
    })

    // reduce is an action and returns the result to the driver:
    // the total amount of the current batch
    val sum = priceRDD.reduce(_ + _)

    // Get a Jedis connection
    val conn = JedisDemo.getConnection()

    // Add the current batch to the historical total
    // conn.set(Conset.TOTAL_INCOME, sum.toString)
    conn.incrByFloat(Conset.TOTAL_INCOME, sum)

    // Release the connection
    conn.close()
  }

  // Transaction amount per product category
  def calculateItem(fields: RDD[Array[String]]) = {
    // The map call on fields is issued on the driver
    val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {
      // Product category
      val item = arr(2)
      // Price
      val price = arr(4).toDouble
      (item, price)
    })

    // Aggregate by category
    val reduced = itemAndPrice.reduceByKey(_ + _)

    // Do not fetch the Jedis connection on the driver
    // val conn = JedisDemo.getConnection()

    // Write the current batch to Redis; foreachPartition is an action
    reduced.foreachPartition(part => {
      // Get a Jedis connection inside the partition
      val conn = JedisDemo.getConnection()
      part.foreach(t => {
        conn.incrByFloat(t._1, t._2)
      })
      // Close the connection once the partition has been written
      conn.close()
    })
  }

  // Transaction amount per region
  def calculateZone(fields: RDD[Array[String]], broadcastRef: Broadcast[Array[(Long, Long, String)]]) = {
    val provinceAndPrice = fields.map(arr => {
      val ip = arr(1)
      val price = arr(4).toDouble
      val ipNum = TestIp.ip2Long(ip)

      // Fetch all IP rules from the broadcast variable on the executor
      val allRules: Array[(Long, Long, String)] = broadcastRef.value

      // Binary search for the IP
      val index: Int = TestIp.binarySearch(allRules, ipNum)
      var province = "未知"
      if (index != -1) {
        province = allRules(index)._3
      }

      // (province, order amount)
      (province, price)
    })

    // Aggregate by province
    val reduced = provinceAndPrice.reduceByKey(_ + _)

    // Write the results to Redis
    reduced.foreachPartition(part => {
      val conn = JedisDemo.getConnection()
      part.foreach(t => {
        conn.incrByFloat(t._1, t._2)
      })
      conn.close()
    })
  }
}

The TestIp class:

package day01.ip

import scala.io.Source

class TestIp {
}

object TestIp {

  // Convert a dotted IP address to its decimal (Long) value
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Parse the IP rules file into (startNum, endNum, province) tuples
  def readRules(path: String): Array[(Long, Long, String)] = {
    val bf = Source.fromFile(path)
    val lines = bf.getLines()
    val rules = lines.map(line => {
      val fields = line.split("[|]")
      val startNum = fields(2).toLong
      val endNum = fields(3).toLong
      val province = fields(6)
      (startNum, endNum, province)
    }).toArray
    rules
  }

  // Binary search: return the index of the rule whose [start, end] range contains the IP, or -1
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else {
        low = middle + 1
      }
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val rules: Array[(Long, Long, String)] = readRules("D:\\data\\ip\\ip.txt")

    // Convert the IP address to decimal
    val ipNum = ip2Long("111.198.38.182")

    // Look up the matching rule
    val i = binarySearch(rules, ipNum)

    // Read the province from the matched rule
    val tp = rules(i)
    val province = tp._3
    println(province)
  }
}

Once the code is written, create an orders topic in Kafka and start a producer to write data into it.

(My Kafka cluster is connected to ZooKeeper, so start ZooKeeper before starting Kafka.)
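For reference, with the ZooKeeper and broker addresses used in the code above, the topic can be created and fed with the stock Kafka command-line tools (the replication factor and partition count here are only an example):

# Create the orders topic (ZooKeeper-based tools from the Kafka 0.8/0.10 era)
bin/kafka-topics.sh --create --zookeeper pro01:2181 --replication-factor 2 --partitions 3 --topic orders

# Start a console producer and paste the sample order lines into it
bin/kafka-console-producer.sh --broker-list pro01:9092 --topic orders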

Our consumer should now receive the data written by the producer.

Finally, we write a small program that connects to Redis to check whether the result data has been written.

The Redis connection code:
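The JedisDemo helper referenced throughout the Cacal code is not reproduced in this post. A minimal sketch, assuming a plain Jedis client on a local Redis at the default port (the host, port and the CheckResult helper below are illustrative, not the original code):

package day10

import redis.clients.jedis.Jedis

object JedisDemo {
  // Hypothetical Redis address; adjust to your environment
  val host = "localhost"
  val port = 6379

  // Return a new Jedis connection (the calling code closes it after use)
  def getConnection(): Jedis = {
    new Jedis(host, port)
  }
}

// Small checker that reads the accumulated results back from Redis
object CheckResult {
  def main(args: Array[String]): Unit = {
    val conn = JedisDemo.getConnection()
    // Total income key written by Cacal.calculateIncome
    println(Conset.TOTAL_INCOME + " = " + conn.get(Conset.TOTAL_INCOME))
    // Example category key written by Cacal.calculateItem
    println("手机 = " + conn.get("手机"))
    conn.close()
  }
}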

Execution result:
