
Installing and Using the Amundsen Metadata Management System

Author: SmartBin | Published 2021-12-21 14:44

    1. Production installation environment

    3. Install Docker

    Install docker-ce

    • Install/upgrade the Docker client and install the required system tools.

      yum update -y

      yum install -y yum-utils device-mapper-persistent-data lvm2

    • Add the package repository

      yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

    • Refresh the cache and install

      yum makecache fast
      yum -y install docker-ce

    • Start the Docker service

      service docker start

    Configure a registry mirror and set a default address pool for Docker networks

    • Create the Docker daemon config file (edit it if it already exists)
    
    tee /etc/docker/daemon.json <<-'EOF'
    {
      "debug" : true,
      "registry-mirrors": ["https://dpayzz9i.mirror.aliyuncs.com"],
      "default-address-pools" : [
        {
          "base" : "192.168.0.0/16",
          "size" : 24
        }
      ]
    }
    EOF
    
    • Reload, restart Docker, and enable it at boot (a config verification sketch follows these commands)

      systemctl daemon-reload
      systemctl restart docker
      systemctl enable docker
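
    To confirm the daemon picked up the new settings, a minimal sketch (assuming the file path above) that parses the config:

    import json

    # Parse /etc/docker/daemon.json and print the settings added above.
    with open("/etc/docker/daemon.json") as f:
        cfg = json.load(f)

    print("registry mirrors:", cfg.get("registry-mirrors"))
    print("address pools   :", cfg.get("default-address-pools"))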

    Install docker-compose

    • Download

      curl -L "https://get.daocloud.io/docker/compose/releases/download/1.27.3/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

    • Make it executable:

      chmod +x /usr/local/bin/docker-compose

    Move the Docker data directory to the data disk

    By default Docker stores data under /var/lib/docker on the root partition; move it to /data/docker on the data disk and keep a symlink in the original location. A verification sketch follows the commands below.

    service docker stop

    mv /var/lib/docker /data/
    ln -sf /data/docker /var/lib/docker
    service docker start
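
    A quick way to confirm the relocation (a minimal sketch, assuming the paths above):

    import os

    # /var/lib/docker should now be a symlink that resolves to the data disk.
    print(os.path.islink("/var/lib/docker"))    # expect True
    print(os.path.realpath("/var/lib/docker"))  # expect /data/docker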

    4. Install Python 3.7

    5. Install Amundsen

    Make sure Docker has at least 3 GB of disk space available.

    Clone Amundsen with git:

    git clone --recursive https://github.com/amundsen-io/amundsen.git

    Enter the cloned directory and bring the containers up:

    # For Neo4j Backend
    docker-compose -f docker-amundsen.yml up

    If errors occur, see the FAQ at the end of this article.

    To start in detached (background) mode (a quick health-check sketch follows the command):

    docker-compose -f docker-amundsen.yml up -d
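
    Once the containers are up, a rough health check can be scripted. This is only a sketch: it assumes localhost access and the default ports (frontend 5000, Elasticsearch 9200).

    import urllib.request

    # Each service should answer over HTTP once its container is healthy.
    for name, url in [
        ("frontend", "http://localhost:5000"),
        ("elasticsearch", "http://localhost:9200"),
    ]:
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                print(f"{name}: HTTP {resp.status}")
        except Exception as exc:
            print(f"{name}: not reachable ({exc})")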

    Modify the ES container to add the IK Chinese analyzer

    • Reference: https://zhuanlan.zhihu.com/p/377433737

    • Download the IK analyzer release that matches the ES version (7.13.3): https://github.com/medcl/elasticsearch-analysis-ik/releases?page=2

    • Enter the ES container and create the /usr/share/elasticsearch/plugins/ik/ directory

      docker exec -it 3d701cddd320 /bin/bash
      mkdir /usr/share/elasticsearch/plugins/ik/

    • Copy the archive into the container

      docker cp ./elasticsearch-analysis-ik-7.13.3.zip 3d701cddd320:/usr/share/elasticsearch/plugins/ik/

    • Enter the container, unzip the archive, and remove the zip file

      docker exec -it 3d701cddd320 /bin/bash
      cd /usr/share/elasticsearch/plugins/ik/
      unzip elasticsearch-analysis-ik-7.13.3.zip
      rm -rf elasticsearch-analysis-ik-7.13.3.zip

      exit

    • Restart the ES container

      docker stop 3d701cddd320
      docker-compose -f docker-es.yml up -d

    • Update the metadata import code: change the ES mapping to use the IK analyzer (see the SVN repo, and YZF_TABLE_ES_INDEX_MAPPING in my_util.py below), then re-import the metadata. A quick plugin/analyzer check is sketched after this list.

    • Save the modified ES container as an image

    # Commit the container as a custom image:
    docker commit 82fb415dcf0c  my/elasticsearch:7.13.3
    
    # Edit docker-amundsen.yml to use the image created above
    elasticsearch:
          image: my/elasticsearch:7.13.3
    
    # Save the image to a tar file
    docker save -o my_es_docker_image.tar my/elasticsearch:7.13.3
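
    To confirm that the plugin is actually loaded after the restart, here is a minimal sketch using the same elasticsearch Python client as the loaders (the host is an assumption; adjust as needed):

    from elasticsearch import Elasticsearch

    es = Elasticsearch([{"host": "localhost"}])

    # The ik plugin should appear in the plugin list...
    print(es.cat.plugins(v=True))

    # ...and ik_max_word should now be accepted by the analyze API.
    print(es.indices.analyze(body={"analyzer": "ik_max_word", "text": "借方本年累计发生额"}))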
    

    Add Prometheus + Grafana monitoring

    For setting up and configuring Prometheus + Grafana, see the reference documentation.
    Use the cAdvisor service to monitor the running Docker containers; cAdvisor itself also runs as a container.

    Run the cAdvisor container

    docker run \
    --volume=/:/rootfs:ro \
    --volume=/var/run:/var/run:ro \
    --volume=/sys:/sys:ro \
    --volume=/var/lib/docker/:/var/lib/docker:ro \
    --volume=/dev/disk/:/dev/disk:ro \
    --publish=8080:8080 \
    --detach=true \
    --name=cadvisor \
    google/cadvisor:v0.24.1
    

    Add the Prometheus scrape target

    - targets: ['ip:8080']
      labels:
        instance: amundsen_docker
    

    Add the Grafana dashboard by importing dashboard ID 193. A quick check that cAdvisor is exposing metrics is sketched below.
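
    To verify the metrics endpoint before wiring it into Prometheus, a minimal sketch (the host below is a placeholder for the Docker machine's IP):

    import urllib.request

    # cAdvisor serves Prometheus-format metrics on /metrics (port 8080 as published above).
    CADVISOR_METRICS = "http://localhost:8080/metrics"  # placeholder host

    with urllib.request.urlopen(CADVISOR_METRICS, timeout=5) as resp:
        lines = resp.read().decode().splitlines()

    container_samples = [line for line in lines if line.startswith("container_")]
    print(f"{len(container_samples)} container_* samples exposed")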

    Import metadata

    • Create a Python 3 virtual environment and install the dependencies
    cd /data/service/amundsen
    yum install gcc
    yum install python3-devel mysql-devel
    
    python3 -m venv venv
    source venv/bin/activate
    /data/service/amundsen/venv/bin/pip3 install --upgrade pip
    
    /data/service/amundsen/venv/bin/pip3 install -r databuilder/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
    /data/service/amundsen/venv/bin/python3 databuilder/setup.py install
    
    /data/service/amundsen/venv/bin/pip3 install Mysqlclient -i https://mirrors.aliyun.com/pypi/simple/
    
    /data/service/amundsen/venv/bin/pip3 install pyyaml  -i https://mirrors.aliyun.com/pypi/simple/
    
    
    • Script deployment path
    amundsen/databuilder/my_metadata
    
    • Run the scripts with the virtualenv Python to import the metadata (a scheduling wrapper sketch follows these commands)
    # Import MySQL metadata first
    amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py
    
    # Then import Hive metadata
    amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_hive_loader.py
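
    If the two loads need to run regularly, a small wrapper can drive them in the same order and stop on the first failure. This is only a sketch, assuming the virtualenv and script paths above:

    import subprocess
    import sys

    VENV_PYTHON = "/data/service/amundsen/venv/bin/python3"
    SCRIPTS = [
        "/data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py",  # MySQL first
        "/data/service/amundsen/databuilder/my_metadata/my_hive_loader.py",   # then Hive
    ]

    for script in SCRIPTS:
        print(f"running {script}")
        result = subprocess.run([VENV_PYTHON, script])
        if result.returncode != 0:
            sys.exit(f"{script} failed with exit code {result.returncode}")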
    

    Basic Kibana query syntax

    # Get all mappings
    GET _mapping
    
    
    # Check the Elasticsearch version
    GET /
    
    # Check cluster health
    GET /_cat/health?v
    
    # List nodes
    GET /_cat/nodes?v
    
    # List indices
    GET /_cat/indices?v
    
    # Check JVM memory usage
    GET /_nodes/stats/jvm?pretty
    
    # Check disk allocation
    GET /_cat/allocation?v
    
    # List installed plugins
    GET /_cat/plugins?v
    
    # Query all documents
    GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
    {
        "query" : {
            "match_all" : {}
        }
    }
    
    # DSL query example
    GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
    {
        "query" : {
            "match" : { "cluster" : "ndz_fund"}
        }
    }
    
    
    
    # Count documents in the index by source (hive: 837, mysql: 14034)
    GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_count
    {
        "query" : {
            "match" : { "database": "mysql" }
        }
    }
    
    # Analyzer test (ik_max_word):
    POST _analyze
    {
      "analyzer": "ik_max_word",
      "text":     "借方本年累计发生额"
    }
    
    # Analyzer test (ik_smart):
    POST _analyze
    {
      "analyzer": "ik_smart",
      "text":     "借方本年累计发生额"
    }
    
    # Delete by query; documents are only marked deleted, not removed immediately
    POST tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_delete_by_query
    {
      "query": { 
        "match": { 
           "database": "hive"
        }
      }
    }
    
    # Force merge to purge documents marked deleted; resource intensive
    POST /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_forcemerge
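
    The same checks can be run from Python with the elasticsearch client the loaders already use. A small sketch (localhost and the table_search_index alias are assumptions based on the configuration below):

    from elasticsearch import Elasticsearch

    es = Elasticsearch([{"host": "localhost"}])
    INDEX = "table_search_index"  # query the alias instead of the uuid-suffixed index name

    # Count documents per source database (equivalent of the _count query above).
    for database in ("mysql", "hive"):
        resp = es.count(index=INDEX, body={"query": {"match": {"database": database}}})
        print(database, resp["count"])

    # Analyzer smoke test (equivalent of POST _analyze with ik_max_word).
    tokens = es.indices.analyze(body={"analyzer": "ik_max_word", "text": "借方本年累计发生额"})
    print([t["token"] for t in tokens["tokens"]])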
    
    
    

    Import script code

    • my_util.py
    import textwrap
    import yaml
    import sys
    
    
    # Read the YAML config file
    def read_config(config_file):
        with open(config_file, 'rb') as f:
            config_data = list(yaml.safe_load_all(f))
            if len(config_data) == 0:
                print("------配置文件: {} 为空------".format(config_file))
                sys.exit(1)
            # print(config_data[0].get(key1))
            return config_data[0]
    
    
    # es mapping
    YZF_TABLE_ES_INDEX_MAPPING = textwrap.dedent(
        """
        {
        "mappings":{
            "table":{
              "properties": {
                "name": {
                  "type":"text",
                  "analyzer": "simple",
                  "fields": {
                    "raw": {
                      "type": "keyword"
                    }
                  }
                },
                "schema": {
                  "type":"text",
                  "analyzer": "simple",
                  "fields": {
                    "raw": {
                      "type": "keyword"
                    }
                  }
                },
                "display_name": {
                  "type": "keyword"
                },
                "last_updated_timestamp": {
                  "type": "date",
                  "format": "epoch_second"
                },
                "description": {
                  "type": "text",
                  "analyzer": "ik_max_word"
                },
                "column_names": {
                  "type":"text",
                  "analyzer": "simple",
                  "fields": {
                    "raw": {
                      "type": "keyword"
                    }
                  }
                },
                "column_descriptions": {
                  "type": "text",
                  "analyzer": "ik_max_word"
                },
                "tags": {
                  "type": "keyword"
                },
                "badges": {
                  "type": "keyword"
                },
                "cluster": {
                  "type": "text",
                  "analyzer": "simple",
                  "fields": {
                    "raw": {
                      "type": "keyword"
                    }
                  }
                },
                "database": {
                  "type": "text",
                  "analyzer": "simple",
                  "fields": {
                    "raw": {
                      "type": "keyword"
                    }
                  }
                },
                "key": {
                  "type": "keyword"
                },
                "total_usage":{
                  "type": "long"
                },
                "unique_usage": {
                  "type": "long"
                },
                "programmatic_descriptions": {
                  "type": "text",
                  "analyzer": "ik_max_word"
                }
              }
            }
          }
        }
        """
    )
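
    Since a malformed mapping only fails at publish time, a quick sanity check (a minimal sketch) is to confirm the string parses as JSON and that the Chinese-text fields use the ik analyzer:

    import json

    from my_util import YZF_TABLE_ES_INDEX_MAPPING

    mapping = json.loads(YZF_TABLE_ES_INDEX_MAPPING)
    props = mapping["mappings"]["table"]["properties"]
    for field in ("description", "column_descriptions", "programmatic_descriptions"):
        assert props[field]["analyzer"] == "ik_max_word", field
    print("mapping OK")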
    
    • my_mysql_loader.py
    # Copyright Contributors to the Amundsen project.
    # SPDX-License-Identifier: Apache-2.0
    
    """
    This is an example script that demonstrates how to load data
    into Neo4j and Elasticsearch without using an Airflow DAG.
    
    """
    
    import logging
    import sys
    import textwrap
    import uuid
    
    from elasticsearch import Elasticsearch
    from pyhocon import ConfigFactory
    from sqlalchemy.ext.declarative import declarative_base
    
    from databuilder.extractor.mysql_metadata_extractor import MysqlMetadataExtractor
    from databuilder.extractor.neo4j_extractor import Neo4jExtractor
    from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
    from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
    from databuilder.job.job import DefaultJob
    from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
    from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
    from databuilder.publisher import neo4j_csv_publisher
    from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
    from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
    from databuilder.task.task import DefaultTask
    from databuilder.transformer.base_transformer import NoopTransformer
    from my_util import YZF_TABLE_ES_INDEX_MAPPING
    from my_util import read_config
    
    import yaml
    
    es_host = None
    neo_host = None
    if len(sys.argv) > 1:
        es_host = sys.argv[1]
    if len(sys.argv) > 2:
        neo_host = sys.argv[2]
    
    es = Elasticsearch([
        {'host': es_host or 'localhost'},
    ])
    
    DB_FILE = '/tmp/test.db'
    SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
    Base = declarative_base()
    
    NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
    
    neo4j_endpoint = NEO4J_ENDPOINT
    
    neo4j_user = 'neo4j'
    neo4j_password = 'test'
    
    # Connection string suffix (append &useSSL=false if needed)
    mysql_conn_ = '?charset=utf8'
    
    LOGGER = logging.getLogger(__name__)
    
    
    def run_mysql_job(conn_str, connect_name):
        #where_clause_suffix = textwrap.dedent("""
        #    where c.table_schema = 'yzf_biz'
        #""")
        where_clause_suffix = textwrap.dedent("""
            where 1 = 1
        """)
        connect = conn_str + mysql_conn_
        logging.info("Begin load mysql conn: {}".format(connect))
    
        tmp_folder = '/var/tmp/amundsen/table_metadata'
        node_files_folder = f'{tmp_folder}/nodes/'
        relationship_files_folder = f'{tmp_folder}/relationships/'
    
        job_config = ConfigFactory.from_dict({
            f'extractor.mysql_metadata.{MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
            f'extractor.mysql_metadata.{MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': False,
            f'extractor.mysql_metadata.{MysqlMetadataExtractor.CLUSTER_KEY}': connect_name,
            f'extractor.mysql_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connect,
            f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
            f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
            f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': connect_name,  # should use unique tag here like {ds}
        })
        job = DefaultJob(conf=job_config,
                         task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                         publisher=Neo4jCsvPublisher())
        return job
    
    
    def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                       elasticsearch_doc_type_key='table',
                                       model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                       cypher_query=None,
                                       elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
        """
        :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                           amundsensearchlibrary/search_service/config.py as an index
        :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                           `table_search_index`
        :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
        :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                           it uses the `Table` query baked into the Extractor
        :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                           if None is given (default) it uses the `Table` query baked into the Publisher
        """
        # loader saves data to this location and publisher reads it from here
        extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
    
        task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                           extractor=Neo4jSearchDataExtractor(),
                           transformer=NoopTransformer())
    
        # elastic search client instance
        elasticsearch_client = es
        # unique name of new index in Elasticsearch
        elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    
        job_config = ConfigFactory.from_dict({
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
            f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
            f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
            f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
            f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
                elasticsearch_client,
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
                elasticsearch_new_index_key,
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
                elasticsearch_doc_type_key,
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
                elasticsearch_index_alias,
        })
    
        # only optionally add these keys, so need to dynamically `put` them
        if cypher_query:
            job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                           cypher_query)
        if elasticsearch_mapping:
            job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                           elasticsearch_mapping)
    
        job = DefaultJob(conf=job_config,
                         task=task,
                         publisher=ElasticsearchPublisher())
        return job
    
    
    if __name__ == "__main__":
        # Enable INFO level logging
        logging.basicConfig(level=logging.INFO)
    
        conf_data = read_config('/data/service/amundsen/databuilder/yzf_metadata/mysql_connect.yaml')
        mysql_conf = conf_data.get('conn')
        for conn_name, mysql_list in mysql_conf.items():
            for mysql_conn in mysql_list:
                loading_job = run_mysql_job(mysql_conn, conn_name)
                loading_job.launch()
    
                job_es_table = create_es_publisher_sample_job(
                    elasticsearch_index_alias='table_search_index',
                    elasticsearch_doc_type_key='table',
                    model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
                job_es_table.launch()
    
    • my_hive_loader.py
    # Copyright Contributors to the Amundsen project.
    # SPDX-License-Identifier: Apache-2.0
    
    """
    This is an example script that demonstrates how to load data
    into Neo4j and Elasticsearch without using an Airflow DAG.
    
    """
    
    import logging
    import sys
    import textwrap
    import uuid
    
    from elasticsearch import Elasticsearch
    from pyhocon import ConfigFactory
    from sqlalchemy.ext.declarative import declarative_base
    
    from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
    from databuilder.extractor.neo4j_extractor import Neo4jExtractor
    from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
    from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
    from databuilder.job.job import DefaultJob
    from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
    from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
    from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
    from databuilder.publisher import neo4j_csv_publisher
    from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
    from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
    from databuilder.task.task import DefaultTask
    from databuilder.transformer.base_transformer import NoopTransformer
    from my_util import YZF_TABLE_ES_INDEX_MAPPING
    
    es_host = None
    neo_host = None
    if len(sys.argv) > 1:
        es_host = sys.argv[1]
    if len(sys.argv) > 2:
        neo_host = sys.argv[2]
    
    es = Elasticsearch([
        {'host': es_host or 'localhost'},
    ])
    
    DB_FILE = '/tmp/test.db'
    SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
    Base = declarative_base()
    
    NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
    
    neo4j_endpoint = NEO4J_ENDPOINT
    
    neo4j_user = 'neo4j'
    neo4j_password = 'test'
    
    LOGGER = logging.getLogger(__name__)
    
    # Todo: user provides a list of schema for indexing
    
    SUPPORTED_HIVE_SCHEMAS = ['accounting_collect', 'accounting_company', 'accounting_report', 'ads', 'bi_dm_ods', \
                              'biz_dm', 'biz_dw', 'common', 'common_kudu', 'common_sim', 'companyinfo_ods', 'customer', \
                              'customer_kudu', 'datax', 'default', 'di', 'dingtalk', 'dingtalk_kudu', 'dwd', 'dwd_sim', \
                              'dws', 'fintax_account', 'fintax_application', 'fintax_asset', 'fintax_data_init',
                              'fintax_fund', 'fintax_invoice', \
                              'fintax_salary', 'fintax_statistics', 'fintax_stock', 'fintax_task', 'fintax_tax',
                              'fintax_user_point', 'flink_database', \
                              'invoice_lake', 'log_ods', 'monitor', 'octopus_ods', 'sale_ods', 'taxops_ods', 'ucenter',
                              'upm_paas', 'view', 'yzf_biz', \
                              'yzf_biz_init', 'yzf_common', 'yzf_config', 'yzf_report', 'yzf_report_init']
    # Global used in all Hive metastore queries.
    # String format - ('schema1', schema2', .... 'schemaN')
    SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_HIVE_SCHEMAS))
    
    
    # todo: connection string needs to change
    def connection_string():
        user = 'root'
        host = 'ip'
        port = '3306'
        db = 'hivemetastore'
        pd = '123456'
        return "mysql://%s:%s@%s:%s/%s?charset=utf8" % (user, pd, host, port, db)
    
    
    def create_table_wm_job(templates_dict):
        sql = textwrap.dedent("""
            SELECT From_unixtime(A0.create_time) as create_time,
                   'hive'                        as `database`,
                   C0.NAME                       as `schema`,
                   B0.tbl_name as table_name,
                   {func}(A0.part_name) as part_name,
                   {watermark} as part_type
            FROM   PARTITIONS A0
                   LEFT OUTER JOIN TBLS B0
                                ON A0.tbl_id = B0.tbl_id
                   LEFT OUTER JOIN DBS C0
                                ON B0.db_id = C0.db_id
            WHERE  C0.NAME IN {schemas}
                   AND B0.tbl_type IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
                   AND A0.PART_NAME NOT LIKE '%%__HIVE_DEFAULT_PARTITION__%%'
            GROUP  BY C0.NAME, B0.tbl_name
            ORDER by create_time desc
        """).format(func=templates_dict.get('agg_func'),
                    watermark=templates_dict.get('watermark_type'),
                    schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)
    
        LOGGER.info('SQL query: %s', sql)
        tmp_folder = '/var/tmp/amundsen/table_{hwm}'.format(hwm=templates_dict.get('watermark_type').strip("\""))
        node_files_folder = f'{tmp_folder}/nodes'
        relationship_files_folder = f'{tmp_folder}/relationships'
    
        hwm_extractor = SQLAlchemyExtractor()
        csv_loader = FsNeo4jCSVLoader()
    
        task = DefaultTask(extractor=hwm_extractor,
                           loader=csv_loader,
                           transformer=NoopTransformer())
    
        job_config = ConfigFactory.from_dict({
            f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
            f'extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
            f'extractor.sqlalchemy.{SQLAlchemyExtractor.EXTRACT_SQL}': sql,
            'extractor.sqlalchemy.model_class': 'databuilder.models.watermark.Watermark',
            f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
            f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
            f'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
        })
        job = DefaultJob(conf=job_config,
                         task=task,
                         publisher=Neo4jCsvPublisher())
        job.launch()
    
    
    def run_hive_job():
        """
            Launches data builder job that extracts table and column metadata from MySQL Hive metastore database,
            and publishes to Neo4j.
            @param kwargs:
            @return:
            """
    
        # Adding to where clause to scope schema, filter out temp tables which start with numbers and views
        where_clause_suffix = textwrap.dedent("""
                WHERE d.NAME IN {schemas}
                AND t.TBL_NAME NOT REGEXP '^[0-9]+'
                AND t.TBL_TYPE IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
            """).format(schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)
    
        tmp_folder = '/var/tmp/amundsen/table_metadata'
        node_files_folder = f'{tmp_folder}/nodes/'
        relationship_files_folder = f'{tmp_folder}/relationships/'
    
        job_config = ConfigFactory.from_dict({
            f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
            f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
            f'extractor.hive_table_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
            f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
            f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
            f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
            'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
        })
    
        job = DefaultJob(conf=job_config,
                         task=DefaultTask(extractor=HiveTableMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                         publisher=Neo4jCsvPublisher())
        return job
    
    
    def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                       elasticsearch_doc_type_key='table',
                                       model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                       cypher_query=None,
                                       elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
        """
        :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                           amundsensearchlibrary/search_service/config.py as an index
        :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                           `table_search_index`
        :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
        :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                           it uses the `Table` query baked into the Extractor
        :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                           if None is given (default) it uses the `Table` query baked into the Publisher
        """
        # loader saves data to this location and publisher reads it from here
        # Temporary file; safe to delete, does not affect queries
        extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
    
        task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                           extractor=Neo4jSearchDataExtractor(),
                           transformer=NoopTransformer())
    
        # elastic search client instance
        elasticsearch_client = es
        # unique name of new index in Elasticsearch
        elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    
        job_config = ConfigFactory.from_dict({
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
            f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
            f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
            f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
            f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
            f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
                elasticsearch_client,
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
                elasticsearch_new_index_key,
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
                elasticsearch_doc_type_key,
            f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
                elasticsearch_index_alias,
        })
    
        # only optionally add these keys, so need to dynamically `put` them
        if cypher_query:
            job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                           cypher_query)
        if elasticsearch_mapping:
            job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                           elasticsearch_mapping)
    
        job = DefaultJob(conf=job_config,
                         task=task,
                         publisher=ElasticsearchPublisher())
        return job
    
    
    if __name__ == "__main__":
        # Enable INFO level logging
        logging.basicConfig(level=logging.INFO)
    
        loading_job = run_hive_job()
        loading_job.launch()
    
        templates_dict = {'agg_func': 'min',
                          'watermark_type': '"low_watermark"',
                          'part_regex': '{{ ds }}'}
    
        create_table_wm_job(templates_dict)
    
        templates_dict = {'agg_func': 'max',
                          'watermark_type': '"high_watermark"',
                          'part_regex': '{{ ds }}'}
    
        create_table_wm_job(templates_dict)
    
        job_es_table = create_es_publisher_sample_job(
            elasticsearch_index_alias='table_search_index',
            elasticsearch_doc_type_key='table',
            model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
        job_es_table.launch()
    
    • mysql_connect.yaml config file
    conn:
      mysql1:
        - mysql://root:123456@ip1:3306/<db>
      mysql2:
        - mysql://root:123456@ip2:3306/<db>
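
    Before launching a full load, the connection config can be checked with read_config from my_util.py. A minimal sketch that lists the connections the loader would iterate (the config path is an assumption; use the deployed location):

    from my_util import read_config

    conf = read_config("mysql_connect.yaml")  # assumed relative path
    for conn_name, conn_list in conf["conn"].items():
        for conn_str in conn_list:
            # The loader appends '?charset=utf8' before connecting (see my_mysql_loader.py).
            print(conn_name, "->", conn_str + "?charset=utf8")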
    

    FAQ

    1. docker-compose -f docker-amundsen.yml up fails to start the containers with:

    [2021-12-01T08:02:56,599][INFO ][o.e.b.BootstrapChecks ] [PD4Rw8t] bound or publishing to a non-loopback address, enforcing bootstrap checks
    es_amundsen_atlas | ERROR: [1] bootstrap checks failed
    es_amundsen_atlas | [1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

    • Raise the kernel limit: edit /etc/sysctl.conf and add vm.max_map_count = 262144

    • Reload the setting: sysctl -p

    • Re-run docker-compose -f docker-amundsen.yml up

    2. Chinese comments appear garbled when searching metadata in the UI

    • Modify the MySQL connection URL by appending the UTF-8 setting: ?charset=utf8

    3. Importing Hive metadata from the metastore MySQL database fails with:

    sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1055, "Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'hive.A0.CREATE_TIME' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
    [SQL:
    SELECT From_unixtime(A0.create_time) as create_time,

    ....

    • Following https://blog.csdn.net/fansili/article/details/78664267, remove ONLY_FULL_GROUP_BY from the MySQL sql_mode (note that `set global` does not survive a MySQL restart; add the setting to my.cnf to make it permanent):

      mysql> set global sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

      mysql> set session sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

    4. Official Amundsen troubleshooting guide: https://www.amundsen.io/amundsen/installation/#troubleshooting

    5. The firewall is disabled, yet port 5000 on the Amundsen host still cannot be reached from 215.5

    Fix: start the firewall, open port 5000, then stop and disable the firewall again. After that, port 5000 is reachable and search works normally.

    systemctl start firewalld
    firewall-cmd --zone=public --add-port=5000/tcp --permanent
    firewall-cmd --reload
    systemctl stop firewalld
    systemctl disable firewalld
