美文网首页PostgreSQLGIS+RS应用
PostgreSQL 与 ElasticSearch 同步

PostgreSQL 与 ElasticSearch 同步

作者: 预流 | 来源:发表于2017-03-23 21:52 被阅读2632次

    碰到个全文搜索的需求,鉴于上家公司的业务日志查询用的就是 ELK ,效果还不错,所以用 ElasticSearch 做搜索引擎感觉问题不大。由于是针对业务数据做全文搜索,数据存在数据库里,所以将这些数据同步到 ES ,怎么做呢,无非两种方式:一是通过 ES 的 API 进行增删改查,二是通过中间件进行数据全量、增量的同步。很明显 API 的方式比较麻烦,那就试试第二种吧。中间件的方式总共搜了三种方案:

    1. elasticsearch-jdbc
      一个 独立的第三方工具。使用很简单,写个 shell 脚本就行了:

      shell 脚本例子
      这个工具的活跃度也很高,前一天还有人提 issue 。问题是它的版本兼容性:
      目前最高只兼容Elasticsearch2.3.4
      elastic 官方已经把 Elasticsearch、Logstash 、Kibana 版本升级到5.x版本,现在统一是5.2.2,这和以前的2.x在插件机制上有一些差异,个人倾向于使用最新版,所以这个工具暂时放弃了。
    2. logstash-input-jdbc
      logstash 的一个jdbc 插件,ruby 写的,据说不太好装,logstash 印象里一直是做日志文件收集的,尤其我们的数据库不是常见的 mysql、oracle,而是 postgresql ,感觉趟这个坑的人不多,没信心一定能成功。

    3. PostgreSQL 的 ElasticSearch 同步插件
      直接用 PostgreSQL 的插件,国内 PG 的大牛德哥在云栖社区贴了篇文章《PostgreSQL内核扩展之 - ElasticSearch同步插件》,写的很详细,显然这个方案跑通过了,所以就选了这个方案。

    因为这个方案涉及了很多安装步骤,中间出现了很多坑,所以单独写篇文章记录一下,能让后来者少走些弯路吧。

    安装前准备

    Mac 平台
    数据库:PostgreSQL 9.5.4
    数据库客户端:pgAdmin 4
    Python 2.7.10

    安装并运行 ElasticSearch

    官网下载 ElasticSearch ,解压缩,以下命令运行:

    bin/elasticsearch
    

    安装 PostgreSQL 插件 multicorn

    github下载 Multicorn:

    git clone https://github.com/Kozea/Multicorn /tmp/multicorn
    cd $_
    

    由于 Multicorn 的 master 代码在 OS X 环境有问题,有几个步骤要手工执行:
    1.修改文件 Makefile 的93行,将 darwin 的首字母改成大写:Darwin
    2.执行 make
    3.sudo ARCHFLAGS="-arch x86_64" make install
    具体原因看这个issue里的解释
    4.执行 make install

    pg 数据库中新建一个 EXTENSION

    psql -h {ip} -p {port} -d {database} -U {username}
    -- 新建名为 multicorn 的 EXTENSION
    create extension multicorn ;
    

    安装 PostgreSQL 插件 pg-es-fdw

    github 下载 pg-es-fdw:

    git clone https://github.com/Mikulas/pg-es-fdw /tmp/pg-es-fdw
    cd $_
    sudo python setup.py install
    

    基于 multicorn 创建 foreign server

    CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn
    OPTIONS (
      wrapper 'dite.ElasticsearchFDW'
    );
    

    PostgreSQL 的建 server 的语法不再赘述。wrapper 的值 dite.ElasticsearchFDW 的定义就在 pg-es-fdw 插件的 dite 目录 __init__.py 文件里,有兴趣的可以看看。

    创建测试表

    CREATE TABLE articles (
        id serial PRIMARY KEY,
        title text NOT NULL,
        content text NOT NULL,
        created_at timestamp
    );
    

    创建外部表

    CREATE FOREIGN TABLE articles_es (
        id bigint,
        title text,
        content text
    ) SERVER multicorn_es OPTIONS (host '127.0.0.1', port '9200', node 'test', index 'articles');
    

    参数 host 和 port 是 elasticsearch 服务的主机和端口号,参数 node 表示对应的 ES 中的索引名是 test ,参数 index 表示对应的 ES 中的类型名是 articles 。所以对应的 ES 中该表下的数据 url 访问路径是:
    http://localhost:9200/test/articles/{id}

    创建触发器

    对实体表,创建触发器函数,在用户对实体表插入,删除,更新时,通过触发器函数自动将数据同步到对应ES的外部表。同步过程调用 FDW 的接口,对 ES 进行索引的建立,更新,删除:

    CREATE OR REPLACE FUNCTION index_article() RETURNS trigger AS $def$
        BEGIN
            INSERT INTO articles_es (id, title, content) VALUES
                (NEW.id, NEW.title, NEW.content);
            RETURN NEW;
        END;
    $def$ LANGUAGE plpgsql;
    
    CREATE OR REPLACE FUNCTION reindex_article() RETURNS trigger AS $def$
        BEGIN
            UPDATE articles_es SET
                title = NEW.title,
                content = NEW.content
            WHERE id = NEW.id;
            RETURN NEW;
        END;
    $def$ LANGUAGE plpgsql;
    
    CREATE OR REPLACE FUNCTION delete_article() RETURNS trigger AS $def$
        BEGIN
            DELETE FROM articles_es a WHERE a.id = OLD.id;
            RETURN OLD;
        END;
    $def$ LANGUAGE plpgsql;
    
    CREATE TRIGGER es_insert_article
        AFTER INSERT ON articles
        FOR EACH ROW EXECUTE PROCEDURE index_article();
    
    CREATE TRIGGER es_update_article
        AFTER UPDATE OF title, content ON articles
        FOR EACH ROW
        WHEN (OLD.* IS DISTINCT FROM NEW.*)
        EXECUTE PROCEDURE reindex_article();
    
    CREATE TRIGGER es_delete_article
        BEFORE DELETE ON articles
        FOR EACH ROW EXECUTE PROCEDURE delete_article();
    

    测试验证

    插入一条表记录:

    insert into articles(title, content) values ('测试内容1', '测试标题1');
    

    查询 ES ,检查数据是否已同步:

    curl 'localhost:9200/test/articles/_search?q=*:*&pretty'
    

    结果如下:


    验证同步 ES 的结果

    提醒

    之前部署这个还踩了个坑,发现 PG 数据库的数据在 ES 中一直没有,原来因为我给elasticsearch安装了x-pack插件,导致给es添加了默认的安全机制,而pg-es-fdw插件中却没有传递访问es的用户名密码,所以一直不通。

    相关文章

      网友评论

        本文标题:PostgreSQL 与 ElasticSearch 同步

        本文链接:https://www.haomeiwen.com/subject/yletottx.html