1 分布式事务算法
- 2pc
- 两阶段提交协议。它引入了一个事务协调者角色,来管理各个参与者
- 请求提交阶段
- 协调器向所有参与者发送事务请求,询问是否可以执行事务,然后各个参与者响应Yes或者No
- 提交阶段
- 基于第一阶段的投票结果进行决策
- 当所有参与者同意提交,协调者才会通知各个参与者提交事务。
- 否则协调者通知各个参与者取消事务。
- 参与者接收到协调者发来的消息执行 本地commit或者 Rollback.
- 优点
- 利用数据库自身的功能进行本地事务的提交和回滚,也就是提交和回滚实际不需要我们实现。
- 不足
- 提交协议是阻塞协议,如果事务协调器宕机,某些参与者将无法解决他们的事务问题
- 同步阻塞
- 基于数据库实现会锁住资源。
- 不基于数据库实现,也可能要持数据库资源。
- 单点故障
- 事务协调者挂了,整个事务就执行不下去了。
- 如 参与者发生完准备命令之后挂了,每个本地资源都会处于锁定状态。
- 数据不一致问题
- 网络抖动,导致某些参与者无法收到协调者的请求,而某些收到了,导致数据不一致。
- XA 规范
- 重做日志(redo log)
- 每当有操作执行前,在数据真正更改前,会先把相关操作写入 redo 日志
- 回滚日志(undo log)
- 二进制日志(binlog)
- 记录了所有的 DDL 和 DML 语句,除了数据查询语句 select、show 等,还包含语句所执行的消耗时间
- 3PC
- 过程
- 提交请求阶段
- 协调器向所有参与者发送事务请求,询问是否可以CanCommit,然后各个参与者响应Yes或者No
- 预提交
- 协调者从所有参与者反馈都是Yes响应
- 协调者从所有参与者反有一个No反应,或者超时
- 提交( 只要预提交成功, 则一定要保证 真实提交成功,即使协调器下一阶段不可用,一般是通过重试补偿的策略)两种情况
- 执行提交
- 发送提交请求
- 所有参与者返回Ack响应后,进入提交阶段,协调者向所有参与者发送提交请求。
- 超时提交 如果参与者 超时没有收到提交请求,也进行提交。
- 事务提交
- 参与者接受到提交请求后,执行真正事务提交,完成事务释放资源。
- 响应反馈
- 完成事务
- 中断事务
- 协调者 预提交 没有收到参与者Ack 响应(非ack或者超市),执行中断。
- 如何解决2pc同步阻塞问题
- 对2pc 改进
- 引入超时机制
- 添加预阶段,保证最后提交阶段参与者节点状态的一致性。
2 cap
- 不同的分布式系统要根据业务场景和业务需求在 CAP 三者中进行权衡,系统设计时需要衡量的因素,而非进行绝对地选择。
- 一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾
- 当出现网络分区,是保证系统可用性,还是保证数据的一致性
- 网络没有出现分区时
- CAP 理论并没有给出衡量 A 和 C 的因素,但是有 系统数据同步的时延
3 Base 理论
- 核心思想 最终一致性。无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当方式来使系统达到最终一致性。
- 保证系统的可用性,然后通过最终一致性来代替强一致性
3.1 三要素讲解
- 基本可用
- 软状态
- 允许系统中的数据存在中间状态,允许系统在多个不同节点的数据副本存在数据延时。
- 最终一致性
- 应当保证在期限过后所有副本保持数据一致性。
- 实现的时间取决于
- 网络延时
- 系统负载
- 不同的存储选型
- 不同数据复制方案
2 CRDT
- 免冲突的可复制的数据类型
- 用于数据跨网络复制并且可以自动解决冲突达到一致,非常适合AP架构。
2.1 符合CRDT的条件
- update操作和merge操作满足
- 交换律
- (a+(b+c)=(a+b)+c) 即分组没有影响
- 结合律
- 幂等律
2.2 概念和名词
- object: 可以理解为“副本”
- operation: 操作接口,由客户端调用,分为两种,读操作query和写操作update
- query: 查询操作,仅查询本地副本
- update: 更新操作,先尝试进行本地副本更新,若更新成功则将本地更新同步至远端副本
- merge: update在远端副本的合并操作
2.3 类型
- op-based CRDT
- update操作本身满足三律,merge操作仅需要对update操作进行回放
- state-based CRDT
- update操作无法满足条件,则可以考虑同步副本数据,同时附带额外元信息,通过元信息让update和merge操作具备以上三律。
2.4 常见数据类型
- 计数器(Counters)、寄存器(Registers)、集合(Sets),图(graph),数组(Array),Map
- Counters
- G-Counter (Grow-only Counter) 只增长
- 可行的方式
- 每个副本都使用一个数组保留所有的副本值
- update时只操作当前副本在数组中的对于项目
- merge时对数组每一项求max进行合并
- query时 返回数组的和。
- PN-Counter(Positive-negative counter) 增长又减少
- 构造两个G-Counter,一个存放increment的累加值,一个存放decrement的累加值
- G-Counter 可行流程
时间 |
客户端A |
客户端B |
初始时刻 |
arryA[A,B]={0,0} |
arryB[A,B]={0,0} |
t时刻 update |
arryA[A+1,B]={1,0} |
arryB[A,B+2]={0,2} |
t2时刻 merge max(arryA,arryB) |
arryA[A,B]={1,2} |
arryB[A,B]={1,2} |
t3时刻 query sum(arry) |
3 |
3 |
- Registers 本质是一个string,仅支持一种写操作assign(), 读value()操作
- Last Write Wins -register(LWW-Register)
- 简单做法 后assign的覆盖先assign的,方式每次修改带有时间戳。
- update时通过时间戳生成偏序关系
- merge时只取较大时间戳附带的结果
- Multi-valued -register(MV-Register)
- 类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge
- Sets 两种写操作,add和remove,多节点并发进行add和remove操作是无法满足交换律的
- Grow-Only Set (G-Set)
- add操作本质求并,满足Op-based CRDT。
- Two-phase set(2P-Set)
- 使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除
- 十分不实用,一方面已经被删除的元素不能再次被添加,一方面删除的元素还会保留在原set中,占用大量空间。
- Last write wins set(LWW-element Set)
- 为了解决删除元素不能再次添加的问题,可以考虑给2P-Set中A和R的每个元素加一个更新时间戳,其它操作保持不变
- 查询的时候 元素E 是否存在
- 存在
- 在SetAdd 中存在
- 在SetRmove 存在,但是SetRmove 的版本号小于 SetAdd的版本号
- 在SetRmove 不存在。
- 优化
- 不要R集合,而A集合中每一个元素除了维护一个更新时间戳之外,还有一个删除标志位。
- Observed-remove set(OR-Set)
- 思想
- 每次add(e)的时候都为元素e加一个唯一的tag,remove(e)将当前节点上的所有e和对应的tag都删除
- 这样在remove(e)同时其它节点又有并发add(e)的情况下e是能够最终保证添加成功,此种语义称为add wins
- 问题
- 重复add和remove的场景下会产生大量的tag,空间需要优化
- 在考虑空间优化的前提下如何生成全局唯一的tag
- 需要考虑如何进行垃圾回收
<!--wurmloch-crdt的实现-->
<dependency>
<groupId>com.netopyr.wurmloch</groupId>
<artifactId>wurmloch-crdt</artifactId>
<version>0.1.0</version>
</dependency>
@Test
public void testPNCount() {
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();
replica1.increment();
replica2.decrement(2L);
System.out.println(replica1.get() == -1L);
System.out.println(replica2.get() == -1L);
crdtStore1.disconnect(crdtStore2);
replica1.decrement(3L);
replica2.increment(5L);
System.out.println(replica1.get() == -4L);
System.out.println(replica2.get() == 4L);
crdtStore1.connect(crdtStore2);
System.out.println(replica1.get() == 1L);
System.out.println(replica2.get() == 1L);
}
网友评论