美文网首页PostgreSQLPostgreSQL
PipelineDB1.0,用写SQL的方式进行实时计算

PipelineDB1.0,用写SQL的方式进行实时计算

作者: 洛洛尘沙 | 来源:发表于2019-02-26 18:47 被阅读211次

    简述

    PipelineDB是一个高性能的PostgreSQL扩展,用于在时间序列数据上连续运行SQL查询。这些连续查询的输出存储在常规表中,可以像查询任何其他表或视图一样查询这些表。因此,连续查询可以看作是非常高的吞吐量、增量更新的物化视图。

    简言之,与传统的实时计算引擎(storm、kafka streams等)相比,PipelineDB不需要写任何程序代码,只需要写SQL即可实现实时指标的聚合计算,统计实时指标开发耗时可以从原来的天级骤降到分钟级。

    要点简概

    简要数据流向图

    上图为PipelineDB在实际应用中的简要数据流向图。

    下面对图中涉及的特定类型的表进行解释:

    1. Stream:进行时序数据聚合计算的第一个必需基础表类型,外部数据会顺序流入Stream表。该表不实际储存数据,当一条数据被后续所有的必须View/Transform读取后即会被丢弃。Stream作为PipelineDB的第一道关口,承接从外部实时写入到PipelineDB内的数据。之后,可以以Stream为基础表,在其上创建若干个统计结果的View或者转换内容的Transform。
    2. Continuous View:PipelineDB的基本抽象,从Stream读取数据并将新数据按照SQL内条件(count、sum等)实时聚合,并将结果实时增量更新到View内。Continuous View是物化视图,实际内容会存储在以_mrel结尾的表中。
    3. Continuous Transform:进行实时转换数据,比如解析URL、关联维表等。Continuous Transform不支持聚合,不储存数据,转换后的数据可以实时写入到另一个Stream或者表。
    动态展示图

    上图源引自PipelineDB官网,直观的展示了它的实时计算过程。

    用法实践

    安装

    从1.0版本开始,PipelineDB开始作为PostgreSQL的一个扩展存在,安装PipelineDB之前请先安装PostgreSQL。

    具体安装方法请参见官方安装教程

    连接PostgreSQL数据库

    为了连接方便,可以在安装有PostgreSQL的服务器上直接敲"psql"进入交互式命令行。

    当需要远程连接时,可以写一个名为“pg”的bash脚本并将路径export到PATH,内容:

    #!/bin/bash
    
    source /etc/profile
    
    # 设置密码环境变量
    export PGPASSWORD='PostgreSQL密码'
    
    # 登录PostgreSQL
    declare -A postgres
    postgres=(
        [pg1911-(Postgres11)]="psql -h test_host -p 1911 -U PostgreSQL用户名 -d postgres"
    )
    
    dbs=($(printf '%s\n' "${!postgres[@]}" | sort))
    
    echo
    echo "+-------------------------------+"
    echo "|Postgres/PipelineDB连接菜单工具|"
    echo "+-------------------------------+"
    echo "请选择数据库序号..."
    
    select db in ${dbs[@]}; do
      break
    done
    
    echo "执行命令:${postgres[$db]} ..."
    eval "${postgres[$db]}"
    
    exit
    

    运行pg脚本,选择对应的序号后即可连接到对应数据库。(注意:远程连接前,请先配置好pg_hba.conf)

    查看已创建Schema

    \dn
    

    注意: PostgreSQL里的schema可以类比MySQL里的database,但是并不相同,PostgreSQL有自己的database属性。

    查看创建Schema的语句

    \h CREATE SCHEMA;
    

    创建Schema

    CREATE SCHEMA IF NOT EXISTS test;
    

    设置表搜索路径

    SET search_path TO test, public;
    

    敲重点: 截止到目前,因为官方的bug,在创建view等前请确保目前的schema是public,或者使用上述方式把public加到搜索路径内,否则会报函数找不到等错误。

    查看当前Schema下表

    \d 或者 \d+
    

    创建Stream

    CREATE FOREIGN TABLE IF NOT EXISTS test.streams_page_log_kafka (
        data json
    )
    SERVER pipelinedb;
      
    -- 特别情况下,可以对STREAM增加字段(只能增,不能删)
    ALTER FOREIGN TABLE test.streams_page_log_kafka ADD COLUMN x integer;
    -- 删除STREAM
    DROP FOREIGN TABLE test.streams_page_log_kafka;
    

    注意:stream默认会添加一个arrival_timestamp字段,用来记录每条日志的到达时间,该字段可以用在诸如滑动窗口的地方。

    创建Continuous View

    CREATE VIEW test.rt_view_stat_daily_page WITH (action=materialize) AS
    SELECT
        to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
        SUM(COALESCE(cast(data->>'unreal_key' AS numeric), 0)) AS "testName",  -- 使用SUM时,请务必使用COALESCE把NULL值替换掉,否则会导致严重的数据库后端进程崩溃重启!!!
        COUNT(*) AS pv,
        COUNT(DISTINCT data->>'visitid') AS uv
    FROM test.streams_page_log_kafka
    GROUP BY
        f_ds
    ;
    
    -- 向stream插入三条测试数据
    INSERT INTO test.streams_page_log_kafka (data) VALUES ('{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 18:12:21","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 19:35:50","visitid":"z8a32b19-28z8-kl57"}');
    
    -- 查看结果
    postgres=# SELECT * FROM test.rt_view_stat_daily_page;
        f_ds    | testName | pv | uv
    ------------+----------+----+----
     2019-01-02 |        0 |  3 |  2
    
    
    -- 清空VIEW内数据
    SELECT pipelinedb.truncate_continuous_view('test.rt_view_stat_daily_page');
      
    -- 删除VIEW
    DROP VIEW test.rt_view_stat_daily_page;
    

    创建TTL(Time-To-Live)View表

    TTL表会根据指定字段,尽可能删除早于指定时间的数据。

    -- TTL表可以尽量销毁早于指定时间的数据
    CREATE VIEW test.rt_view_stat_daily_ttl_page WITH (action=materialize, ttl='1 month', ttl_column='ttl_ds') AS
    SELECT
        to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
        day(to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS')) AS ttl_ds,
        COUNT(*) AS pv,
        COUNT(DISTINCT data->>'visitid') AS uv
    FROM test.streams_page_log_kafka
    GROUP BY
        f_ds,
        ttl_ds
    ;
    

    可以通过pipelinedb.set_ttl函数对continuous view增加、修改、移除TTL。*

    具体详见:http://docs.pipelinedb.com/continuous-views.html#modifying-ttls

    创建TRANSFORM

    CREATE VIEW test.rt_trans_add_message_name_page WITH (action=transform) AS
    SELECT
        v1.data->>'company'      AS company,
        v1.data->>'appf'         AS appf,
        v1.data->>'sourcec'      AS sourcec,
        v1.data->>'message_type' AS message_type,
        v1.data->>'message'      AS message,
        v2.message_name,
        v1.data->>'visitid'      AS visitid,
        v1.data->>'log_datetime' AS log_datetime
    FROM test.streams_page_log_kafka v1
    LEFT JOIN dw_setting.dim_source v2 ON v1.data->>'message' = v2.source
    WHERE v1.data->>'message_type' IN ('asta', 'moshi', 'kozi', 'semi')
    ;
      
    -- 从TRANSFORM创建VIEW,通过output_of读取tansform后的数据
    CREATE VIEW test.rt_view_source_stat_daily_utrack WITH (action=materialize) AS
    SELECT
        to_date(log_datetime, 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
        message_type,
        message,
        COUNT(*) AS pv
    FROM output_of('test.rt_trans_add_message_name_page')
    GROUP BY
        f_ds,
        message_type,
        message
    ;
      
    -- TRANSFORM后的数据写入到STREAM
    CREATE FOREIGN TABLE IF NOT EXISTS test.streams_track_page_from_trans_utrack (
        company      text,
        appf         text,
        sourcec      text,
        message_type text,
        message      text,
        message_name text,
        visitid      text,
        log_datetime text
    )
    SERVER pipelinedb;
    CREATE VIEW test.rt_trans_add_message_name_to_stream_utrack WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('test.streams_track_page_from_trans_utrack')) AS
    SELECT
        v1.data->>'company'      AS company,
        v1.data->>'appf'         AS appf,
        v1.data->>'sourcec'      AS sourcec,
        v1.data->>'message_type' AS message_type,
        v1.data->>'message'      AS message,
        v2.message_name,
        v1.data->>'visitid'      AS visitid,
        v1.data->>'log_datetime' AS log_datetime
    FROM test.streams_page_log_kafka v1
    LEFT JOIN dw_setting.dim_source v2 ON v1.data->>'message' = v2.source
    WHERE v1.data->>'message_type' IN ('asta', 'moshi', 'kozi', 'semi')
    ;
    

    注意:因为已知bug,使用TRANSFORM时请不要使用自定义函数,否则会有异常报错!可以使用自带的pipelinedb.insert_into_stream将结果写入到另一个STREAM

    利用combine统计更粗粒度的UV

    combine:可以使用view表保存的更多信息计算出准确的聚合值(sum/avg...),而不是简单的把现在值累加起来。

    -- 创建分小时统计的view
    CREATE VIEW test.rt_view_stat_hourly_page AS
    SELECT
        to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
        date_part('hour', to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS')) AS f_hour,
        COUNT(DISTINCT data->>'visitid') AS uv
    FROM test.streams_page_log_kafka
    GROUP BY
        f_ds,
        f_hour
    ;
    
    -- 同时向stream插入三条测试数据
    -- 两个用户在三个不同时间段的三条记录
    INSERT INTO test.streams_page_log_kafka (data) VALUES ('{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 18:12:21","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 19:35:50","visitid":"z8a32b19-28z8-kl57"}');
    

    如上所示,我们创建了一个按照天和小时聚合统计uv的表。当有需求需要统计每天的uv数时,不必要重新创建以天统计的view,我们可以利用combine的特性聚合出实际的结果。

    -- 简单sum的结果 (错误)
    postgres=# select f_ds, sum(uv) from test.rt_view_stat_hourly_page group by f_ds;
        f_ds    | sum
    ------------+-----
     2019-01-02 |   3
    (1 row)
    
    -- 使用combine的结果 (正确)
    postgres=# select f_ds, combine(uv) from test.rt_view_stat_hourly_page group by f_ds;
        f_ds    | combine
    ------------+---------
     2019-01-02 |       2
    

    注意:Pipelinedb计算uv使用的算法是HLL(HyperLogLog),官方宣称误差约0.81%。经实际测试,100以内基本无误差,1000条丢失约8条UV数据,10000条丢失约13条数据,可用。

    滑动窗口

    在实际统计工作中,不可避免会有统计一段时间窗口内数据的需求,比如最近30分钟的在线用户数、最近1分钟的浏览量等等。PipelineDB利用arrival_timestamp和clock_timestamp(总是返回现在的时间)实现了滑动窗口的计算。在实际使用中通过WITH子句的sw指定窗口区间(sliding window)。

    -- 统计最近1分钟的访客数
    CREATE VIEW test.recent_user_count_page WITH (sw = '1 minute') AS
    SELECT
        COUNT(DISTINCT data->>'visitid') AS uv
    FROM test.streams_page_log_kafka
    ;
    

    注意:有局限性,不支持自定义时间计算窗口数据

    时间分桶指标计算

    date_round(timestamp, resolution):该函数会把时间向下转换到最近的分桶时间。利用date_round可以非常方便的统计诸如每十分钟浏览量的实时指标数据。

    -- 每10分钟浏览量
    CREATE VIEW test.stat_pv_bucket_10_page AS
    SELECT
      date_round(to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS'), '10 minutes') AS bucket_10m,
      COUNT(*) AS pv
    FROM test.streams_page_log_kafka
    GROUP BY
        bucket_10m
    ;
    

    其他命令或trick

    -- psql命令帮助
    \?
    
    -- 激活/停止 VIEW/TRANSFORM
    SELECT pipelinedb.activate('continuous_view_or_transform');
    SELECT pipelinedb.deactivate('continuous_view_or_transform');
     
    --回填数据到老的VIEW
    SELECT pipelinedb.combine_table('continuous_view_3', 'continuous_view_mrel');
     
    -- 获取所有的VIEW
    SELECT * FROM pipelinedb.get_views();
     
    -- 获取所有的TRANSFORM
    SELECT * FROM pipelinedb.get_transforms();
    
    -- 创建表,当字段是字符串类型时,除非你知道这个字段值永远不会超过某个长度,
    -- 否则请使用text格式,从此再也不用担心长度问题!
      
    -- 使用Postgres作为api的后端数据库时,可以在SQL前添加下面的语句,避免一些转义符引起的错误
    SET standard_conforming_strings = off;
    

    提个醒:

    • 现在在PipelineDB内,一个简单的数据聚合统计模型已经构建完了。

    • 因为还没有数据写入到stream,所以现在view内还没有统计结果。


    数据写入到Stream

    • 使用INSERT INTO,该方法主要体现在测试环节
    • 使用COPY命令,该方法适合从文件导入数据到数据库
    • 使用pipeline_kafka,该插件配置后可以实时从kafka读取数据并写入到stream
    • 自己实现从kafka等写入到stream的过程

    测试写入

    INSERT INTO test.streams_page_log_kafka (data) VALUES (
      '{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77","userid":"2345521","version":0,"company":"search","appf":"search","sourcec":"search","ch_source":"","message":"m.baidu.com","message_type":"domain","log_type":"page","terminal":"touch"}'
    );
    
    -- 查看实时统计结果
    SELECT * FROM test.rt_view_stat_daily_page;
        f_ds    | testName | pv | uv
    ------------+----------+----+----
     2019-01-02 |        0 |  1 |  1
    

    使用pipeline_kafka

    官方插件,读取kafka的速度依赖kafka的分区数,在小量级实时计算时,可以采用。

    具体使用方法不再赘述,详见http://docs.pipelinedb.com/integrations.html#apache-kafka

    自己实现写入部分

    当量级上升到pipeline_kafka无法及时处理或者有其他需求时,建议自己实现从kafka等到stream的写入部分。如下图,根据之前做的一次PipelineDB的0.95版本和1.0版本的性能测试,对只有一个分区的topic读取速度,pipeline_kafka只有约2w/s。而自己实现的写入可以达到约5w/s。

    性能测试结果

    参考

    PipelineDB官方文档:http://docs.pipelinedb.com/index.html

    PostgreSQL官方文档:https://www.postgresql.org/docs/11/index.html

    相关文章

      网友评论

        本文标题:PipelineDB1.0,用写SQL的方式进行实时计算

        本文链接:https://www.haomeiwen.com/subject/urszyqtx.html