大数据技术分享:使用Spark Streaming SQL进行P

作者: 3d游戏建模经验交流 | 来源:发表于2019-12-09 17:20 被阅读0次

    使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。作者:ligh-rain;来源:阿里云栖社区


    1.背景介绍

    PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

    使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

    2.准备工作

    创建E-MapReduce 3.23.0以上版本的Hadoop集群。

    下载并编译E-MapReduce-SDK包

    git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git

    cd aliyun-emapreduce-sdk

    git checkout -b master-2.x origin/master-2.x

    mvn clean package -DskipTests

    编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

    数据源已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击进入

    本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务。

    3.统计PV/UV

    一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

    3.1启动客户端

    命令行启动streaming-sql客户端

    streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

    也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

    3.1定义数据表

    数据源表定义如下

    CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)

    USING loghub

    OPTIONS(

    sls.project=${sls.project},

    sls.store=${sls.store},

    access.key.id=${access.key.id},

    access.key.secret=${access.key.secret},

    endpoint=${endpoint});

    其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。

    结果表定义如下

    CREATE TABLE redis_sink

    USING redis

    OPTIONS(

    table='statistic_info',

    host=${redis_host},

    key.column='interval');

    其中,statistic_info为Redis存储结果的表名,interval对应统计结果中的interval字段;配置项${redis_host}的值根据实际配置。

    3.2创建流作业

    CREATE SCAN loghub_scan

    ON loghub_source

    USING STREAM

    OPTIONS(

    watermark.column='__time__',

    watermark.delayThreshold='10 second');

    CREATE STREAM job

    OPTIONS(

    checkpointLocation=${checkpoint_location})

    INSERT INTO redis_sink

    SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval

    FROM loghub_scan

    GROUP BY TUMBLING(__time__, interval 1 minute), window;

    4.3查看统计结果

    最终的统计结果如下图所示

    可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

    3.4实现覆盖更新已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击进入

    将结果表的配置项key.column修改为一个固定的值,例如定义如下

    CREATE TABLE redis_sink

    USING redis

    OPTIONS(

    table='statistic_info',

    host=${redis_host},

    key.column='statistic_type');

    创建流作业的SQL改为

    CREATE STREAM job

    OPTIONS(

    checkpointLocation='/tmp/spark-test/checkpoint')

    INSERT INTO redis_sink

    SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval

    FROM loghub_scan

    GROUP BY TUMBLING(__time__, interval 1 minute), window;

    最终的统计结果如下图所示

    可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

    4.总结

    本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。

    相关文章

      网友评论

        本文标题:大数据技术分享:使用Spark Streaming SQL进行P

        本文链接:https://www.haomeiwen.com/subject/tjhigctx.html