在 Rust 中使用 Raft

作者: siddontang | 来源:发表于2018-03-25 21:15 被阅读306次

    自从 Raft 一致性算法提出以来,越来越多的分布式应用开始基于 Raft 来构造自己的高可用服务,包括我们的分布式一致性性 Key-Value 数据库:TiKV

    在最开始开发 TiKV 的时候,我们对世面上的 Raft 实现进行了研究以及选型,最终决定基于 etcd 的 Raft 来实现。虽然 etcd 是用 Go 写的,但它的 Raft 模块并没有使用 Go 的特性,所以我们能很方便的使用 Rust 进行移植,并在 TiKV 中使用。

    自从 2017 年 TiKV 开源依赖,TiKV 的 Raft 模块一直持续稳定运行,经受住了用户生产环境的考验。所以我们决定将 TiKV 的 Raft 模块剥离出来,对外提供一个独立的 library - raft-rs,能让 Rust 社区使用它来构建自己的分布式一致性服务。后面,我将会简单的介绍下 raft-rs 以及它的使用方式,方便大家后续开发。

    设计

    在详细介绍如何使用 raft-rs 之前,我们先简单的介绍一下 Raft。Raft 的模型很简单,就是一个基于 Log 复制的状态机模型。只要我们能保证所有机器上面的 Log 序列是一致的,那么就能保证将这些 Log 序列的数据依次应用到状态机之后,各个机器上面的状态机是一致的。

    Replicated State Machines

    上图就是 Raft 论文里面提到的一个典型的状态机模型,我们可以看到几个关键的部分:

    1. Consensus Module,最核心的一致性算法模块,用来保证数据 Log 的一致性
    2. Log,保存 Raft Log 的地方
    3. State Machine,状态机,应用 Raft Log 之后实际写入数据的地方
    4. Transport,也就是网络传输层,Raft 是需要在各个节点之间进行网络交互的

    对于 raft-rs 来说,它只是提供了最核心的 Consensus Module,而其他的组件,包括 Log,State Machine,Transport,都是需要外面自己去定制实现的。这个设计的好处在于足够简单,只聚焦于最核心的一致性算法实现,大家可以用这个模块来灵活的去拼接起自己的服务。

    使用

    上面说到,raft-rs 只是实现了 Raft Consensus Module,如果我们要构建自己的分布式一致性服务,还需要做很多额外的工作,下面就来简单的介绍一下。

    Raft Storage

    首先,我们要实现一个 Raft Storage,Storage 是一个 trait,定义在 storage.rs 里面,我们需要实现相关的 interface,这里详细介绍一下。

    initial_state,当我们的 Raft 模块初始化的时候,会调用 initial_state 这个函数,这个函数会返回一个 RaftState,里面包含了 HardStateConfStateConfState 记录的是当前集群的节点列表,我们要求每一个节点都必须有一个唯一的 ID。HardState 则包含了该节点上一次的 Raft 情况,包括 commit index,上一次在哪次 term 给哪一个节点 vote 了。

    然后是 entries 函数,是返回某一段 index 范围里面的 Log entries,而 term 则是返回某个 Log 上面的 term,first_index 返回的是第一个 Log 的 index,last_index 返回的是最后一个 Log 的 index。

    这里需要在先说明下 first_index,对于一个完全空的节点来说,这时候 Log 是没有的,那么 last_index 会返回 0,而 first_index 我们通常会返回 1,也就是我们会有一个 dummy Log entry。这个主要是为了计算方便,在 term 那边,我们需要得到 first_index() - 1 的 term。

    最后一个是 snapshot,它会得到当前状态机的 Snapshot,Snapshot 里面有一个 data 字段就是保存当前状态机的所有数据。

    配置

    当我们创建好一个 Raft Storage 之后,下面就是创建一个 Raft 节点。创建 Raft 节点的时候需要传入一个配置,我们主要关心的配置如下:

    • id:节点的唯一 ID,这个需要我们外面自己去保证不会重复。
    • election_tick:多少次 tick 之后如果 Follower 还没收到 Leader 的消息,就开始重新选举。
    • heartbeat_tick:多少次 tick 之后 Leader 给 Follower 发一个心跳保活下。
    • applied:上一次状态机应用到哪一个 Log 上面了,Raft 会从这个 index 之后,返回已经被提交的 Log 让外面去继续应用。
    • max_size_per_msg:一次消息最多发送的数据大小,Raft 内部做了 batch 和 pipeline,我们一次可能会发多个 Log entries。
    • max_inflight_msgs:这个主要是针对 pipeline 的优化,Leader 可以一直给 follower 发消息,而不需要等 Follower ACK。

    至于 check_quorumpre_vote,主要是为了防止集群被干扰做的优化,我们后续会详细的说明。而 read_only_option 则是让用户选择是否进行完全线性一致的安全读,还是基于租约的非安全读。

    驱动 Raft

    当我们创建好一个 Raft node 之后,下面的就是要驱动这个 Raft 运行了。首先,我们需要有一个定期的 timer 去驱动 Raft,也就是定期调用 Raft 的 tick 函数,类似如下:

    let mut t = Instant::now();
    let d = Duration::from_millis(100);
    
    loop {
        match receiver.recv_timeout(d) {
            Ok(...) => (),
            Err(RecvTimeoutError::Timeout) => (),
            Err(RecvTimeoutError::Disconnected) => return,
        }
    
        if t.elapsed() >= d {
            t = Instant::now();
            // We drive Raft every 100ms.
            r.tick();
        }
    
    }
    

    上面我们创建了一个 channel,用 channel Receiver 的 recv_timeout 来定期 100ms 去驱动 Raft 执行。

    另一个驱动 Raft 的入口函数就是 propose,当外面给给 Raft 服务发送命令之后,我们需要显示的调用 propose 进行驱动。这里需要注意,通常客户端给 Raft 发送命令之后,Raft 这边要先将其追加到 Log 上面,然后复制给其他节点,当这个 Log 被提交了之后,才会应用到状态机上面得到结果,返回给客户端。这是一个异步的流程,所以我们需要将客户端的回调函数跟 Log 关联起来,保证 Log 在应用之后对应的回调能调用。

    一个比较简单的做法就是使用一个 HashMap,每个客户端的请求我们用一个唯一的 ID 来标识,用 ID 来关联对应的回调函数。然后 Log 被应用的时候,我们通过解开 Log 拿到实际的请求,从请求里面通过 ID 找到对应的回调函数,进行调用。

    还有一个驱动 Raft 的入口函数就是 step,让其他节点给当前节点发送 Raft message 的时候,我们会直接调用 step 这个函数让 Raft 模块去处理相关的 Raft 消息。具体到上面的例子,我们可以在 channel 里面发送 propose 和 Raft message,Receiver 收到之后,进行处理,如下:

    let mut cbs = HashMap::new();
    loop {
        match receiver.recv_timeout(d) {
            Ok(Msg::Propose { id, callback }) => {
                cbs.insert(id, callback);
                r.propose(vec![id], false).unwrap();
            }
            Ok(Msg::Raft(m)) => r.step(m).unwrap(),
            ......
        }
    ......
    }
    

    处理 Raft Ready

    上面我们说了驱动 Raft,当 Raft 发现我们可以准备好做一些后续事情之后,就会给我们返回 ready 状态,我们就可以调用 ready 得到一些信息做后续的处理。一个 Raft Ready 里面包含很多信息,我们需要依次处理。

    1. 判断 snapshot 是不是空的,如果不是,那么表明当前节点收到了一个 Snapshot,我们需要去应用这个 snapshot。
    2. 判断 entries 是不是空的,如果不是,表明现在有新增的 entries,我们需要将其追加到 Raft Log 上面。
    3. 判断 hs 是不是空的,如果不是,表明该节点 HardState 状态变更了,可能是重新给一个新的节点 vote,也可能是 commit index 变了,但无论怎样,我们都需要将变更的 HardState 持续化。
    4. 判断是否有 messages,如果有,表明需要给其他 Raft 节点发送消息,具体怎么发,发到哪里,由外面实现者来保证。这里其实有一个优化,如果我们能知道节点是 Leader,那么这一步可以放到第 1 步前面。
    5. 判断 committed_entries 是不是空的,如果不是,表明有新的 Log 已经被提交,我们需要去应用这些 Log 到状态机上面了。当然,在应用的时候,也需要保存 apply index。
    6. 调用 advance 函数,开启下一次的 Ready 处理。

    小结

    虽然我上面扯了很多,但实际基于 raft-rs 来写一个分布式一致性服务还是比较容易的,只要严格的按照上面的步骤来就可以构造一个简单的 demo 出来。当然,要做到性能好,还有很多工作要做,譬如异步落盘这些,后面我们会基于 TiKV 来分析相关的优化。

    如果你对 Raft 和 Rust 都很感兴趣,你可以尝试构造一个 example 并给我们提交 PR。如果你想更深入的开发一个分布式数据库,欢迎联系我 tl@pingcap.com

    相关文章

      网友评论

        本文标题:在 Rust 中使用 Raft

        本文链接:https://www.haomeiwen.com/subject/cwpkcftx.html