分布式系统下数据的一致性和系统可用性一直是个难题,工程上也有多种解决方案,如:mysql/RocketMQ的主备复制方案,本文将借助软负载服务的研究对Raft分布式一致性协议做分析和讨论。
本文主要分为以下几个部分:
- 分布式下数据一致性的问题解决方
- 服务发现/软负载/配置管理的介绍
- Raft协议的原理
- Raft在软负载集群中的实现
1、分布式下数据一致性问题解决方案
根据CAP理论,分布式系统中,一致性、可用性和分区容错性三者无法同时满足,并且最多只能满足其中两个。
对于分布式系统的高可用方案,业界有一些通用的解决方案:
分布式高可用方案对比
其中横轴代表了分布式系统中通用的高可用解决方案,如:冷备、Master/Slave、Master/Master、两阶段提交以及基于Paxos算法的解决方案;纵轴代表了分布式系统所关心的各项指标,包括数据一致性、事务支持的程度、数据延迟、系统吞吐量、数据丢失可能性、故障自动恢复时间。
Master/Slave解决方案的典型实现有mysql、RocketMQ。
Paxos解决方案的实现有阿里的OceanBase。
Raft协议产生的背景在于解决分布式系统下数据的一致性问题,本文将主要集中在介绍Raft算法实现的研究上。
2、ConfigServer/VipServer/Diamond
ConfigServer(服务发现)、VipServer(软负载)、Diamond(配置管理)三个产品容易引起疑惑,因为三者有一定的相似性,功能上都有配置管理的能力,设计上都基于配置的pub/sub模式,也都需要解决数据的一致性和应用的稳定性问题,因此难免会让人产生困惑,为什么需要三套不同的产品来实现近似一样的功能,对此我们先明确下三者在应用结构中所处的位置,如下图:
ConfigServer/Diamond/VipServer的关系一次请求的过程如下:
- 用户通过域名www.taobao.com访问,经过DNS解析获得域名地址,请求到达淘宝服务器机房
- http请求进过lvs进行一次负载均衡,到达ngnix服务器
- ngnix服务器通过内置的vipService客户端,按照一定算法选择一个ip
- 根据ip被ngnix转发到实际处理的机器,对应的应用服务开始处理请求
- 如果需要进行rpc调用,则用户configServer获取对应服务的机器ip,发起rpc调用
- 请求处理过程中,如果需要获取配置则通过diamond来获取
由此可见:
-
ConfigServer/VipServer关注于“服务发现”领域,服务=ip:port,运行时需要对服务进行活性检测,动态管理
- ConfigServer主要服务于内部PRC框架的服务发现
- VipServer主要服务于http服务发现,后期希望替换LVS发挥负载均衡的能力
-
Diamond关注与“服务配置”领域,配置=key:value,配置一经添加,一直存在,无需活性检测
在上图的架构中VipServer集群对于数据的持久化和一致性的保障是基于Diamond的,而Diamond则是基于mysql做数据的持久化。过重的依赖无疑会极大的限制VipServer在负载均衡上的能力,阿里巴巴负载均衡技术之LVS和VIPServer的发展介绍中,未来VipServer将下沉作为基础的负载均衡设施用于替换lvs,上图中3替换掉1,所以去依赖是VipServer必须要完成的,去掉对Diamond的依赖之后,对于数据的持久化和一致性则需要自己来完成。
去Diamond之后,VipServer集群中的每台server都会存储全量数据,提高了系统容灾能力,其中集群中机器上的数据一致性使用Raft协议来保证。
3、Raft的原理
在介绍Raft算法原理之前,可以借助Raft的动画演示先直观的看看Raft在解决什么问题以及解决的过程。
相比于Paxos的晦涩难懂,Raft被设计的更易于理解和实现:
- Raft简化了状态空间,集群中的机器在任何时候只能处于leader、follower和candidate三种状态之一
- Raft为了解决Paxos架构对真实场景下的架构不够友好的问题,被设计为单leader算法
选举出一个leader负责管理复制日志,保障一致性其他服务器都是follower,被动地接受数据
leader挂掉,部分follower会参加竞选,其中一个称为下一次的leader - leader宕机,会触发系统自动重新选主,选主期间系统对外不可服务
- Raft中少数派宕机,不会影响系统整体的可用性
Raft算法状态转换如下图,主要涉及到leader选举、日志复制(数据同步)、安全性三个部分:
Raft状态转换图3.1 leader选举
集群选举的过程如下:
-
集群刚启动时,所有机器都是follower,这个时候系统整体不可写
-
follower先等待,在等待期间如果没有收到leader的心跳,则认为leader已经不可用,并将自己作为candidate向集群中的其他机器发起选举,选取自己为leader。
-
其他follower收到candidate发送过来的选举请求,并将candidate的选举号(term)和自己的选举号比较,如果candidate大于自己的选举号,则投票给candidate作为leader,否则放弃投票。
-
对每个candidate来说,这次竞选会有三种情况:
- 如果candidate收到超一半的选票,那么它获胜为这次选举的leader,本次竞选结束,接下来就是向其他服务器发送心跳、并对外提供服务
- 如果candidate选票不够,却有其他candidate说自己成为leader,那么candidate退化为follower,本次竞选结束
- 如果candidate选票不够,且没有其他服务器自称竞选成功(比如三个candidate平分选票时),那么就只能再等随机长度的一段时间,重新发起下一轮的竞选号召选票过程
3.2 日志复制(数据同步)
leader被选举出来以后,所有的读写请求都会经过leader来处理,leader负责对follower进行数据同步,即日志复制。
日志复制包括历史数据同步和数据更新请求:
-
历史数据同步,leader会和所有follower进行心跳,心跳会带上最新的数据版本号
- 如果follower有这个版本号并且是最新的那个,说明它的数据已经是最新,什么同步都不用做
- 如果follower有这个版本号但不是最新的那个,则保持leader的数据
-
更新请求,leader接收请求,如果不失败是按照下述步骤执行的:
- 在本机上保存数据(未提交)
- 发给所有follower让它们也保存数据(未提交)
- leader收到过半的成功回复,认为保存成功,leader就会在本机上正式提交这份数据
- 回复客户端操作完毕
- 向所有的follower发送提交命令
3.3 安全性
集群中机器在可能遇到的问题:
-
网络延迟、机器宕机恢复等原因,是否会导致竞选错误任期错误?
不会,每轮选的candidate要选票的时候,都需要附带当前选举号;而follower需要保证,每个选期只投票给第一个联系自己的candidate
-
如果第一轮选出来的leader因为延迟没告诉某个candidate,在不同选期内是否会出现两个leader同时存在?
不会,在follower在接收到candidate的选举请求时,需要对比candidate的选举号(term),如果candidate的term比自己的小则拒绝投票。
-
是否存在选举无主问题,即一直选不出leader?
simple-decree Paxos也存在这种活性(Liveness)问题,Raft的解法是,通过将等待时间随机化(比如150ms~300ms)来减少这种情况。
-
leader是否会删除或者更改数据?
不会,被选举出的leader被设计为数据正确且完整的(有所有提交的数据),保证了数据只会从leader流向follower
-
如果某个follower在日志复制时失败,然后又在新的选举中当选,那么有可能选举出一个数据不正确或不完整的leader?又或者是否选举出的leader会不会是没有最新数据的那个?
不会,Raft采用了一种巧妙的方法:
- candidata在号召投票给自己时,(除了需要带上选举版本号外)还需要带上自己的数据最新版本号
- follower在投票时,(除了只给最新选举版本号外)不给数据版本号低于自己同意票
4、Raft在软负载集群中的实现
软负载集群中的机器都保存全量的机器信息(host:ip+port),机器之间的数据同步由Raft协议保证。
根据Raft原理的介绍,在实际的实现过程中,首要解决leader的选举过程。
4.1 选举
机器启动之后,每台机器都会注册选举和心跳的定时任务。
public static void init() throws Exception {
peers.add(VIPServerProxy.getVIPServers());
RaftStore.load();
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
}
Timer.register(new MasterElection());
Timer.register1(new HeartBeat());
Timer.register(new AddressServerUpdater(), Timer.ADDRESS_SERVER_UPDATE_INTVERAL_MS);
if (peers.size() > 0) {
if (lock.tryLock(3, TimeUnit.SECONDS) ) {
initialized = true;
lock.unlock();
}
} else {
throw new Exception("peers is empty.");
}
}
选举的等待时长是不定的,用于避免选举的无主问题
public void resetLeaderDue() {
leaderDueMs = Timer.LEADER_TIMEOUT_MS + RandomUtils.nextLong() % Timer.RAMDOM_MS;
}
等待时长过期则开始选举
public static class MasterElection implements Runnable {
@Override
public void run() {
try {
RaftPeer local = peers.local();
local.leaderDueMs -= Timer.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
local.resetLeaderDue();
local.resetHeartbeatDue();
//等待过期开始发起选举
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("VIPSERVER-RAFT", "error while master election", e);
}
}
public static void sendVote() {
RaftPeer local = peers.get(NetUtils.localIP());
local.term++;
local.voteFor = local.ip; //自己作为CANDIDATE,发送给其他follower
local.state = RaftPeer.State.CANDIDATE;
//向其他机器发送选举请求
voteRets = sendVoteToAllFollowers(local);
//根据选举结果确定自己是否成为了leader,得到超过一半的选票
decideLeader(voteRets)
}
//follower接收CANDIDATE发送的选举请求
public static RaftPeer receivedVote(RaftPeer remote) {
RaftPeer local = peers.get(NetUtils.localIP());
//CANDIDATE的选举号小于自己的,不参与投票
if (remote.term <= local.term) {
throw new IllegalArgumentException(msg);
}
//开始投票,重设自己的选举等待周期
local.resetLeaderDue();
//自己作为投票FOLLOWER给remote
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term = remote.term;
}
}
如果选举成功,则CANDIDATE成为leader,leader开始同follower进行心跳和日志复制(数据同步)。
如果选举失败,则等待下一个follower的选举周期到期,进行新一轮选举。
4.2 心跳
leader负责对向所有的follower发送心跳和数据同步。
public static class HeartBeat implements Runnable {
@Override
public void run() {
try {
RaftPeer local = peers.local();
local.heartbeatDueMs -= Timer.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
//定时心跳
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("VIPSERVER-RAFT", "error while sending beat", e);
}
}
public static void sendBeat() throws IOException, InterruptedException {
//自己不是leader不能发生心跳
if (local.state != RaftPeer.State.LEADER) {return;}
local.resetLeaderDue();
//构建leader上的数据信息key、timestamp,不包括具体的value
data.raftPeer = local;
data.key = key;
data.timestamp = timestamp;
data.value = null;
sendBeatToAllFolowers(data.key);
//...
}
public static RaftPeer receivedBeat(JSONObject beat) throws Exception {
final RaftPeer local = peers.local();
final RaftPeer remote = beat.getObject("peer", RaftPeer.class);
//收到的心跳不是leader发来的,丢弃
if (remote.state != RaftPeer.State.LEADER) {
throw new IllegalArgumentException("xxx");
}
//收到心跳的选举号小于自己的,丢弃
if (local.term > remote.term) {
throw new IllegalArgumentException("xxx");
}
if (local.state != RaftPeer.State.FOLLOWER) {
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JSONArray beatDatums = beat.getJSONArray("datums");
//重置自己的选举周期和心跳周期,用于follower长时间没有收到leader发来的心跳
//leader挂掉时,自己可以重新发起选举流程
local.resetLeaderDue();
local.resetHeartbeatDue();
peers.makeLeader(remote);
//根据leader data.timestamp和本机的timestamp,确定是否需要从leader更新数据
updateDataFromLeader(data);
//...
}
public static void updateDataFromLeader(remoteData){
if(remoteData.timestamp > local.timestamp){
//从leader获取数据并更新
}
}
}
4.3 数据更新
整个集群中,只有leader能对外提供数据更新服务。
public static void signalPublish(String key, String value) throws Exception {
operateLock.lock();
try {
if (!RaftCore.isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
//如果自己不是leader,则将请求转发给leader去处理
RaftProxy.proxyPostLarge(API_PUB, params.toJSONString());
return;
}
//请求到达leader,处理数据更新
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
datum.timestamp = System.currentTimeMillis();
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
//leader数据更新
onPublish(json);
//发送数据到所有的follower进行更新
successOverHalf = sendPublisToAllFollowers(datum);
//数据是否有一半机器更新成功,则通知保存数据
if(successOverHalf){
sendStoreToAllFollowers()
}
//...
}
follower接收到leader发送来的数据更新请求
public static void onPublish(JSONObject params){
RaftPeer source = params.getObject("source", RaftPeer.class);
Datum datum = params.getObject("datum", Datum.class);
//如果不是leader发送过来的则丢弃
if (!PeerSet.isLeader(source.ip)) {
throw new IllegalStateException("xxx");
}
//如果接收到的选举版本号低于自己的则丢弃
if (source.term < local.term) {
throw new IllegalStateException("xxx");
}
//更新自己的选举时间
local.resetLeaderDue();
//更新数据到文件
RaftStore.write(datum);
}
5、后记
本文专注于软负载系统中数据的一致性保证以及raft协议的分析,对于软负载系统中涉及到其他的问题,如:负载均衡的策略、活性检测、集群部署方案等没有过多的深入。
网友评论