背景
工作中希望将复杂的数据Job的属性及相互间的依赖关系通过图形化展示,以便于清晰的展示路径依赖。现有的数据Job的元数据信息存储在关系型数据库,格式如下(数据为测试数据):
![](https://img.haomeiwen.com/i6650437/fba0295ede2ef300.png)
字段含义:
- id:Job的编号
- name:Job的名称
- depencies:Job依赖的Job的编号
- owner:Job的归属账号
- lvl:Job的优先级
-
user:Job的创建人
目标通过在图数据中建立4类节点,4种关系描述数据任务的属性及相互关系,具体结构如下:
image.png
图形数据库采用Neo4J数据库,开发语言为Python3.7,开发IDE采用Pycharm/Jupyter
初始化图数据库
首先导入当前的全量数据存为DataFrame,为了讲解方便,数据我下载到了本地Excel文件,实际部署时改成连接关系型数据库
from py2neo import Graph,Node,Relationship,NodeMatcher,RelationshipMatcher
import numpy as np
import pandas as pd
basic_dataframe=pd.read_excel('D:\\jobrelation\\test.xlsx',sheet_name=0,usecols='A:F')
数据导入后,需要检查下数据的质量,做一些必要的数据清洗
- 数据治理
print(incre_dataframe.isnull().any())#查看列中是否有空值
incre_dataframe.dtypes#查看列数据类型
incre_dataframe['id']=incre_dataframe['id'].astype(str)#更改列的数据类型
incre_dataframe['lvl']=incre_dataframe['lvl'].astype(str)
incre_dataframe['depencies'].fillna('config_time_run',inplace=True)#处理空值
在确保数据质量后,连接图数据库
graph=Graph("http://localhost:7474",username='neo4j',password='passwd')
然后分别来初始化各节点
- 创建标签是Job的节点
for i in range(basic_dataframe.shape[0]):
try:
n=Node('Job',name=basic_dataframe.iloc[i]['name'],JobId=basic_dataframe.iloc[i]['id'])
graph.create(n)
except Exception as e:
print(e)
print(basic_dataframe.iloc[i]['id'])
print(i)
- 创建标签是Lvl的节点
lvl_dict={'1':'low','2':'medium','3':'high'}
for name,group in basic_dataframe.groupby('lvl'):
na=Node('Lvl',LvlId=name,LvlName=lvl_dict.get(name))
graph.create(na)
- 创建标签是Owner的节点,并将数据记录在字典
owner_dict={}
owner_start_id=100000
for name,group in basic_dataframe.groupby('owner'):
owner_dict[name]=owner_start_id
na=Node('Owner',OwnerId=owner_start_id,OwnerName=name)
graph.create(na)
owner_start_id+=1
- 创建标签是User的节点,并将数据记录在字典
user_dict={}
user_start_id=200000
for name,group in basic_dataframe.groupby('user'):
user_dict[name]=user_start_id
na=Node('User',UserId=user_start_id,UserName=name)
graph.create(na)
user_start_id+=1
如上,4类节点已创建完成,接下来创建各节点间的关系。
- 创建user和job的关系
matcher=NodeMatcher(graph)
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
user_node=matcher.match('User').where(UserId=user_dict.get(getattr(row,'user'))).first()
r=Relationship(user_node,'Create',job_node)
graph.create(r)
- 创建job和owner的关系
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
owner_node=matcher.match('Owner').where(OwnerId=owner_dict.get(getattr(row,'owner'))).first()
r=Relationship(job_node,'Belongto',owner_node)
graph.create(r)
- 创建job和level的关系
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
lvl_node=matcher.match('Lvl').where(LvlId=getattr(row,'lvl')).first()
r=Relationship(job_node,'Level',lvl_node)
graph.create(r)
- 创建job和job之间的依赖关系
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
depencies_job=str(getattr(row,'depencies'))
if depencies_job!='config_time_run' and depencies_job.find(':')!=-1:
for index,val in enumerate(depencies_job.split(':')):
depency_job_node=matcher.match('Job').where(JobId=val).first()
if not depency_job_node is None:
r=Relationship(job_node,'Depency',depency_job_node)
graph.create(r)
elif depencies_job!='config_time_run' and depencies_job.find(':')==-1:
depency_job_node=matcher.match('Job').where(JobId=depencies_job).first()
if not depency_job_node is None:
r=Relationship(job_node,'Depency',depency_job_node)
graph.create(r)
else:
pass
至此,全量数据的图数据库初始化已经完成,接下来实现增量的更新。
数据的增量更新
数据的增量更新考虑过两种方案,一是开启MySQL的binlog,实时同步元数据的更新操作;二是通过最后更新时间字段,每10分钟取一次增量数据,进行更新。综合考虑了项目的实时性要求和开发复杂度,选择了第二种方案。
- 导入增量数据,同上一步,依然以本地文件为例
incre_dataframe=pd.read_excel('d:\\jobrelation\\test.xlsx',sheet_name=1,usecols='A:F')
- 数据治理,同上,不再祥述
- 判断增量数据中是否有新增的owner,如有则加入Owner节点
max_owner_id=int(graph.run('match (owner:Owner) return max(owner.OwnerId)').to_data_frame().iat[0,0])#找出当前最大的ownerId,并转成int
for name,group in incre_dataframe.groupby('owner'):
if(nodematcher.match("Owner").where(OwnerName=name).first() is None):
max_owner_id+=1
na=Node('Owner',OwnerId=max_owner_id,OwnerName=name)
graph.create(na)
- 判断增量数据中是否是新增的User,如有则加入User节点
max_user_id=int(graph.run('match (user:User) return max(user.UserId)').to_data_frame().iat[0,0])
for name,group in incre_dataframe.groupby('user'):
if(nodematcher.match("User").where(UserName=name).first() is None):
max_user_id+=1
na=Node('User',UserId=max_user_id,UserName=name)
graph.create(na)
接下来是对增量数据中的Job信息的处理,本案中采取的解决办法是对增量数据中的JobId,如果在当前的Job节点中存在,则删除该JobId及其的相应关系,然后依据增量数据重建该JobId及其关系
- 重/新 建增量数据中JobId节点
#job_node_dict={}
for row in incre_dataframe.itertuples(index=True,name='Pandas'):
job_node=nodematcher.match('Job').where(JobId=getattr(row,'id')).first()
#job_node_dict[getattr(row,'id')]=(0 if(job_node is None) else 1)
if not job_node is None:
cql='match(job:Job{JobId:"'+getattr(row,'id')+'"}) detach delete job'
graph.run(cql)
n=Node('Job',name=getattr(row,'name'),JobId=getattr(row,'id'))
graph.create(n)
- 重/新 建增量数据JobId的相应关系
for row in incre_dataframe.itertuples(index=True,name='Pandas'):
job_node=nodematcher.match('Job').where(JobId=getattr(row,'id')).first()
name=str(getattr(row,'name'))
depencies_job=str(getattr(row,'depencies'))
user=str(getattr(row,'user'))
owner=str(getattr(row,'owner'))
lvl=str(getattr(row,'lvl'))
#job和依赖job间的关系
if depencies_job!='config_time_run' and depencies_job.find(':')!=-1:
for index,val in enumerate(depencies_job.split(':')):
depency_job_node=nodematcher.match('Job').where(JobId=val).first()
if not depency_job_node is None:
r=Relationship(job_node,'Depency',depency_job_node)
graph.create(r)
elif depencies_job!='config_time_run' and depencies_job.find(':')==-1:
depency_job_node=nodematcher.match('Job').where(JobId=depencies_job).first()
if not depency_job_node is None:
r=Relationship(job_node,'Depency',depency_job_node)
graph.create(r)
else:
pass
#job和Owner的关系
owner_node=nodematcher.match('Owner').where(OwnerName=owner).first()
r=Relationship(job_node,'Belongto',owner_node)
graph.create(r)
#job和level的关系
lvl_node=nodematcher.match('Lvl').where(LvlId=lvl).first()
r=Relationship(job_node,'Level',lvl_node)
graph.create(r)
#job和user的关系
user_node=nodematcher.match('User').where(UserName=user).first()
r=Relationship(user_node,'Create',job_node)
graph.create(r)
目前效果及后续方向
在图数据库的效果如下:[图片上传中...(image.png-12ee9d-1561103426948-0)]
![](https://img.haomeiwen.com/i6650437/a46416bfb5588098.png)
目前的效果需要在Neo4j的数据界面通过Cypher来查询,接下来希望能开发web页面,方便非开发人员使用。ps:稍微看了下图数据库与web结合的例子,前端这些东东真是头大啊!如果找不到前端资源,就只能硬着头皮自己来O(∩_∩)O哈哈~
网友评论