开篇不啰嗦,先说重点:
这套框架具备的几个特性
1. 支持任务分布式并发执行(也就是多节点多任务同时执行)
2. 具备任务执行错误补偿机制
3. 具备服务故障任务转移机制(也就是一个任务节点挂了,它剩下的任务交给别的节点做)
4. 支持非幂等性任务
暂不具备的功能
1. 暂不支持优先级任务
2. 还没有监控
项目的github地址:https://github.com/changbo218/zk_task
现在可以介绍项目背景了,我们有这样一个需求,每天凌晨有600万+条记录需要处理,因为这些条数据没有优先级,我们希望可以并行执行,既然是并行,就意味着需要解决以下几个问题:
1. 这些记录如何分配?
2. 分配之后有任务执行失败了怎么办?
3. 执行任务的节点down了怎么办?
4. 任务节点的动态增减?
框架执行原理说明
这个框架用到了zookeeper的几个特性
1. 创建的临时节点当对应服务session断掉时会自动删除
2. CreateMode.EPHEMERAL_SEQUENTIAL 可以创建自增的临时节点
3. 用到了这两个监听事件:EventType.NodeDataChanged EventType.NodeChildrenChanged
4. zookeeper更新数据时自带乐观锁
首先对zookeeper的节点结构做下分析:
image说明:
/root是这套结构的根节点,左侧是任务执行部分,右侧是数据调度部分
/newdata节点存储的是将要执行的新数据,/task节点存储的是分配好的数据
/schedulenode和/scheduleserver是临时调度节点对应着的调度服务
/tasknode和/taskserver是临时任务执行节点对应着的执行服务
任务节点监听/task的NodeDataChanged
调度节点监听/newdata的NodeDataChanged和/task的EventType.NodeChildrenChanged
业务执行过程:
当有新的数据要执行时,把数据存储到/newdata下,这时调度节点会收到监听开始对这些新数据进行分配,把分配好的数据+对应的任务节点的name保存到/task下,这时任务节点会收到监听通知,每个任务会从/task下取数据,执行它自己的那一部分,执行完成后,会把/task和/newdata下的对应数据删除掉。
如果在执行过程中,有部分任务出错,那么该节点会把出错的任务记下来,当这部分任务全部处理完后,把错误的任务数据重新写回到/newdata下,让调度节点来重新分配,再次执行上述过程。(解决问题2)
如果在执行过程中,任务节点突然down了,这时就需要明确知道哪些数据执行完了,哪些数据还没执行,我这里的做法是每成功执行一条数据,都保存起来(我这里用的是memcache来保存,增加了第三方依赖,感觉比较笨,目前没想到好办法)。当任务节点down了,调度节点会收到监听通知(因为对/task做了NodeChildrenChanged监听), 这时调度节点会把所有分配给这个节点的数据找出来,去掉已经成功执行了的(memcache里拿),把剩下的重新做分配。(解决问题3)
至于问题4,因为任务服务创建的都是临时节点,即新启动一个服务,就会对应一个zookeeper的临时节点,任务停掉,这个节点也会自动被删除掉。而调度节点在分配数据时,都会获取最新的任务节点,所以加减节点不受影响。(解决问题4)
最后我们来解决下第一个问题,数据如何分配。
最容易想到的分配方式当然就是平均分配了,我们有600万的数据,就是600万个数据id,假如有三个执行节点,那么就是每个200万呗,那么这个200万怎么存,如果是1,2,3,4,5,....2000000这样存list,每次都要传输十几兆的数据,肯定不可取。
我的处理方式是把这个数据列表转成了数字字符串就是 "1-2000000",如果是不连续的,就是"1,3,5-11,23-55,100-101,105-120" 相邻的用-,不相邻的用, 并保证排序,这样可以把一个很大的list,转成一个很小的字符串,网络传输毫无压力。如果只是10个8个任务,那怎么存就无所谓了。
zk_task.core里有三个类,分别是
NumberConcatUtil.java 往一个数字字符串里面加一个(组)数字
NumberSubstringUtil.java 从一个数字字符串里面去掉一个(组)数字
NumberGroupUtil.java 对这个数字字符串进行分组
(毕竟不是搞算法的,以实现效果为目的吧。。。)
原理的介绍就这么多,下一篇就对代码进行下分析,以及对部分细节深入的分析。
网友评论