美文网首页Golang
Rpcx源码之路由(Selector)

Rpcx源码之路由(Selector)

作者: 神奇的考拉 | 来源:发表于2019-02-22 17:57 被阅读4次

    一、路由

    一般在大型的微服务系统中,会为同一个服务部署到多个节点, 以便服务能够支持大并发的访问。可能部署在同一个数据中心的多个节点,或者多个数据中心。
    那么,在rpcx来完成service调用时,该如何将求请求交给对应的服务节点来完成,在rpcx中通过 Selector来实现路由选择, 很像一个负载均衡器,来选择出一个合适的节点。
    在rpcx提供了多个路由策略算法,可以在创建XClient来指定。

    注意,在Rpcx的路由是针对 ServicePath 和 ServiceMethod的路由。

    二、路由策略

    1、[Random]随机策略: 从指定的服务节点集合中随机选择一个节点。

    这也是最简单的,可能会导致性能、资源的那个节点的负载较重。主要由于该策略只能保证在大量的请求下路由的比较均匀,并不能保证在很短的时间内负载是均匀的。

    random-client.go源码
    package main
    
    import (
        "context"
        "flag"
        "log"
        "github.com/smallnest/rpcx/client"
        "rpcx/examples/models"
        "time"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
        xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
        defer xclient.Close()
    
        args := &models.Args{
            A: 10,
            B: 20,
        }
    
        for i := 0; i < 10; i++ {
            reply := &models.Reply{}
            err := xclient.Call(context.Background(), "Mul", args, reply)
            if err != nil {
                log.Fatalf("failed to call: %v", err)
            }
    
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    
            time.Sleep(time.Second)
        }
    
    }
    
    random-server.go源码
    package main
    
    import (
        "flag"
        "rpcx/examples/models"
        "github.com/smallnest/rpcx/server"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.Arith), "")
            s.Serve("tcp", *addr1)
        }()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.PBArith), "")  // 注意由于在本地测试没有使用注册中心 rcvr采用不用的类型 只是都具备Mul方法
            s.Serve("tcp", *addr2)
        }()
    
        select {}
    }
    

    2、[Roundrobin] 轮训策略

    通过轮询的方式来依次调用节点,能保证每个节点都均匀的被访问。该路由策略常用在节点的服务能力都差不多的情况下

    roundrobin-client.go源码
    package main
    
    import (
        "context"
        "flag"
        "log"
        "rpcx/examples/models"
        "time"
    
        "github.com/smallnest/rpcx/client"
    )
    
    var (
        addr1 = flag.String("addr1", "tcp@localhost:8972", "server address")
        addr2 = flag.String("addr2", "tcp@localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
        xclient := client.NewXClient("Arith", client.Failtry, client.RoundRobin, d, client.DefaultOption)
        defer xclient.Close()
    
        args := &models.Args{
            A: 10,
            B: 20,
        }
    
        for i := 0; i < 10; i++ {
            reply := &models.Reply{}
            err := xclient.Call(context.Background(), "Mul", args, reply)
            if err != nil {
                log.Fatalf("failed to call: %v", err)
            }
    
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    
            time.Sleep(time.Second)
        }
    
    }
    
    roundrobin-server.go源码
    package main
    
    import (
        "flag"
        "rpcx/examples/models"
    
        "github.com/smallnest/rpcx/server"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.Arith), "")
            s.Serve("tcp", *addr1)
        }()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.PBArith), "")
            s.Serve("tcp", *addr2)
        }()
    
        select {}
    }
    

    3、[WeightedRoundRobin] 平滑加权轮训策略:使用Nginx采用的[平滑加权的轮询算法]

    例如当前有三个服务节点a、b、c的权重是{ 5, 1, 1 }, 最佳调用顺序是 { a, a, b, a, c, a, a }, 类比像 { c, b, a, a, a, a, a }这样的调用顺序比较来说权重虽一样,但相对调用负载来说前者更好,不至于在一段时间内将请求都发送给a。附上实现代码(仅用参考)

    int matchedIndex = -1; // 代表轮训后选中执行服务的节点
    int total = 0;
    for (int i = 0; i < servers.Length; i++)
    {
          servers[i].cur_weight += servers[i].weight;   //1、每次循环的时候做自增(步长=权重值)
          total += servers[i].weight;                      //2、将每个节点的权重值累加到汇总值中
    
        //3、如果 当前节点的自增数 > 当前待返回节点的自增数,则覆盖。
          if (matchedIndex == -1 || servers[matchedIndex].cur_weight < servers[i].cur_weight)
          {
                matchedIndex = i;
          }
    }
    //3被选取的节点减去2的汇总值,以降低下一次被选举时的初始权重值。
    servers[matchedIndex].cur_weight -= total;
    return servers[matchedIndex];
    
    weighted-client.go源码
    package main
    
    import (
        "context"
        "flag"
        "log"
        "rpcx/examples/models"
        "time"
    
        "github.com/smallnest/rpcx/client"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1, Value: "weight=7"}, 
        {Key: *addr2, Value: "weight=3"}})
        xclient := client.NewXClient("Arith", client.Failtry, client.WeightedRoundRobin, d, client.DefaultOption)
        defer xclient.Close()
    
        args := &models.Args{
            A: 10,
            B: 20,
        }
    
        for i := 0; i < 10; i++ {
            reply := &models.Reply{}
            err := xclient.Call(context.Background(), "Mul", args, reply)
            if err != nil {
                log.Fatalf("failed to call: %v", err)
            }
    
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    
            time.Sleep(time.Second)
        }
    
    }
    
    weighted-server.go源码
    package main
    
    import (
        "flag"
        "rpcx/examples/models"
    
        "github.com/smallnest/rpcx/server"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.Arith), "weight=7")
            s.Serve("tcp", *addr1)
        }()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.PBArith), "weight=3")
            s.Serve("tcp", *addr2)
        }()
    
        select {}
    }
    

    4、[网络质量优先]

    客户端会基于ping(ICMP)探测各个节点的网络质量,越短的ping时间,对应服务节点的权重也就越高。同时,也会保证网络较差的节点也有被调用的机会。
    例如:假定t是ping的返回时间, 如果超过1秒基本就没有调用机会了:
    weight=191 if t <= 10
    weight=201 -t if 10 < t <=200
    weight=1 if 200 < t < 1000
    weight=0 if t >= 1000

    ping-client.go源码
    package main
    
    import (
        "context"
        "flag"
        "log"
        "rpcx/examples/models"
        "time"
    
        "github.com/smallnest/rpcx/client"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "baidu.com:8080", "server address")
    )
    
    func main() {
        flag.Parse()
    
        d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
        xclient := client.NewXClient("Arith", client.Failtry, client.WeightedICMP, d, client.DefaultOption)
        defer xclient.Close()
    
        args := &models.Args{
            A: 10,
            B: 20,
        }
    
        for i := 0; i < 10; i++ {
            reply := &models.Reply{}
            err := xclient.Call(context.Background(), "Mul", args, reply)
            if err != nil {
                log.Fatalf("failed to call: %v", err)
            }
    
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    
            time.Sleep(time.Second)
        }
    
    }
    
    ping-server.go源码
    package main
    import (
        "flag"
        "rpcx/examples/models"
    
        "github.com/smallnest/rpcx/server"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "baidu.com:80", "server address")
    )
    
    func main() {
        flag.Parse()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.Arith), "weight=7")
            s.Serve("tcp", *addr1)
        }()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.PBArith), "weight=3")
            s.Serve("tcp", *addr2)
        }()
    
        select {}
    }
    

    5、[一致性哈希]

    该策略是使用 [JumpConsistentHash]选择节点, 使得相同的servicePath, serviceMethod 和 参数会路由到同一个节点上。
    JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由就不准确了,所以在节点有变动的时候它会重新计算一致性哈希。

    consistent-client.go源码
    待定
    
    consistent-server.go源码
    待定
    

    6、[地理位置优先]

    在实际的使用过程中可能会碰到,希望client端就近获取对应的服务节点来完成服务调用, 例如在同一个服务分别提供了上海和美国硅谷服务中心,若是客户端在北京,那么则希望优先选择上海的机房,而非美国硅谷的。在rpcx中也提供了类似路由策略,不过要求服务在注册的时候要设置它所在的地理经纬度。若两个服务的节点的经纬度是一样的, rpcx会随机选择一个。
    ** 必须使用下面的方法配置client的经纬度信息**:

    func (c *xClient) ConfigGeoSelector(latitude, longitude float64) 
    
    geo-client.go源码
    package main
    
    import (
        "context"
        "flag"
        "log"
        "rpcx/examples/models"
        "time"
    
        "github.com/smallnest/rpcx/client"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1, Value: "latitude=39.9289&longitude=116.3883"},
            {Key: *addr2, Value: "latitude=139.3453&longitude=23.3243"}})
        xclient := client.NewXClient("Arith", client.Failtry, client.ConsistentHash, d, client.DefaultOption)
        defer xclient.Close()
        xclient.ConfigGeoSelector(39.30, 116.40)
    
        args := &models.Args{
            A: 10,
            B: 20,
        }
    
        for i := 0; i < 10; i++ {
            reply := &models.Reply{}
            err := xclient.Call(context.Background(), "Mul", args, reply)
            if err != nil {
                log.Fatalf("failed to call: %v", err)
            }
    
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    
            time.Sleep(time.Second)
        }
    
    }
    
    geo-server.go源码
    package main
    
    import (
        "flag"
        "rpcx/examples/models"
    
        "github.com/smallnest/rpcx/server"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.Arith), "weight=7")
            s.Serve("tcp", *addr1)
        }()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.PBArith), "weight=3")
            s.Serve("tcp", *addr2)
        }()
    
        select {}
    }
    

    三、自定义路由策略

    在上面内置的路由规则不满足用户的实际需求,则可以参考上面的路由器来实现自己的路由规则;
    1、首先实现Selector接口

    type Selector interface {
        Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
        UpdateServer(servers map[string]string)
    }
    

    2、设置自定义的Selector
    xclient.SetSelector(&alwaysFirstSelector{})

    3、构建xclient指定自定义的selector策略
    xclient := client.NewXClient("Arith", client.Failtry, client.SelectByUser, d, client.DefaultOption)

    自定义路由策略实现源码:位于user-client.go文件中
    // 自定义selector
    type alwaysFirstSelector struct { // 实现Selector接口
        servers []string
    }
    
    // 路由选择服务节点算法
    func (s *alwaysFirstSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
        var ss = s.servers
        if len(ss) == 0 {
            return ""
        }
    
        return ss[0]
    }
    
    // 更新新的服务节点算法
    func (s *alwaysFirstSelector) UpdateServer(servers map[string]string) {
        var ss = make([]string, 0, len(servers))
        for k := range servers {
            ss = append(ss, k)
        }
    
        sort.Slice(ss, func(i, j int) bool {
            return strings.Compare(ss[i], ss[j]) <= 0
        })
        s.servers = ss
    }
    
    user-client.go源码
    package main
    
    import (
        "context"
        "flag"
        "log"
        "rpcx/examples/models"
        "sort"
        "strings"
        "time"
    
        "github.com/smallnest/rpcx/client"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
        xclient := client.NewXClient("Arith", client.Failtry, client.SelectByUser, d, client.DefaultOption)
        defer xclient.Close()
    
        xclient.SetSelector(&alwaysFirstSelector{})
    
        args := &models.Args{
            A: 10,
            B: 20,
        }
    
        for i := 0; i < 10; i++ {
            reply := &models.Reply{}
            err := xclient.Call(context.Background(), "Mul", args, reply)
            if err != nil {
                log.Fatalf("failed to call: %v", err)
            }
    
            log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    
            time.Sleep(time.Second)
        }
    
    }
    
    user-server.go源码
    package main
    
    import (
        "flag"
        "rpcx/examples/models"
    
        "github.com/smallnest/rpcx/server"
    )
    
    var (
        addr1 = flag.String("addr1", "localhost:8972", "server address")
        addr2 = flag.String("addr2", "localhost:8973", "server address")
    )
    
    func main() {
        flag.Parse()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.Arith), "")
            s.Serve("tcp", *addr1)
        }()
    
        go func() {
            s := server.NewServer()
            s.RegisterName("Arith", new(models.PBArith), "")
            s.Serve("tcp", *addr2)
        }()
    
        select {}
    }
    

    四、其他

    通过增加路由策略能够提高服务对外的性能,保证来支撑更大的业务并发需求。上述所罗列的只是写常用的,也可参考nginx这些软件提供的路由算法。
    select源码

    相关文章

      网友评论

        本文标题:Rpcx源码之路由(Selector)

        本文链接:https://www.haomeiwen.com/subject/hkmeyqtx.html