美文网首页
python nebula图数据库常用操作

python nebula图数据库常用操作

作者: 越大大雨天 | 来源:发表于2022-12-22 14:00 被阅读0次

    前言

    使用场景为:依赖 NebulaGraph 3.2.0 图数据库,对一些数据节点做关联拓线查询。比如输入 IP,可查询展示该 IP 归属的地理位置、关联的域名,并继续往下根据域名查询解析的 URL、注册信息等,最终以图形的形式进行展示。技术栈语言为 Python。


    展示样例

    依赖安装及使用

    常用查询

    1. python创建nebula连接

    from nebula3.gclient.net import ConnectionPool
    from nebula3.Config import Config

    # Demo: open a NebulaGraph connection pool and run "SHOW TAGS" in the
    # "nba" space, once with a manually managed session and once with a
    # context-managed session.

    # define a config
    config = Config()
    config.max_connection_pool_size = 10
    # init connection pool
    connection_pool = ConnectionPool()
    # if the given servers are ok, return true, else return false
    ok = connection_pool.init([('127.0.0.1', 9669)], config)
    if not ok:
        # Fail early instead of hitting a less obvious error on get_session().
        raise RuntimeError('failed to initialise the NebulaGraph connection pool')

    try:
        # option 1 control the connection release yourself
        # get session from the pool
        session = connection_pool.get_session('root', 'nebula')
        try:
            # select space
            session.execute('USE nba')

            # show tags
            result = session.execute('SHOW TAGS')
            print(result)
        finally:
            # release session, even when a statement above raised
            session.release()

        # option 2 with session_context, session will be released automatically
        with connection_pool.session_context('root', 'nebula') as session:
            session.execute('USE nba')
            result = session.execute('SHOW TAGS')
            print(result)
    finally:
        # close the pool
        connection_pool.close()
    

    2. 常用nGQL语句及说明

    2.1 统计查询

    通过以下三步可获取图空间的点、边数量统计信息:

    SUBMIT JOB STATS;
    SHOW JOB <job_id>;
    SHOW STATS;
    

    2.2 常用数据查询语法

    最常用match语法查询,更加灵活
    match pattern详细语法参考: https://docs.nebula-graph.com.cn/3.2.0/3.ngql-guide/1.nGQL-overview/3.graph-patterns/

    • MATCH (v) RETURN v LIMIT 10, 任意查询10个点
    • MATCH ()-[e]->() RETURN e limit 10, 任意查询10条边
    • MATCH (a)-[e]->(b) WHERE id(a)=="xxx" RETURN a, e, b, 从id=xxx的a点出发,查询一级出边
    • MATCH (a)-[e]-(b) WHERE id(a)=="xxx" RETURN a, e, b, 取消方向,从id=xxx的a点出发,查询一级入边和出边
    • MATCH (a)-[e*0..2]->(b) WHERE id(a)=="xxx" RETURN a, e, b, 从id=xxx的a点出发,查询0-2级的出边
    • GO 0 TO 2 STEPS FROM "xxx" OVER <follow> YIELD properties($$), 使用GO遍历查询,遍历范围与上一句类似,但只返回目标点的属性;<follow>需要替换为实际的边类型
    • ...
      更多操作文档

    类代码示例

    代码包含python flask nebula客户端初始化、关联查询、结果解析

    from nebula3.data import DataObject
    from nebula3.gclient.net import ConnectionPool
    from nebula3.Config import Config
    import pandas as pd
    from typing import Dict, Union
    from nebula3.data.ResultSet import ResultSet
    from nebula3.mclient import MetaCache
    from nebula3.sclient.GraphStorageClient import GraphStorageClient
    from collections import defaultdict
    
    
    class NebulaClient:
        """
        Nebula query client, customised for the "tl" graph queries.

        Wraps a graphd ``ConnectionPool`` (used for nGQL queries) and, when meta
        servers are supplied, a ``GraphStorageClient`` (used for storage scans).
        The client can be configured through the constructor, or later through
        ``init_app`` from an application object exposing a ``config`` mapping.
        """

        def __init__(self, graph_servers=None, meta_servers=None, **nebula_config):
            """
            graph_servers: list of (host, port) tuples of the graphd services
            meta_servers: list of (host, port) tuples of the metad services
            nebula_config: ``username``, ``password`` and ``space`` are stored on
                the client; every other key is set as an attribute on the
                nebula ``Config`` object (e.g. ``max_connection_pool_size``)
            """
            self.graph_servers = graph_servers
            self.meta_servers = meta_servers
            self.username = None
            self.password = None
            self.space = None
            self.config = Config()
            self.graph_pool = ConnectionPool()
            self.graph_storage_client = None
            self._init_config(**nebula_config)

        def _init_config(self, **nebula_config):
            """
            Store credentials and the default space, copy the remaining options
            onto ``self.config`` and initialise the clients whose server lists
            are non-empty.
            """
            self.username = nebula_config.pop("username", "")
            self.password = nebula_config.pop("password", "")
            self.space = nebula_config.pop("space", "")
            for key, value in nebula_config.items():
                setattr(self.config, key, value)
            if self.graph_servers:
                self.graph_pool.init(self.graph_servers, self.config)
            if self.meta_servers:
                self.graph_storage_client = GraphStorageClient(MetaCache(self.meta_servers))

        @staticmethod
        def _split_host_port(address):
            """Turn a ``"host:port"`` string into a ``(host, int(port))`` tuple."""
            host, sep, port = address.rpartition(":")
            if not sep:
                raise ValueError(f"expected 'host:port', got {address!r}")
            return host, int(port)

        def init_app(self, app):
            """
            Configure the client from ``app.config["nebula"]``.

            Expected layout: ``{"graphd": {"host": ["h:p", ...]},
            "metad": {"host": ["h:p", ...]}, ...}``; the remaining keys are
            handled exactly like the constructor's ``**nebula_config``.
            """
            # Work on a copy: popping from the dict held by app.config would strip
            # the keys from the application's configuration, so a second
            # init_app() call (or any other reader) would no longer see them.
            nebula_conf = dict(app.config.get("nebula") or {})
            nebula_graphd_conf = nebula_conf.pop("graphd", None) or {}
            nebula_metad_conf = nebula_conf.pop("metad", None) or {}

            graph_hosts = nebula_graphd_conf.get("host", [])
            meta_hosts = nebula_metad_conf.get("host", [])

            # Ports are converted to int so they match the (host, port) tuples
            # that the constructor is normally given.
            self.graph_servers = [self._split_host_port(item) for item in graph_hosts]
            self.meta_servers = [self._split_host_port(item) for item in meta_hosts]
            self._init_config(**nebula_conf)

        def ngql_query(self, gql, space=None):
            """
            Run one nGQL statement in a pooled session and return its ResultSet.

            space: graph space to ``USE`` before the statement; falls back to
                the client's default space. When neither is set no ``USE`` is
                sent. If ``USE`` itself fails, that failed ResultSet is returned
                so the caller sees the real cause.
            """
            if not space:
                space = self.space
            with self.graph_pool.session_context(self.username, self.password) as session:
                if space:
                    use_result = session.execute(f'USE {space}')
                    if not use_result.is_succeeded():
                        return use_result
                return session.execute(gql)

        def match_id_relation_edge(self, vid, space=None, variable_length_edges: Union[None, tuple, list] = None) -> ResultSet:
            """
            space: graph space to query (defaults to the client's space)
            vid: vertex id the traversal starts from
            variable_length_edges: path length range as a two-element list or
                tuple (min, max), e.g. [1, 2]
            return: ResultSet with the columns ``source``, ``e`` and ``target``

            """
            if not variable_length_edges:
                # Default range 0..1: the start vertex itself (zero hops, empty
                # edge list) plus its direct outgoing neighbours.
                variable_length_edges = [0, 1]
            min_hops = int(variable_length_edges[0])
            max_hops = int(variable_length_edges[1])
            # Escape backslashes and double quotes so the vid cannot terminate
            # the string literal and alter the statement.
            safe_vid = str(vid).replace("\\", "\\\\").replace('"', '\\"')
            # Follow outgoing edges starting from the given vid.
            gql = (
                f'MATCH (source)-[e*{min_hops}..{max_hops}]->(target) '
                f'WHERE ( id(source)  == "{safe_vid}") '
                'RETURN source as source,e,target as target LIMIT 100'
            )
            print(f"gql: {gql}")
            return self.ngql_query(gql, space=space)

        def match_id_relation_edge_result_to_df(self, result: ResultSet) -> Union[pd.DataFrame, None]:
            """
            Build one list per column and turn them into a DataFrame with the
            columns ``source``, ``edge`` and ``target_v``.

            Returns None when the query did not succeed.
            """
            if not result.is_succeeded():
                return None
            d: Dict[str, list] = {
                "source": [self._parse_node(source) for source in result.column_values("source")],
                "edge": [self._parse_edge(edge) for edge in result.column_values("e")],
                "target_v": [self._parse_node(target) for target in result.column_values("target")],
            }
            return pd.DataFrame.from_dict(d)

        def match_id_relation_edge_result_to_struct(self, result: ResultSet):
            """
            Return the parsed source vertex of the first row with an extra
            ``relation`` entry that groups the parsed target vertices by type.

            Returns None when the query failed or produced no rows.
            """
            if not result.is_succeeded() or result.is_empty():
                return None

            source_data = self._parse_node(result.column_values("source")[0])
            relationship = defaultdict(list)
            for target_value in result.column_values("target"):
                target = self._parse_node(target_value)
                # A target that is the source itself is not a relation.
                if target.get("id") == source_data.get("id"):
                    continue
                relationship[target.get("type")].append(target)

            source_data.update({
                "relation": relationship
            })
            return source_data

        def _parse_node(self, node):
            """
            Convert a vertex value into a plain dict.

            Only the first tag is used for ``type`` and ``info``; a vertex
            without any tag gets ``type`` None and an empty ``info``.
            """
            node = node.as_node()
            tags = node.tags()
            tag = tags[0] if tags else None
            properties = node.properties(tag) if tag is not None else {}
            return {
                "id": node.get_id().as_string(),
                "type": tag,
                "isAlarm": False,
                "info": {k: self._parse_wrapper_value(v) for k, v in properties.items()},
            }

        def _parse_edge(self, edge):
            """
            Convert the first edge of a variable-length edge list into a dict.

            Returns None for an empty list, which is what a zero-hop row (the
            source matched as its own target) carries.
            """
            edges = edge.as_list()
            if not edges:
                return None
            relationship = edges[0].as_relationship()
            return {
                "edge_name": relationship.edge_name(),
                "info": {k: self._parse_wrapper_value(v) for k, v in relationship.properties().items()},
                "start_vertex_id": relationship.start_vertex_id().as_string(),
                "end_vertex_id": relationship.end_vertex_id().as_string(),
            }

        @staticmethod
        def _parse_wrapper_value(value: DataObject.ValueWrapper):
            """Unwrap a ValueWrapper; bytes payloads are decoded as UTF-8."""
            _value = value.get_value().value
            if isinstance(_value, bytes):
                return _value.decode(encoding="utf-8")
            return _value

        def scan_vertex(self, tag_name, space_name=None, *args, **kwargs):
            """
            Scan all vertices of ``tag_name`` through the storage client and
            return them as a list (each one is also printed).

            space_name: graph space to scan (defaults to the client's space)
            args / kwargs: forwarded to ``GraphStorageClient.scan_vertex`` after
                the space and tag names
            raises RuntimeError: when no meta servers were configured, so there
                is no storage client to scan with
            """
            if self.graph_storage_client is None:
                raise RuntimeError(
                    "graph storage client is not initialised; configure meta_servers first"
                )
            if not space_name:
                space_name = self.space
            # Pass both names positionally: combining keyword arguments with
            # *args would raise "got multiple values for argument" as soon as
            # an extra positional argument is supplied.
            resp = self.graph_storage_client.scan_vertex(space_name, tag_name, *args, **kwargs)
            all_res = []
            while resp.has_next():
                for vertex_data in resp.next():
                    all_res.append(vertex_data)
                    print(vertex_data)
            return all_res

        def close(self):
            """Close the graph connection pool and, if created, the storage client."""
            self.graph_pool.close()
            if self.graph_storage_client is not None:
                self.graph_storage_client.close()
    
    
    if __name__ == '__main__':
        # Demo: query the relations of one vertex and print the raw result, the
        # DataFrame view and the nested structure.
        # NOTE(review): the three identical addresses look like placeholders for
        # a three-node cluster; replace them with the real service addresses.
        graph_servers = [("localhost", 9669), ("localhost", 9669), ("localhost", 9669)]
        meta_servers = [("localhost", 9559), ("localhost", 9559), ("localhost", 9559)]
        nebula_tool = NebulaClient(graph_servers=graph_servers, meta_servers=meta_servers, username="root", password="nebula", space="test", max_connection_pool_size=10)
        try:
            # Alternative raw queries:
            # result = nebula_tool.ngql_query(space="tl_vast", gql='match (v) return v limit 10')
            # query_id_relations = 'match (a)-[e]-(b) where id(a) == "002b500b73952c997db130214ef03b26" return e;'
            # result = nebula_tool.ngql_query(space="tl_vast", gql='MATCH p = (source_v)-[e*1..1]->(target_v) WHERE ( id(source_v)  == "002b500b73952c997db130214ef03b26") RETURN p LIMIT 100')
            result = nebula_tool.match_id_relation_edge(vid="002b500b73952c997db130214ef03b26")
            df_result = nebula_tool.match_id_relation_edge_result_to_df(result)
            output_result = nebula_tool.match_id_relation_edge_result_to_struct(result)
            # scan_vertex requires a tag name:
            # print(nebula_tool.scan_vertex(tag_name="<tag>"))
            print(result)
            print(df_result)
            print(output_result)
        finally:
            # Always give the connections back, even when a query raised.
            nebula_tool.close()
    

    相关文章

      网友评论

          本文标题:python nebula图数据库常用操作

          本文链接:https://www.haomeiwen.com/subject/btyvqdtx.html