美文网首页
数据结构和算法在流程画布中的实际应用

数据结构和算法在流程画布中的实际应用

作者: GrowingIO | 来源:发表于2021-09-02 18:44 被阅读0次

图灵奖的获得者,Pascal 之父——Niklaus Wirth ,有个经典说法:“算法+数据结构=程序”(Algorithm+Data Structures=Programs)。我们以这个说法为思路,看在流程画布这个场景中,如何应用数据结构和算法来解决实际的业务需求。

树和图

在数据结构中,对树的描述为:树是n(n≥0)个结点的有限集,它或为空树(n=0);或为非空树,对于非空树T:

1. 有且仅有一个称之为根的结点;

2. 除根结点以外的其余结点可分为m(m>0)个互不相交的有限集T1, T2, …, Tm,其中每一个集合本身又是一棵树,并且称为根的子树(SubTree)。

由上述描述可以看出,树的结构定义是一个递归的定义,即在树的定义中又用到树的定义,它道出了树的固有特性。树常用的存储结构有:顺序存储、链式存储。

对图的描述为:图G由两个集合V和E组成,记为G=(V, E),其中V是顶点的有穷非空集合,E是V中顶点偶对的有穷集合。V(G)为图的顶点集合,不可以为空;E(G)为图的边集合,可以为空。如果边集E(G)为有向边的集合,则称图为有向图;如果边集E(G)为无向边的集合,则称图为无向图。

图常用的存储结构有:邻接矩阵、邻接表、十字链表、邻接多重表。

树和图是画布这个场景中关联度最高的两种数据结构(当然,最基础的还是链表结构)。

G6

G6 是一个图可视化引擎。它提供了图的绘制、布局、分析、交互、动画等图可视化的基础能力,借助 G6 的能力,我们可以快速搭建自己的图分析或图编辑应用。流程画布底层便使用了 antv/g6 来实现图可视化的功能。

根据对其功能使用程度的不同,我们梳理 G6 的核心概念如下:

G6中对图 Graph 接收的参数定义如下:

export interface GraphData {

    nodes?: NodeConfig[];

    edges?: EdgeConfig[];

    combos?: ComboConfig[];

    [key: string]: any;

}

官网给出的最简单的快速开始的 demo 代码片段如下:

const data = {

  nodes: [

    {

      id: 'node1',

      x: 100,

      y: 200,

    },

    {

      id: 'node2',

      x: 300,

      y: 200,

    },

  ],

  edges: [

    {

      source: 'node1',

      target: 'node2',

    },

  ],

};

const graph = new G6.Graph({

  container: 'mountNode',

  width: 800,

  height: 500,

});

graph.data(data);

graph.render();

由上面两段代码可以看出,nodes 和 edges 是对图结构中顶点集合和边集合的数据表示,同时通过 combos 字段实现了顶点分组的能力。看到此处,我们可以得出结论:真正的元素绘制部分其实无需关心,我们要做的更多是对顶点集和边集数据的管理。

顶点项中的 x、y 字段是可选的,当数据中没有节点位置信息,或者数据中的位置信息不满足需求时,需要借助一些布局算法对图进行布局。

由G6的核心概念可以看到,框架本身已经实现了多种经典布局算法,其中不乏以树图为主的脑图布局,但根据需求描述中对UI设计和交互的定义,现有布局算法无法满足我们的需求。因此我们不仅要实现自定义顶点、边,还要实时计算每个顶点的坐标值来实现自定义布局的逻辑。

G6 在提供更高的灵活性的同时,也因处理数据带来了不可避免的复杂性,节点的坐标值计算和节点管理(新增、删除)便是其中的典型场景。

数据结构定义

G6 本身是对图可视化的实现,但流程画布这个场景中,我们在真正的实现中采用了链式存储的树结构。

实际开发过程中,对节点的数据类型定义如下:

interface IWorkflowNode<T extends WorkflowNodeParamsXOR> {

  id: string;

  type: TWorkflowNodeType;

  params: T;

  parent: string[];

  children: string[];

}

说明:

1 parent 为父节点引用(以 id 形式存储)

2 children 为子节点引用(以 id 形式存储)

满足二叉数的双向链表结构

算法

我们需要对数据进行处理的点主要有两个:

1、节点坐标计算

2、节点删除时,对当前节点、关联边和子树的删除

基于上述的数据结构定义,实现上述两点功能均用到同一个算法:二叉树的深度优先、前序遍历算法(采用递归解法)。

如上图,如果采用深度优先、前序遍历算法,则访问节点的顺序应为:1、2、4、8、5、9、3、6、7。

到此为止,我们的预备知识已经全部给出,接下来是具体的实现细节。

实现

假设现在有如下图所示的一张画布:

nodes 数据细节如下(忽略了部分与坐标计算无关的字段)。

[

  {

    id: 'root-1630463843227',

    type: 'start',

    parent: [],

    children: ['root-1630463843227-0'],

  },

  {

    id: 'root-1630463843227-0',

    type: 'strategy',

    parent: ['root-1630463843227'],

    children: ['root-1630463843227-0-0', 'root-1630463843227-0-1'],

  },

  {

    id: 'root-1630463843227-0-0',

    type: 'strategy',

    parent: ['root-1630463843227-0'],

    children: ['root-1630463843227-0-0-0', 'root-1630463843227-0-0-1'],

  },

  {

    id: 'root-1630463843227-0-0-0',

    type: 'action',

    parent: ['root-1630463843227-0-0'],

    children: [],

  },

  {

    id: 'root-1630463843227-0-0-1',

    type: 'trigger',

    parent: ['root-1630463843227-0-0'],

    children: ['root-1630463843227-0-0-1-0'],

  },

  {

    id: 'root-1630463843227-0-0-1-0',

    type: 'action',

    parent: ['root-1630463843227-0-0-1'],

    children: [],

  },

  {

    id: 'root-1630463843227-0-1',

    type: 'action',

    parent: ['root-1630463843227-0'],

    children: [],

  },

]

坐标计算

1、数据预处理

从 nodes 和 edges 的数组列表形式生成根据 id 为 key 的对象类型,方便后续取值

y累计偏移量 MaxOffsetY 置为 0

选定起始节点并设定起始坐标为(0, 0)

开始递归求解每个节点的坐标

/**

* 计算依赖的相关变量

* indentX:x 方向固定偏移量

* gapVertical: y 方向固定偏移量

* MaxOffsetY: 当前节点计算时 y 方向的累计偏移量

*/

const indentX = 370;

const gapVertical = 50;

const MaxOffsetY = 0;

const parseWorkflowToDraw = (

  workflow: IWorkflow,

  resolveNode: ResolveNode

): [GraphData, IWorkflowNodeMap] => {

  const nodes = workflow.nodes;

  const edges = workflow.edges;

  const nodeMap: IWorkflowNodeMap = {};

  const edgeMap: IWorkflowEdgeMap = {};

  let rootId = '';

  for (const item of nodes) {

    if (item.type === 'start') {

      rootId = item.id;

    }

    nodeMap[item.id] = { ...item };

  }

  for (const edge of edges) {

    edgeMap[edge.target] = { ...edge };

  }

  if (!rootId) {

    throw new Error('Workflow 节点类型错误,无起始节点!');

  }

  MaxOffsetY = 0;

  const graphData: GraphData = {

    nodes: [],

    edges,

  };

  parseWorkflowToGraphData(

    nodeMap[rootId],

    0,

    0,

    null,

    [0, 0],

    nodeMap,

    edgeMap,

    resolveNode,

    graphData

  );

  return [graphData, nodeMap];

};

2、坐标计算

数据预处理(更多的是注入后续自定义节点时draw函数所需的数据)

计算自身节点尺寸

根据父节点位置、是否为分支流程及分支流程中的位置索引计算当前位置

根据系列条件判断是否需要在 y 方向上进行偏移量累加

如果有 children,则循环递归求解每一个 children 节点位置

const parseWorkflowToGraphData = (

  node: IWorkflowNode<WorkflowNodeParamsXOR>,

  indexInSibling: number,

  parentMatrixX: number,

  parentNodeType: TWorkflowNodeType,

  parentNodeSize: number[],

  nodeMap: IWorkflowNodeMap,

  edgeMap: IWorkflowEdgeMap,

  resolveNode: ResolveNode,

  graphData: GraphData

): IWorkflowDrawNode => {

  const nodeType = node.type;

  const condition = nodeType === 'start' ? true : edgeMap[node.id]?.condition;

  let newNode = {

    ...resolveNode(node),

    nodeType: nodeType,

    type: tagRenderNodeType(nodeType),

    condition,

  };

  // 计算节点尺寸

  const size = nodeSize(newNode);

  newNode.size = size;

  newNode.domSize = isBranchedNodeType(nodeType)

    ? [size[0] - padding - 130, size[1] - padding - 75]

    : size;

  // 计算节点坐标位置

  const matrixX = parentNodeType === null ? 0 : parentMatrixX + indentX;

  const matrixY =

    !condition && indexInSibling === 0 ? MaxOffsetY + parentNodeSize[1] + gapVertical : MaxOffsetY;

  newNode.x = matrixX;

  newNode.y = matrixY;

  if (!condition && indexInSibling === 0) {

    MaxOffsetY += parentNodeSize[1] + gapVertical;

  }

  const children = [];

  if (node.children.length > 0) {

    node.children.forEach((childId, index) => {

      const childNode = parseWorkflowToGraphData(

        nodeMap[childId],

        index,

        matrixX,

        nodeType,

        size,

        nodeMap,

        edgeMap,

        resolveNode,

        graphData

      );

      children.push(childNode);

    });

  } else {

    MaxOffsetY += size[1] + gapVertical;

  }

  newNode.children = children;

  graphData.nodes.push(newNode);

  return newNode;

};

节点删除

节点删除时,不仅需要删除当前节点,而且当前节点的子树、当前节点的入度边也应该一并删除。此时用到的依然是 N 叉树的递归解法,同时借助函数引用传参可以改变参数的特性,实现了对源数据进行直接变更的操作。

/**

* 删除任意节点、子树及其入度边

* @param node

* @param nodes

* @param edges

*/

const removeNode = (

  node: IWorkflowNode<any>,

  nodes: IWorkflowNode<any>[],

  edges: IWorkflowEdge[]

) => {

  const { children } = node;

  if (children?.length > 0) {

    children.forEach((child) =>

      removeNode(

        nodes.find((n) => n.id === child),

        nodes,

        edges

      )

    );

  }

  handleNodeDelete(node, nodes, edges);

};

const handleNodeDelete = (

  node: IWorkflowNode<any>,

  nodes: IWorkflowNode<any>[],

  edges: IWorkflowEdge[]

) => {

  // 定位元素

  const nodeIndex = nodes.findIndex((n) => n.id === node.id);

  const foundEdges = edges.filter((edge) => edge.target === node.id);

  // 清除父节点指向该节点的指针

  const parentNode = nodes.find((n) => nodes[nodeIndex].parent[0] === n.id);

  if (parentNode.children?.length) {

    parentNode.children = parentNode.children.filter((child) => child !== node.id);

  }

  // 删除元素

  nodes.splice(nodeIndex, 1);

  if (foundEdges?.length > 0) {

    foundEdges.forEach((v) => {

      const edgeIndex = edges.findIndex((_) => _.id === v.id);

      edges.splice(edgeIndex, 1);

    });

  }

};

ABTest

在未拓展 ABTest 类型节点前,整个画布的数据结构满足二叉树的定义。但扩展之后,由于子节点数量存在大于二的场景,所以不再满足二叉树的定义,但整体不影响链表+树的结构定义,是由二叉到N叉的扩展。

但 ABTest 的特殊性不仅体现在当前节点的后继数量不限,更重要的是他的后继节点可以作为一个容器,再包含一个特殊的小型画布。

因此,在原本的数据结构基础上,我们扩展了节点的类型定义,新增 childNodes 字段。childNodes 主要为了存储 ABTest 节点之后跟的 combo 组的数据,从这里作为入口,在不打断原本树结构的前提下,内部又可以插入一棵树的结构。其实到此处,已经不再是单纯的双向链表形式,又多了一丝广义表的味道。

interface IWorkflowNode<T extends WorkflowNodeParamsXOR> {

  comboId?: string;

  id: string;

  type: TWorkflowNodeType;

  params: T;

  parent: string[];

  children: string[];

  childNodes?: string[];

}

经过扩展后的数据结构对原有的算法逻辑其实不会影响,我们要做的是需要处理中间广义表结构带来的子树逻辑(包括子树的坐标计算、子树对后继节点位置坐标的影响等)。

假如有如上的一棵树,按照扩展后的类型定义及 N 叉数的遍历算法,则遍历顺序应该为:1、2、3、5、4、6、10、11、13、12、14、7、8、9。

因此在具体实现中,数据预处理逻辑基本不变,只是新增了 MaxOffsetYSnapshot 变量,作为计算广义表之前y 偏移量的快照。当中间为了计算广义表子树而影响到后继节点的 y 偏移量时,可以将后继节点计算时的 y 偏移量重置为快照值,以此保证不影响原有的树结构坐标计算。

坐标计算逻辑新增对 childNodes 带来的广义表数据的处理:

const parseWorkflowToGraphData = (

  node: IWorkflowNode<WorkflowNodeParamsXOR>,

  indexInSibling: number,

  parentMatrixX: number,

  parentNodeType: TWorkflowNodeType,

  parentNodeSize: number[],

  nodeMap: IWorkflowNodeMap,

  edgeMap: IWorkflowEdgeMap,

  resolveNode: ResolveNode,

  graphData: GraphData

): IWorkflowDrawNode => {

  const nodeType = node.type;

  const condition = ['start', 'combo'].includes(nodeType) ? true : edgeMap[node.id]?.condition;

  let newNode = {

    ...resolveNode(node),

    nodeType: nodeType,

    type: tagRenderNodeType(nodeType),

    condition,

  };

  // 计算节点尺寸

  const size = nodeSize(newNode);

  newNode.size = size;

  newNode.domSize = isBranchedNodeType(nodeType)

    ? [size[0] - padding - 130, size[1] - padding - 75]

    : size;

  // 计算节点坐标位置

  let matrixX =

    parentNodeType === null && !node.id.startsWith('combo') ? 0 : parentMatrixX + indentX;

  const matrixY =

    !condition && indexInSibling === 0 ? MaxOffsetY + parentNodeSize[1] + gapVertical : MaxOffsetY;

  newNode.x = matrixX;

  newNode.y = matrixY;

  if (!condition && indexInSibling === 0) {

    MaxOffsetY += parentNodeSize[1] + gapVertical;

  }

  // 处理combo类型节点中的小画布数据

  if (nodeType === 'combo') {

    if (node.childNodes?.length > 0) {

      MaxOffsetY -= gapVertical;

    }

    const comboGraphData: GraphData = {

      nodes: [],

      edges: [],

    };

    if (node.childNodes?.length) {

      parseWorkflowToGraphData(

        nodeMap[node.id.replace('root', 'combo')],

        0,

        matrixX - size[0] - 90,

        null,

        [0, 0],

        nodeMap,

        edgeMap,

        resolveNode,

        comboGraphData

      );

    }

    let maxXNode = null;

    let maxYNode = null;

    comboGraphData.nodes.forEach((node) => {

      if (!maxXNode || node.x > maxXNode.x) maxXNode = node;

      if (!maxYNode || node.y > maxYNode.y) maxYNode = node;

    });

    const width = maxXNode && maxXNode.x > 0 ? maxXNode.x + maxXNode.size[0] - matrixX : size[0];

    const height = maxYNode && maxYNode.y > 0 ? maxYNode.y + maxYNode.size[1] - matrixY : size[1];

    newNode.size = [width, height];

    graphData.nodes.push(...comboGraphData.nodes);

    graphData.edges.push(...comboGraphData.edges);

    // 计算 combo 组最大偏移量

    if (indexInSibling === 0) {

      MaxOffsetYSnapshot = matrixY + newNode.size[1] + gapVertical;

      const nodeIds = nodeMap[node.parent[0]].children.map((id) => id.replace('root', 'combo'));

      const maxWidth = computeComboMaxWidth(nodeIds, nodeMap, edgeMap, resolveNode);

      matrixX = matrixX + maxWidth;

    }

    MaxOffsetY = matrixY;

  }

  const children = [];

  if (node.children.length > 0) {

    node.children.forEach((childId, index) => {

      const childNode = parseWorkflowToGraphData(

        nodeMap[childId],

        index,

        matrixX,

        nodeType,

        size,

        nodeMap,

        edgeMap,

        resolveNode,

        graphData

      );

      children.push(childNode);

    });

    MaxOffsetY = Math.max(MaxOffsetYSnapshot, MaxOffsetY);

  } else {

    MaxOffsetY += newNode.size[1] + gapVertical;

  }

  newNode.children = children;

  graphData.nodes.push(newNode);

  return newNode;

};

节点删除,也是新增对 childNodes 的处理逻辑即可:

/**

* 删除任意节点、子树及其入度边

* @param node

* @param nodes

* @param edges

*/

const removeNode = (

  node: IWorkflowNode<any>,

  nodes: IWorkflowNode<any>[],

  edges: IWorkflowEdge[]

) => {

  const { children, childNodes } = node;

  if (childNodes?.length > 0) {

    childNodes.forEach((child) =>

      removeNode(

        nodes.find((n) => n.id === child),

        nodes,

        edges

      )

    );

  }

  if (children?.length > 0) {

    children.forEach((child) =>

      removeNode(

        nodes.find((n) => n.id === child),

        nodes,

        edges

      )

    );

  }

  handleNodeDelete(node, nodes, edges);

};

以上,我们可实现的复杂策略配置如下:

结语

“软件的复杂性是一个基本特征,而不是偶然如此”。但数据结构和算法拉平了程序开发人员对程序的认知,是控制复杂度的行之有效的手段。

相关文章

网友评论

      本文标题:数据结构和算法在流程画布中的实际应用

      本文链接:https://www.haomeiwen.com/subject/cedcwltx.html