美文网首页
pd调度源码分析

pd调度源码分析

作者: 白馨_1114 | 来源:发表于2020-04-25 16:33 被阅读0次

    Placement Driver (后续以 PD 简称) 是 TiDB 里面全局中心总控节点,它负责整个集群的调度,负责全局 ID 的生成,以及全局时间戳 TSO 的生成等。PD 还保存着整个集群 TiKV 的元信息,负责给 client 提供路由功能。

    作为中心总控节点,PD 通过集成 etcd ,自动的支持 auto failover,无需担心单点故障问题。同时,PD 也通过 etcd 的 raft,保证了数据的强一致性,不用担心数据丢失的问题。

    在架构上面,PD 所有的数据都是通过 TiKV 主动上报获知的。同时,PD 对整个 TiKV 集群的调度等操作,也只会在 TiKV 发送 heartbeat 命令的结果里面返回相关的命令,让 TiKV 自行去处理,而不是主动去给 TiKV 发命令。这样设计上面就非常简单,我们完全可以认为 PD 是一个无状态的服务(当然,PD 仍然会将一些信息持久化到 etcd),所有的操作都是被动触发,即使 PD 挂掉,新选出的 PD leader 也能立刻对外服务,无需考虑任何之前的中间状态。

    Pd选举:
    pd集成etcd,一个集群至少需要启动3个pd实例服务。
    启动pd集群:

    集群有两种启动方式:

    1.静态方式:
    在配置中设置:
    initial-cluster = "pd=http://127.0.0.1:2380"
    或者启动pd的时候加参数:
    ./pd-server --name="pd"
    --initial-cluster="pd1=http://10.87.109.72:2380,pd2=http://10.87.109.73:2380,pd3=http://10.87.109.74:2380
    静态方式,使用的是ectd peer内部通信端口2380.

    2.动态方式:
    使用 join 参数,将一个新的 PD 服务加入到现有的 PD 集群里面。 如果我们需要添加 pd4,只需要在 --join 参数里面填入当前 PD 集群任意一个 PD 服务的 client url,比如:
    ./bin/pd-server --name=pd4
    --client-urls="http://host4:2379"
    --peer-urls="http://host4:2380"
    --join="http://host1:2379"

    动态方式使用的是etcd 自身提供的 member 相关 API,所以使用 2379 端口。
    以上这两种方式是互斥的,只能使用一种方式初始化集群(不知道原因,但混合操作可能会报错)。

    启动单个pd代码:

    func main() {
       cfg := server.NewConfig()//加载配置
       ......      
       err = server.PrepareJoinCluster(cfg)//初始化上文集群
       ......
       if err := svr.Run(ctx); err != nil { //主程序具体实现见下文
          log.Fatalf("run server failed: %v", fmt.Sprintf("%+v", err))
       }
    ......
    }
    
    func (s *Server) Run(ctx context.Context) error {
       timeMonitorOnce.Do(func() {
          go StartMonitor(time.Now, func() {
             log.Errorf("system time jumps backward")
             timeJumpBackCounter.Inc()
          })
       })
    
       if err := s.startEtcd(ctx); err != nil {
          return err
       }
    
       if err := s.startServer(); err != nil {
          return err
       }
    
       s.startServerLoop()
    
       return nil
    }
    

    一、StartMonitor
    启动一个协程监控系统时间,有没有jumps backward
    UnixNano()在高并发环境下,在高并发情况下并不安全,所以将jumps backward当作错误日志记录下来。
    (代码中,只是记录下来,没有做多余操作,推测应该跟 raft时间有关)
    https://blog.csdn.net/fwhezfwhez/article/details/81069249

    func StartMonitor(now func() time.Time, systimeErrHandler func()) {
       log.Info("start system time monitor")
       tick := time.NewTicker(100 * time.Millisecond)
       defer tick.Stop()
       for {
          last := now().UnixNano()
          <-tick.C //阻塞100ms,在比较
          if now().UnixNano() < last {
             log.Errorf("syst em time jump backward, last:%v", last)
             systimeErrHandler()
          }
       }
    }
    

    二、startEtcd
    根据加载的配置,启动etcd服务(etcdCfg详细配置可以参见Server.embed.Config说明)

    func (s *Server) startEtcd(ctx context.Context) error {
       ctx, cancel := context.WithTimeout(ctx, etcdStartTimeout) //记录启动初始时间
       etcd, err := embed.StartEtcd(s.etcdCfg) //StartEtcd launches the etcd server and HTTP handlers for client/server communication.
       // Check cluster ID
       urlmap, err := types.NewURLsMap(s.cfg.InitialCluster) //parse --initial-cluster
       tlsConfig, err := s.cfg.Security.ToTLSConfig() //tls初始化
    
       if err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlmap, tlsConfig); err != nil {
          return err
       } //checks Etcd's cluster ID
    
       client, err := clientv3.New(clientv3.Config{ //启动客户端,提供对etcd操作,比如member命令
          Endpoints:   endpoints,
          DialTimeout: etcdTimeout,
          TLS:         tlsConfig,
       })
       // update advertise peer urls.
       etcdMembers, err := etcdutil.ListEtcdMembers(client)
       …..
    }
    

    三、startServerLoop

    func (s *Server) startServerLoop() {
        s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(context.Background())
        s.serverLoopWg.Add(3)
        go s.leaderLoop() //选举pd leader
        go s.etcdLeaderLoop()
        go s.serverMetricsLoop()
    }
    

    pd Leader 的选举在leaderloop中实现,不同于raft选举:
    保存更新tso

    pd与tikv交互:
    handleStoreHeartbeat
    handleAskSplit

    pd与tidb交互:

    pd调度策略:
    pd调度策略主要是针对tikv的。
    如何通过调度策略影响tikv?

    1.pd 选举。如果是leader掉线,会重新选举。这个选举时间大概是3s,而且会断开客户端服务?
    2.tikv于pd的交互:
    每个tikv会定期向pd汇报节点整体信息(根据这个汇报的信息,比如每个region的大小不能超过64m,一旦超过64m。辣么就要分裂成两个group raft)
    每个raft group的leader会定期想pd汇报信息
    然后pd 不断的通过这两种信息,之后做决策。主动下线,是会马上通知pd 不可用。但是某个store心跳包不可用的时候,其实并不能判断到底是什么情况。
    这个默认是等待30分整,确实是相当的长。
    Pd通过某个 leader的心跳包发现replica不够,可以add或者remove
    保证多个replica不在同一个位置上,而不是同一个节点上。这个位置是一个逻辑的概念,通过location-lables来标志哪些lable是位置标志。???这个是怎么标示的?
    Pd不断通过store或者leader的心跳包收集信息。获取整个集群的纤细数据,
    然后根据这些详细数据生成操作调度序列。
    tikv收到这些信息,如何执行的?在哪些情况下,可能不执行。
    3.pd中增加replica ,删除replica ,重现选举replica如何实现的。

    相关文章

      网友评论

          本文标题:pd调度源码分析

          本文链接:https://www.haomeiwen.com/subject/aeljwhtx.html