SparkStreaming基础

作者: Z尽际 | 来源:发表于2017-06-07 15:59 被阅读241次

    * SparkStreaming基础

    打开之前构建好的Maven工程,如何构建?请参看SparkCore基础(二)的最后部分。

    在SparkCore中,我们操作的数据都在RDD中,是Spark的一个抽象概念,也是一个抽象类,是由SparkContext对象sc转换得到的。

    那么在SparkStreaming中,我们使用的Spark的StreamingContext对象,简称ssc。

    我们本节内容以动手为基础,直接开始一些测试案例:具体的框架结构请参看官方文档,写的非常之详细。

    SparkStreaming在Windows中使用IDEA的开发案例

    WordCount在IDEA工具

    首先导入相关依赖:

    代码如下:

    SparkStreaming与Kafka在IDEA工具

    我们可以使用Flume+Kafka将数据实时转入到SparkStreaming分析进行分析,因为Flume和Kafka的集成在之前的章节中已经讲解过,所以此时只讲述如何将Kafka与SparkStreaming进行集成,首先导入依赖:

    代码如下:注意红框内容

    然后启动Kafka的相关服务:

    启动Kafka Broker节点

    $ bin/kafka-server-start.sh config/server.properties

    创建Topic

    $ bin/kafka-topics.sh --create --zookeeper z01:2181 --replication-factor 1 --partitions 1 --topic SparkTopic

    查看一下有几个Topic

    $ bin/kafka-topics.sh --list --zookeeper z01:2181

    发布数据

    $ bin/kafka-console-producer.sh --broker-list z01:9092 --topic SparkTopic

    开启一个控制台消费者用于验证

    $ bin/kafka-console-consumer.sh --zookeeper z01:2181 --topic SparkTopic --from-beginning

    SparkStreaming统计录入的所有数据

    你会发现之前我们统计的单词每过几秒都是新的统计,并没有把每次流入的数据进行汇总统计,那么,我们此时的目标是,你懂得:)

    使用updateStateByKey将相同Key的数据的state状态进行汇总,顺便一提:hadoop,1里面的1其实就是一个state,之前我们也一直称之为count对吧,思维要扭转一下,毕竟,不是所有的数据分析统计都只是简单的加减乘除,用状态来描述,也是可以的。

    统计实时的最新状态,代码如下:

    SparkStreaming统计某一个时间范围内的所有数据

    我们需要使用windows窗口滑动这样一个概念,比如,设定一个窗口的大小为30秒,每次我们统计的都是最近30秒的数据汇总,将Window窗口一直向某一个方向滑动,一次滑动指定的距离,进行统计即可,其实一个Window就好比是框住了一定范围时间内的batch,SparkStreaming默认将200ms的数据分为一个batch(可以暂且理解为一个数据块)

    统计最近一段时间的状态,代码如下:

    Spark与HBase的集成

    首先导入HBase的相关依赖:

    从HBase中读取数据,代码如下:

    * 总结

    通过一些常用的案例,你应该能够掌握SparkStreaming运行的基本原理和架构模型了,Spark的官方文档特别的相信,源码注释也非常详细,如有不太理解的地方,直接看源码和官方文档是最好的途径。


    IT全栈公众号:

    QQ大数据技术交流群(广告勿入):476966007


    下一节:Storm框架基础(一)

    相关文章

      网友评论

      • e873b4ef0784:尽际大神,最近跑一个sparkStreaming程序,总是跑几个小时后报java.util.concurrent.TimeoutException: Futures timed out after [360 seconds],每次都调大spark.network.timeout参数,可调完之后多跑了几个小时还是会报TimeoutException,不知是否没找到问题的根源,感谢您的回复,拜托了!
        e873b4ef0784:@Z尽际 具体报错信息如下:(上一半)
        til.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
        Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
        ... 14 more
        Caused by: java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
        ... 15 more
        e873b4ef0784:@Z尽际 感谢您的回复,报错信息也没有显示具体是在哪段操作报的超时,我是每隔一分钟打一个批次运行,运行大概6、7个小时不等,就会报org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout。然后程序就卡在那个job,本来零点几秒就能执行完的job会卡好几个小时,只能手动kill掉。
        麻烦您了,多谢!
        Z尽际:@小仙豆_b421 你是哪段操作报的超时,可以贴出来看一下trace

      本文标题:SparkStreaming基础

      本文链接:https://www.haomeiwen.com/subject/nqrkzttx.html