美文网首页架构个人学习微服务
Spring Cloud Alibaba——Nacos 是如何同

Spring Cloud Alibaba——Nacos 是如何同

作者: 小波同学 | 来源:发表于2021-07-31 01:00 被阅读0次

    两种一致性策略如何在nacos中共存

    或许会有疑问,为什么早先cp模式的Zookeeper或者AP模式的Eureka,都只有支持CAP理论下大家常用的AP实现或者CP实现,而nacos却能够两个都实现呢?

    其实CAP理论,仅仅是针对分布式下数据的一致性而言,如果你对于数据的一致性要求不高,可忍受最终一致性,那么AP模式的Eureka就可以满足你了,如果说你对数据的一致性要求很高,那么就使用CP模式的Zookeeper,而追其根本,并不是说Eureka是AP的,或者说Zookeeper是CP的,而是他们存储的数据的一致性,满足AP或者CP,因此也就不难实现在一个组件中实现AP模式与CP模式共存。

    @DependsOn("ProtocolManager")
    @Service("consistencyDelegate")
    public class DelegateConsistencyServiceImpl implements ConsistencyService {
        
        private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
        
        private final EphemeralConsistencyService ephemeralConsistencyService;
    }
    

    DelegateConsistencyServiceImpl是一个一致性策略选择的类,根据不同的策略触发条件(在nacos中,CP与AP切换的条件是注册的服务实例是否是临时实例),选择PersistentConsistencyService策略或者EphemeralConsistencyService策略,而EphemeralConsistencyService对应的是DistroConsistencyServiceImpl,采用的协议是阿里自研的Distro,我个人觉得就像gossip协议;PersistentConsistencyService对应的是RaftConsistencyServiceImpl,其底层采用的是Raft协议;这两种一致性策略下的数据存储互不影响,所以nacos实现了AP模式与CP模式在一个组件中同时存在。

    Nacos AP实现:

    Nacos AP一致性策略 Distro

    CP实现

    重要的协议——RAFT

    腾讯文档——Raft论文

    Raft算法原论文

    一文搞懂Raft算法

    Raft为了实现容易理解的目标,在paxos的基础上进行的状态简化以及问题拆分,将之前复杂的逻辑拆成若干个子问题,基本上可以总结成下面几个方面:

    • leader election:选取主节点
    • log replication:日志备份,数据同步
    • safety:为了实现上述两点而产生的一些约束条件和保障条件

    leader election

    role

    首先先说明下Raft算法中节点的角色,分为以下三种:

    • leader:由所有节点选举,在candidate中产生,负责整个集群的状态以及元数据管理,当发现更大的term时,转化为follower。

    • candidate:由follower在集群选举时转化而成,选举时得到多数选票,则转化为leader,若发现主节点或者更大的term则转化为follower。

    • follower:集群初始化时所有节点的角色都是follower,若未发现leader心跳,则发起leader选举,并将角色转化为candidate;leader以及candidate在某些条件下也会转化成follower。

    给出状态机,协助大家理解:`


    term

    上面在讲解role的时候好几次说到了一个名词term,这是raft算法中的一个核心概念,很多的机制都需要依赖term。term通俗来讲就是任期,即一个leader正常工作的时间段,如果因为某些原因当前的leader不再是leader时,则该term结束,重新进行选举,开始新的任期,是不是和现实生活中的选举很像?另外term在raft算法中也起到了逻辑时钟的作用,在raft的实现中起到了重要的作用,此处用一句话先来概括:即term大的优先级高,leader必须是拥有更大的term。用白话理解:就是当前总统和前总统的关系,总统只能有一个就是当期总统,前总统在当前总统面前就变成选民了。在Raft中,term是个整数型的值,term变化即将term的值加1。

    深入浅出一文搞懂Raft协议:https://blog.csdn.net/microGP/article/details/114261089

    nacos是如何实现CP(raft)的

    RaftController

    Raft集群内部节点间是通过暴露的 Restful接口,代码在 RaftController 中。RaftController控制器是 Raft集群内部节点间通信使用的,具体的信息如下:

    POST HTTP://{ip:port}/v1/ns/raft/vote : 进行投票请求
    
    POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower发送心跳信息
    
    GET HTTP://{ip:port}/v1/ns/raft/peer : 获取该节点的RaftPeer信息
    
    PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加载某日志信息
    
    POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据并存入
    
    DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据删除操作
    
    GET HTTP://{ip:port}/v1/ns/raft/datum : 获取该节点存储的数据信息
    
    GET HTTP://{ip:port}/v1/ns/raft/state : 获取该节点的状态信息{UP or DOWN}
    
    POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作
    
    DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作
    
    GET HTTP://{ip:port}/v1/ns/raft/leader : 获取当前集群的Leader节点信息
    
    GET HTTP://{ip:port}/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者
    

    RaftPeerSet

    这个对象存储的是所有raft协议下的节点信息,存储的元素如下

    @Deprecated
    @Component
    @DependsOn("ProtocolManager")
    public class RaftPeerSet extends MemberChangeListener implements Closeable {
    
        // 集群节点地址管理
        private final ServerMemberManager memberManager;
    
        // 周期数
        private AtomicLong localTerm = new AtomicLong(0L);
    
        // 当前周期内的Leader
        private RaftPeer leader = null;
    
        // 所有的节点信息
        private volatile Map<String, RaftPeer> peers = new HashMap<>(8);
    
        // 暂时不清楚用途
        private Set<String> sites = new HashSet<>();
    
        // 本节点是否已准备完毕
        private volatile boolean ready = false;
    }
    

    同时还具备了raft协议下必要的方法

    @Deprecated
    @Component
    @DependsOn("ProtocolManager")
    public class RaftPeerSet extends MemberChangeListener implements Closeable {
    
        // 当前IP对应的节点是否是Leader
        public boolean isLeader(String ip) {
            if (EnvUtil.getStandaloneMode()) {
                return true;
            }
    
            if (leader == null) {
                Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
                return false;
            }
    
            return StringUtils.equals(leader.ip, ip);
        }
        
        // 决定Leader节点,根据投票结果以及是否满足majorityCount机制
        public RaftPeer decideLeader(RaftPeer candidate) {
            peers.put(candidate.ip, candidate);
    
            SortedBag ips = new TreeBag();
            int maxApproveCount = 0;
            String maxApprovePeer = null;
            for (RaftPeer peer : peers.values()) {
                if (StringUtils.isEmpty(peer.voteFor)) {
                    continue;
                }
                // 选票计数
                ips.add(peer.voteFor);
                // 如果某节点的得票数大于当前的最大得票数,则更新候选Leader信息
                if (ips.getCount(peer.voteFor) > maxApproveCount) {
                    maxApproveCount = ips.getCount(peer.voteFor);
                    maxApprovePeer = peer.voteFor;
                }
            }
            // 是否满足majorityCount数量的限制
            if (maxApproveCount >= majorityCount()) {
                // 若满足则设置Leader节点信息
                RaftPeer peer = peers.get(maxApprovePeer);
                peer.state = RaftPeer.State.LEADER;
    
                if (!Objects.equals(leader, peer)) {
                    leader = peer;
                    ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
                    Loggers.RAFT.info("{} has become the LEADER", leader.ip);
                }
            }
    
            return leader;
        }
    
        public RaftPeer makeLeader(RaftPeer candidate) {
            // 如果当前Leader与Candidate节点不一样,则进行Leader信息更改
            if (!Objects.equals(leader, candidate)) {
                leader = candidate;
                ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local()));
                Loggers.RAFT
                        .info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()),
                                JacksonUtils.toJson(leader));
            }
    
            for (final RaftPeer peer : peers.values()) {
                Map<String, String> params = new HashMap<>(1);
                // 如果当前节点与远程Leader节点不等且是Follower节点
                if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
                    try {
                        // 获取每个节点的RaftPeer节点信息对象数据
                        String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER);
                        HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (!result.ok()) {
                                    Loggers.RAFT
                                            .error("[NACOS-RAFT] get peer failed: {}, peer: {}", result.getCode(), peer.ip);
                                    peer.state = RaftPeer.State.FOLLOWER;
                                    return;
                                }
    
                                update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                            }
    
                            @Override
                            public void onError(Throwable throwable) {
    
                            }
    
                            @Override
                            public void onCancel() {
    
                            }
                        });
                    } catch (Exception e) {
                        peer.state = RaftPeer.State.FOLLOWER;
                        Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
                    }
                }
            }
    
            return update(candidate);
        }   
    }
    

    RaftCore

    该对象是nacos中raft协议的主要实现,在启动之初,会进行一系列初始化的操作

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        @PostConstruct
        public void init() throws Exception {
            Loggers.RAFT.info("initializing Raft sub-system");
            final long start = System.currentTimeMillis();
            // 进行日志文件的加载到内存数据对象Datums的操作
            raftStore.loadDatums(notifier, datums);
            // 设置当前的周期数
            setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    
            Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
            // 初始化标识更改
            initialized = true;
    
            Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
            // 开启定时的Leader选举任务
            masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
            // 开启定时的Leader心跳服务
            heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    
            versionJudgement.registerObserver(isAllNewVersion -> {
                stopWork = isAllNewVersion;
                if (stopWork) {
                    try {
                        shutdown();
                        raftListener.removeOldRaftMetadata();
                    } catch (NacosException e) {
                        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
                    }
                }
            }, 100);
    
            NotifyCenter.registerSubscriber(notifier);
    
            Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
                    GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
        }
    }
    

    初始化的一系列操作完成后,此时集群还无法对外提供服务,因为此时Leader还未选举出来,需要在MasterElection选举Leader成功后才可以对外提供服务。

    Leader 选举任务

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        // Leader 选举任务
        public class MasterElection implements Runnable {
    
            @Override
            public void run() {
                try {
                    if (stopWork) {
                        return;
                    }
                    // 当前节点是否已准备完毕
                    if (!peers.isReady()) {
                        return;
                    }
                    // 获取自身节点信息
                    RaftPeer local = peers.local();
                    // 本地存储的Leader任期时间
                    local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                    // 如果Leader任期时间还在允许范围内,则不进行Leader选举
                    if (local.leaderDueMs > 0) {
                        return;
                    }
    
                    // reset timeout
                    local.resetLeaderDue();
                    local.resetHeartbeatDue();
                    // 向其他节点发起投票请求
                    sendVote();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while master election {}", e);
                }
    
            }
    
            private void sendVote() {
    
                RaftPeer local = peers.get(NetUtils.localServer());
                Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
                        local.term);
                //重置Raft集群数据
                peers.reset();
    
                local.term.incrementAndGet();
                // 设置给自己投票
                local.voteFor = local.ip;
                //将节点状态更新为候选节点
                local.state = RaftPeer.State.CANDIDATE;
    
                Map<String, String> params = new HashMap<>(1);
                params.put("vote", JacksonUtils.toJson(local));
                // 遍历所有的节点信息(除了自己之外)
                for (final String server : peers.allServersWithoutMySelf()) {
                    final String url = buildUrl(server, API_VOTE);
                    try {
                        //候选节点向除自身之外的所有其它Raft节点的/v1/ns/raft/vote发送HTTP POST请求
                        //请求内容为vote:JSON.toJSONString(local)
                        HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (!result.ok()) {
                                    Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                                    return;
                                }
                                // 获取投票结果,并进行Leader的选举工作
                                RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
    
                                Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                                //候选节点收到其他节点投的候选节点数据,交给PeerSet.decideLeader方法处理
                                peers.decideLeader(peer);
    
                            }
    
                            @Override
                            public void onError(Throwable throwable) {
                                Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                            }
    
                            @Override
                            public void onCancel() {
    
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.warn("error while sending vote to server: {}", server);
                    }
                }
            }
        }
    }
    

    每个节点启动时,都会认为自己可以作为Leader,因此都会以自去己作为被选举人,向其他节点发起投票请求,其他节点的接受投票请求路径为/v1/ns/raft/vote

    其他节点接收到投票请求,会调用RaftController.vote方法

    @Deprecated
    @RestController
    @RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft",
            UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft"})
    public class RaftController {
    
        private final RaftCore raftCore;
        
        private final ClusterVersionJudgement versionJudgement;
    
        @PostMapping("/vote")
        public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
            if (versionJudgement.allMemberIsNewVersion()) {
                throw new IllegalStateException("old raft protocol already stop");
            }
            // 处理选举请求
            RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
            
            return JacksonUtils.transferToJsonNode(peer);
        }
    }
    
    
    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        // 节点接收到投票请求后的处理
        public synchronized RaftPeer receivedVote(RaftPeer remote) {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            // 被选举人是否在raft集群节点列表中
            if (!peers.contains(remote)) {
                throw new IllegalStateException("can not find peer: " + remote.ip);
            }
            // 获取自身节点信息
            RaftPeer local = peers.get(NetUtils.localServer());
            // 如果被选举节点的周期数小于本节点的周期数,则将自己的投票投给自己并告诉被选举者
            // 若当前节点的 term 大于等于发送选举请求的节点 term,则选择自己为 leader
            if (remote.term.get() <= local.term.get()) {
                String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
    
                Loggers.RAFT.info(msg);
                if (StringUtils.isEmpty(local.voteFor)) {
                    local.voteFor = local.ip;
                }
    
                return local;
            }
            // 满足投票条件后,本节点确认将自己的票投给被选举者
            // 若当前节点的 term 小于发送请求的节点 term,选择发送请求的节点为 leader
            local.resetLeaderDue();
    
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
            local.term.set(remote.term.get());
    
            Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    
            return local;
        }
    }
    

    通过以上步骤,最终选举出了Leader节点,接下来,就可以对外提供服务了。

    数据同步 日志同步

    当系统有了leader后,系统就进入对外工作了。客户端的一切请求发送到leader,leader来调度这些并发请求的顺序,并且保证leader与followers状态的一致性。在 raft 集群中,所有日志都必须首先提交至 leader 节点。leader 在每个 heartbeat 向 follower 同步日志。

    因为是CP模式,所以操作都是通过Leader节点进行传达的,Follower节点本身不与Client进行联系,Follower只能接受来自Leader的操作请求,因此就存在请求转发的问题。因此在RaftCore中的signalPublish以及signalDelete中,存在着对Leader节点的判断以及请求转发的逻辑。

    Raft协议下的注册流程入口也是在InstanceController.register方法,只不过在ConsistencyService的put方法实现由AP一致性调用的DistroConsistencyServiceImpl变成了RaftConsistencyServiceImpl。

    @Deprecated
    @DependsOn("ProtocolManager")
    @Service
    public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
        
        private final RaftCore raftCore;
        
        @Override
        public void put(String key, Record value) throws NacosException {
            checkIsStopWork();
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                        e);
            }
        }
    }
    

    最终调用到 RaftCore 的 signalPublish() 方法。

    RaftCore.signalPublish()

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        public void signalPublish(String key, Record value) throws Exception {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            //不是leader
            if (!isLeader()) {
                ObjectNode params = JacksonUtils.createEmptyJsonNode();
                params.put("key", key);
                params.replace("value", JacksonUtils.transferToJsonNode(value));
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
    
                final RaftPeer leader = getLeader();
                //请求转发 交给leader去做/v1/ns/raft/datum
                raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
                return;
            }
    
            OPERATE_LOCK.lock();
            // 若是 leader,将包发送给所有的 follower
            try {
                final long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
    
                ObjectNode json = JacksonUtils.createEmptyJsonNode();
                json.replace("datum", JacksonUtils.transferToJsonNode(datum));
                json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
                //发布数据改变通知 本地 onPublish 方法用来处理持久化逻辑
                onPublish(datum, peers.local());
    
                final String content = json.toString();
                //只要过半的结点数
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                //遍历所有结点
                for (final String server : peers.allServersIncludeMyself()) {
                    if (isLeader(server)) {
                        //自己算一次
                        latch.countDown();
                        continue;
                    }
                    // 将包发送给所有的 follower,调用 /v1/ns/raft/datum/commit 接口
                    final String url = buildUrl(server, API_ON_PUB);
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, result.getCode());
                                return;
                            }
                            //异步完成
                            latch.countDown();
                        }
    
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                        }
    
                        @Override
                        public void onCancel() {
    
                        }
                    });
    
                }
                //等待半数完成
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
    
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }
    }
    

    在RaftController中的delete方法会调用到RaftCore.signalDelete方法

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        public void signalDelete(final String key) throws Exception {
            if (stopWork) {
                throw new IllegalStateException("old raft protocol already stop work");
            }
            OPERATE_LOCK.lock();
            try {
                //不是leader
                if (!isLeader()) {
                    Map<String, String> params = new HashMap<>(1);
                    params.put("key", URLEncoder.encode(key, "UTF-8"));
                    // 删除请求进行转发给 leader 进行处理 /v1/ns/raft/datum
                    raftProxy.proxy(getLeader().ip, API_DEL, params, HttpMethod.DELETE);
                    return;
                }
                // 若是 leader,将包发送给所有的 follower
                // construct datum:
                Datum datum = new Datum();
                datum.key = key;
                ObjectNode json = JacksonUtils.createEmptyJsonNode();
                json.replace("datum", JacksonUtils.transferToJsonNode(datum));
                json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
    
                //删除相关信息,并发布数据改变通知
                onDelete(datum.key, peers.local());
                //遍历所有结点
                for (final String server : peers.allServersWithoutMySelf()) {
                    String url = buildUrl(server, API_ON_DEL);
                    // 将包发送给所有的 follower,调用 /v1/ns/raft/datum/commit 接口
                    HttpClient.asyncHttpDeleteLarge(url, null, json.toString(), new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to delete data from peer, datumId={}, peer={}, http code={}",
                                                key, server, result.getCode());
                                return;
                            }
    
                            RaftPeer local = peers.local();
    
                            local.resetLeaderDue();
                        }
    
                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("[RAFT] failed to delete data from peer", throwable);
                        }
    
                        @Override
                        public void onCancel() {
    
                        }
                    });
                }
            } finally {
                OPERATE_LOCK.unlock();
            }
        }
    }
    

    心跳机制,raft通过心跳机制来维持Leader以及Follower的关系

    在RaftCore实例化之后,在init方法中会开启定时的Leader心跳服务

    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        @PostConstruct
        public void init() throws Exception {
            
            //省略代码......
            
            // 开启定时的Leader选举任务
            masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
            // 开启定时的Leader心跳服务
            heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    
            //省略代码......
        }
    
    }
    

    HeartBeat

    • 心跳任务,如果成为Leader,需要对 follower 发送心跳信息
    @Deprecated
    @DependsOn("ProtocolManager")
    @Component
    public class RaftCore implements Closeable {
    
        // 心跳任务,如果成为Leader,需要对 follower 发送心跳信息
        public class HeartBeat implements Runnable {
    
            @Override
            public void run() {
                try {
                    if (stopWork) {
                        return;
                    }
                    // 程序是否已准备完毕
                    if (!peers.isReady()) {
                        return;
                    }
    
                    RaftPeer local = peers.local();
                    local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                    // 心跳周期判断
                    if (local.heartbeatDueMs > 0) {
                        return;
                    }
                    // 重置心跳发送周期
                    local.resetHeartbeatDue();
                    // 发送心跳信息
                    sendBeat();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
                }
    
            }
    
            private void sendBeat() throws IOException, InterruptedException {
                RaftPeer local = peers.local();
                // 如果自己不是Leader节点或者处于单机模式下,则直接返回
                if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
                    return;
                }
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
                }
                // 重置Leader任期时间
                local.resetLeaderDue();
    
                // build data
                // 构建心跳包信息,local 为当前 nacos 节点的信息,key 为 peer
                ObjectNode packet = JacksonUtils.createEmptyJsonNode();
                packet.replace("peer", JacksonUtils.transferToJsonNode(local));
    
                ArrayNode array = JacksonUtils.createEmptyArrayNode();
                // 只发送心跳包,不带数据过去
                if (switchDomain.isSendBeatOnly()) {
                    Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
                }
                // 将相关的 key 通过心跳包发送给 follower
                if (!switchDomain.isSendBeatOnly()) {
                    // 如果开启了在心跳包中携带Leader存储的数据进行发送,则对数据进行打包操作
                    for (Datum datum : datums.values()) {
    
                        ObjectNode element = JacksonUtils.createEmptyJsonNode();
                        // 将 key 和对应的版本放入 element 中,最终添加到 array 里
                        if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                            element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                        } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                            element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                        }
                        element.put("timestamp", datum.timestamp.get());
    
                        array.add(element);
                    }
                }
                // 将所有 key 组成的 array 放入数据包
                packet.replace("datums", array);
                // broadcast
                // 将数据包转换成 json 字符串放入 params 中
                Map<String, String> params = new HashMap<String, String>(1);
                params.put("beat", JacksonUtils.toJson(packet));
    
                String content = JacksonUtils.toJson(params);
    
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                // 将参数信息进行 Gzip算法压缩,降低网络消耗
                GZIPOutputStream gzip = new GZIPOutputStream(out);
                gzip.write(content.getBytes(StandardCharsets.UTF_8));
                gzip.close();
    
                byte[] compressedBytes = out.toByteArray();
                String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                            compressedContent.length());
                }
                // 遍历所有的Follower节点进行发送心跳数据包
                for (final String server : peers.allServersWithoutMySelf()) {
                    try {
                        final String url = buildUrl(server, API_BEAT);
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("send beat to server " + server);
                        }
                        // 采用异步HTTP请求进行心跳数据发送
                        HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (!result.ok()) {
                                    Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                    return;
                                }
                                // 成功后接收Follower节点的心跳回复(Follower节点的当前信息)进行节点更新操作
                                peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                                if (Loggers.RAFT.isDebugEnabled()) {
                                    Loggers.RAFT.debug("receive beat response from: {}", url);
                                }
                            }
    
                            @Override
                            public void onError(Throwable throwable) {
                                Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                                        throwable);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            }
    
                            @Override
                            public void onCancel() {
    
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                }
    
            }
        }
    }
    

    至于心跳接收的回复操作基本就是Follower节点将自己当前的信息进行数据打包发送给Leader节点,同时也会重置当前Leader的任期时间信息,并且根据接收到心跳信息,进行拉取Leader节点的最新数据信息。

    为什么要同时实现CP和AP两套一致性策略模式?

    或许有的人会问,为什么Nacos要同时实现CP以及AP两种数据的一致性策略。其实在一个组件中同时实现两种数据一致性策略,我觉得这样在做服务注册中心选型时,就不必操心AP选什么组件,CP选什么组件,直接采用nacos就好了,同时满足你AP以及CP的数据一致性需求;直接在一个组件中,享受Zookeeper以及Eureka组件的服务,避免了需要同时维护两种不同的组件的运维代价,只需要根据自己的实例需求,选择不同的注册模式即可。

    zookeeper的zab协议与Nacos的Raft协议

    zab一致性算法原理,以zookeeper为例

    zab原子广播协议中两种模式:

    • 1、恢复模式:Leader宕机后选举新Leader
    • 2、广播模式:解决每个节点数据同步问题

    ZK每个节点都有myid(手动配置)和zxid(自动生成,默认为0)
    zab类似paxos,但zab在生成全局zxid时会使用锁保存zxid线程安全

    ZK同步原理

    Leader节点会生成全局zxid,然后通过两段提交协议进行同步

    • 1、第一阶段同步,带上zxid请求每个follower节点是否可以允许同步数据。
    • 2、Leader节点接收到半数以上的节点可以同步,Leader节点就会开始给Follower进行同步数据。

    ZK选举底层实现原理:

    • 1、先检查zxid,谁最大,谁就为Leader,因为zxid越大表示当前节点数据越新。
    • 2、如果zxid都一样的情况,myid最大的为leader。

    备注:ZK的Observer(特殊的follower,只读,只监听同步,不参与投票选举(扩容时不影响本身选举的时候效率,能提高查询性能)

    Raft协议

    在Raft协议算法上角色有跟随者(不竞选领导角色)、竞选者(候选人)、领导角色
    选举票数满足半数以上就会成为领导角色。

    选举过程:

    默认情况下每个节点都为跟随者,每个节点会随机生成一个超时时间,大概100-300ms,超时时间过后,当前节点的状态就会由跟随者变为竞选者,会给其他节点发出选举投票通知,只要竞选者有超过半数以上的票,就成为领导角色,所以节点的超时时间最短,最有可能成为领导角色。

    故障选举过程

    如果某跟随者节点不能及时收到领导角色的消息(心跳检测),那么这个跟随者状态就会变为竞选者状态,给其他节点发出选举投票通知,其他节点确认领导角色挂了,就会进行投票,竞选者超过半数以上即可选举为领导角色。

    Raft采用日志复制形式同步数据

    • 1、所有的写请求都是统一交给领导角色完成,会写入对应的日志,标记该状态为提交状态。
    • 2、领导角色将日志以心跳的形式发送其他的跟随者,只要满足过半的跟随者可以写入数据,则直接通知其他节点同步该数据,这个称为日志复制。

    参考:
    https://www.liaochuntao.cn/2019/06/01/java-web-41/

    https://blog.csdn.net/liyanan21/article/details/89320872

    https://blog.csdn.net/microGP/article/details/114261089

    相关文章

      网友评论

        本文标题:Spring Cloud Alibaba——Nacos 是如何同

        本文链接:https://www.haomeiwen.com/subject/pcocvltx.html