美文网首页
spark-streaming中使用spark-sql做关联查询

spark-streaming中使用spark-sql做关联查询

作者: lsnl8480 | 来源:发表于2016-03-09 12:34 被阅读1991次

实现:


首先基于topic,创建出kafka的DStream流

val sparkConf = new SparkConf().setAppName(appParams.appName)

val sc = new SparkContext(sparkConf)

val streamingContext = new StreamingContext(sc, Seconds(appParams.batchProcInterval))

val kafkaParams = Map[String, String]("metadata.broker.list" -> appParams.brokers)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, Set[String](appParams.messageInTopic))

创建时间窗:

val windows = messages.map(_._2).window(Seconds(appParams.windownTime), Seconds(appParams.windownTime))

针对每个时间窗做计算

windows.foreachRDD { rdd =>

......

}

每个时间窗内部的处理:


创建case class

case class Record(channelid: String, starttime: Long)

创建sqlContext

val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

import sqlContext.implicits._

将当前消息流转换成DataFrame:

val df = rdd.map(_.split("\\|")).map(line => Record(line(5), line(2).toLong)).toDF()

注册成一张表:

df.registerTempTable("UserPlayChannel")

读取parquet数据,注册成另一张表:

val paraquetFile = sqlContext.read.parquet(filePath)

paraquetFile.registerTempTable("ProgramMetaData")

现在有了两张表,关联查询只需要写好sql语句就可以了,样例:

select b.programid , count(b.programid) as count from UserPlayChannel a join ProgramMetaData b on a.channelid = b.channelid and a.starttime >= b.starttime and a.starttime <= b.endtime group by b.programid order by count DESC

代码执行:

val hotProgramList = sqlContext.sql("select b.programid , count(b.programid) as count from UserPlayChannel a join ProgramMetaData b on a.channelid = b.channelid and a.starttime >= b.starttime and a.starttime <= b.endtime group by b.programid order by count DESC")

现在hotProgramList就是关联查询出的结果。

相关文章

  • spark-streaming中使用spark-sql做关联查询

    实现: 首先基于topic,创建出kafka的DStream流 val sparkConf = new Spark...

  • spark-sql 关联查询

    给表起别名的方式读取关联查询结果

  • 关联查询、子查询与分页查询

    关联查询 关联关系分为一对一,一对多,多对多关系关联查询就是从多张表中查询数据,当我们使用n张表使用关联查询,至少...

  • MySQL中的join以及on条件的用法

    join 经常用来做关联查询,可以把两张或者多张表用通过关联条件关联起来做数据查询在使用join查询的时候要区分主...

  • mongo数据结构设计

    两种关系设计方式 内嵌:只做查询使用,避免返回数据时关联查询或多次查询 引用:关联的集合涉及到附加信息(被引用的文...

  • laravel with 渴求式加载指定字段

    在使用 Laravel 的关联查询中,我们经常使用 with 方法来避免 N+1 查询,但是 with 会将目标关...

  • MySQL的多表关联查询

    一、多表关联查询 多表关联查询是使用一条SQL语句,将关联的多张表的数据查询出来。 1.1 交叉查询 交叉查询就是...

  • mongodb单集合关联查询parent字段内容

    最近在考虑一个树状结构存储。 使用的mongodb,同一张集合中,使用parentId来做关联。 查询的时候,希望...

  • 关联关系查询(第一讲)

    关联查询 当查询内容涉及到具有关联关系的多个表时,就需要使用关联查询。根据表与表之间的关联关系的不同,关联查询分为...

  • 关联子查询

    关联子查询会在细分的组内进行比较时使用。关联子查询和GROUP BY子句一样,也可以对表中的数据进行切分。关联子查...

网友评论

      本文标题:spark-streaming中使用spark-sql做关联查询

      本文链接:https://www.haomeiwen.com/subject/prnrkttx.html