美文网首页
流式数据库PipelineDB(集成Kafka)

流式数据库PipelineDB(集成Kafka)

作者: 郭寻抚 | 来源:发表于2016-11-18 17:24 被阅读8870次

    1. 前言

    1.1 PipelineDB 介绍

    偶然发现了个流式数据库PipelineDB,它是基于PostgreSQL数据库改造的,允许我们通过sql的方式,对数据流做操作,并把操作结果储存起来。

    这年头,真是SQL on everything。

    其基本的过程是:

    • 创建PipelineDB Stream。
    • 编写SQL,对Stream做操作。
    • 操作结果被保存到 continuous view,其背后是物理表在支撑。

    1.2 安装PipelineDB

    我们的安装是在Centos 7上面进行的。 PipelineDB不让用root权限的用户操作,请提前创建用户。

    #下载
    wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos7-x86_64.rpm
    # 安装
    rmp -ivh ----prefix=/opt/pipelinedb
    # 初始化 pipeline-init -D <data directory>
    pipeline-init -D /opt/pipelinedb/dbdata
    pipelinedb -D /opt/pipelinedb/dbdata
    # 激活 continuous query(仅需执行一次,后续重启不用再做)
    psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"
    

    2. Quick Start例子

    本例是关于 Wikipedia页面访问数据的统计。每一条访问记录,包括以下字段,以英文逗号分割。

    hour project page title view count bytes served

    2.1 创建continuous视图

    首先,我们创建一个continuous view,使用psql工具。从sql里,我们能够看到统计方法和访问记录的对应关系。

    psql -h localhost -p 5432 -d pipeline -c "
    CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
    CREATE CONTINUOUS VIEW wiki_stats AS
    SELECT hour, project,
            count(*) AS total_pages,
            sum(view_count) AS total_views,
            min(view_count) AS min_views,
            max(view_count) AS max_views,
            avg(view_count) AS avg_views,
            percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
            sum(size) AS total_bytes_served
    FROM wiki_stream
    GROUP BY hour, project;"
    

    2.2 创建Stream

    我们通过curl工具,获取wiki的数据集,并压缩数据,作为一个Stream写入到stdin。因为数据集比较大,当我们执行了几秒钟之后,可以使用ctrl+c中断curl操作。

    curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
            psql -h localhost -p 5432 -d pipeline -c "
            COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
    

    2.3 查看结果

    通过下面的命令,从视图(continuous view)读取streaming的统计计算结果。

    psql -h localhost -p 5432 -d pipeline -c "
    SELECT * FROM wiki_stats ORDER BY total_views DESC";
    

    3. PipelineDB和kafka的集成

    3.1 pipeline_kafka组件安装

    PipelineDB默认是没有pipeline_kafka扩展组件的,需要我们自己安装。安装需要git,如果没有git,请使用yum -y install git 安装git。

    1.安装librdkafka

    pipeline_kafka依赖librdkafka,需要先安装librdkafka。

    git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
    cd ~/librdkafka
    ./configure --prefix=/usr
    make
    sudo make install
    

    2.安装pipeline_kafka

    编译安装pipeline_kafk。如果有编译依赖的缺失,请根据缺失补充安装依赖。

    ./configure
    make
    make install
    

    配置pipeline_kafka

    # 编辑配置文件
    vi /opt/pipelinedb/dbdata/pipelinedb.conf
    # 在结尾输入以下内容并保存(:wq)
    # Add settings for extensions here
    shared_preload_libraries = 'pipeline_kafka'
    

    重启数据库,使得扩展组件生效

    # pipeline-ctl -D <data directory> start|stop|restart
    pipeline-ctl -D /opt/pipelinedb/dbdata restart
    

    3.2 Stream SQL开发过程

    # 连接数据库
    psql -h localhost -p 5432 -d pipeline
    # 创建pipeline_kafka
    CREATE EXTENSION pipeline_kafka;
    # 配置kafka broker
    SELECT pipeline_kafka.add_broker('192.168.1.170:9092');
    # 创建Stream,从kafka里接受三个参数
    CREATE STREAM msg_stream (sjc varchar, thread_name varchar, msg varchar);
    # 创建CONTINUOUS VIEW
    CREATE CONTINUOUS VIEW msg_result AS SELECT sjc,thread_name,msg FROM msg_stream;
    # 开始消费kafka消息
    # topic是my-topic,连接PipelineDB Stream名是msg_stream,消息类型是text,消息以英文逗号分割。
    SELECT pipeline_kafka.consume_begin ( 'my-topic', 'msg_stream', format := 'text', 
                delimiter := ',', quote := NULL, escape := NULL, batchsize := 1000,
                maxbytes := 32000000, parallelism := 1, start_offset := NULL );
    
    
    # 如果要停止Stream,请使用以下命令。
    SELECT pipeline_kafka.consume_end('my-topic', 'msg_stream');
    

    3.3 验证

    1.向kafka发送消息

    登录kafka节点的服务器,进入到kafka home路径,使用以下命令发送消息

    # 启动producer
    bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic my-topic
    # 输入以下数据
    a,b,c
    

    2.在PipelineDB中查询收到的消息

    从CONTINUOUS VIEW中 查询数据,可以看到有一条记录,即[a,b,c]。

    psql -h localhost -p 5432 -d pipeline -c "
    SELECT * FROM msg_result";
    

    ps: 当我们连接到PipelineDB,我们可以使用PostgreSQL的命令,来查看有那些数据库对象生成。例如通过 \d 可以查看到,当我们创建CONTINUOUS VIEW的时候,额外创建了msg_result_mrel、msg_result_seq和msg_result_osrel,实际的数据就存储在msg_result_mrel中。

    Schema Name Type Owner
    public msg_result continuous view pipelinedb
    public msg_result_mrel table pipelinedb
    public msg_result_osrel stream pipelinedb
    public msg_result_seq sequence pipelinedb
    public msg_stream stream pipelinedb

    http://docs.pipelinedb.com/quickstart.html

    https://github.com/pipelinedb/pipeline_kafka

    (完)

    相关文章

      网友评论

          本文标题:流式数据库PipelineDB(集成Kafka)

          本文链接:https://www.haomeiwen.com/subject/kfkspttx.html