"Recommender System from 0 to 1": Service Discovery

Author: 寒寒落弦 | Published 2020-11-14 13:16
    Let's go!

    Preface

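    First, a few words on how I see the engine and the algorithms of a recommender system.

    Most of what gets written about recommender systems today is about algorithms, usually presented as dazzling and unfathomable. In practice many of them are much alike: the core idea is the same and only the implementation differs. In industry, I think, few of the complex algorithms actually make it to production, and most companies rely on the same small set.

    A good recommender system depends heavily on its engine. Experiments show that response time is directly tied to the business metrics: the longer the response time, the lower the metrics.

    As a down-to-earth recommendation engineer, I would rather start from the basics and get the engine right first. The algorithms will come later, of course, since neither the engine nor the algorithms can be left out.

    So, enough talk. Let's build a recommendation engine!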

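    Service discovery

    Building the engine means building the backend, so the first step is to put the architecture in place.

    Microservices have become the mainstream for backend services. They have many advantages, such as high cohesion, independent deployment, and per-service load balancing, and some drawbacks, such as more complex communication. I won't go into detail here; if you are interested, a quick search on Baidu or Google will turn up plenty.

    When microservices talk to each other, the client has to find out the server's address, and that is what service discovery is for.

    The whole flow splits into what the server has to do and what the client has to do. Let's look at each in turn.

    The server side is simple: it only has to write its own information into some store.

    The client first fetches the list of server entries from that store, then picks one address according to a load-balancing policy, and finally makes the call.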
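
    The two roles can be sketched with an in-memory map standing in for the store. This is only an illustration: the Registry type and its methods are hypothetical names, and a real setup would put a shared store such as etcd behind them.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for the shared store.
type Registry struct {
	mu    sync.Mutex
	addrs map[string][]string // service name -> registered addresses
	next  map[string]int      // service name -> round-robin cursor
}

func NewRegistry() *Registry {
	return &Registry{addrs: map[string][]string{}, next: map[string]int{}}
}

// Register is the server side: publish "ip:port" under the service name.
func (r *Registry) Register(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addrs[name] = append(r.addrs[name], addr)
}

// Pick is the client side: read the list and choose the next address in turn.
func (r *Registry) Pick(name string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	list := r.addrs[name]
	if len(list) == 0 {
		return "", errors.New("no instance registered for " + name)
	}
	addr := list[r.next[name]%len(list)]
	r.next[name]++
	return addr, nil
}

func main() {
	reg := NewRegistry()
	reg.Register("greet_server", "10.0.0.1:5000")
	reg.Register("greet_server", "10.0.0.2:5000")
	for i := 0; i < 3; i++ {
		addr, _ := reg.Pick("greet_server")
		fmt.Println(addr) // alternates between the two registered addresses
	}
}
```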

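    Simple enough in principle, right? Let's get hands-on!

    Introducing etcd

    In the past this job usually went to ZooKeeper, which is still maintained but is the older and heavier option.

    etcd is written in Go and became famous through Kubernetes, which uses it as the distributed key-value store for cluster state. It is also commonly used to implement distributed locks.

    So of course we go with etcd, which is younger, lighter, and also very stable. Out with the old, in with the new = =

    etcd relies on the Raft algorithm for consistency. For Raft itself, see the animated demo below, which explains it vividly.

    Raft animated demo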

    etcd in practice

    I use Docker for my test environment. Here is my docker-compose.yaml:

    version: '2.2'
    services:
      etcd:
        image: gcr.io/etcd-development/etcd:v3.4.13
        container_name: etcd
        restart: always
        ports:
          - 2379:2379
          - 2380:2380
        command:
          - "/usr/local/bin/etcd"
          - "--name"
          - "s1"
          - "--data-dir"
          - "/etcd-data"
          - "--advertise-client-urls"
          - "http://0.0.0.0:2379"
          - "--listen-client-urls"
          - "http://0.0.0.0:2379"
          - "--initial-advertise-peer-urls"
          - "http://0.0.0.0:2380"
          - "--listen-peer-urls"
          - "http://0.0.0.0:2380"
          - "--initial-cluster-token"
          - "tkn"
          - "--initial-cluster"
          - "s1=http://0.0.0.0:2380"
          - "--initial-cluster-state"
          - "new"
    

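    If you want to install it some other way, see the official instructions:

    Installing etcd

    Since etcd is a store, let's test CRUD first, and then etcd's lease feature.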

    CRUD:

    # etcdctl put test/key hello
    OK
    # etcdctl get test/key
    test/key
    hello
    # etcdctl put test/key goodbye
    OK
    # etcdctl get test/key
    test/key
    goodbye
    # etcdctl del test/key
    1
    # etcdctl get test/key
    

    Leases:

    Create a lease that expires after 120s

    # etcdctl lease grant 120
    lease 3f3575c45fa5ff26 granted with TTL(120s)
    

    List the leases

    # etcdctl lease list
    found 1 leases
    3f3575c45fa5ff26
    

    Create a key-value pair and bind it to the lease

    # etcdctl put test/key hello --lease="3f3575c45fa5ff26"
    OK
    

    Check the lease's remaining time and the keys attached to it

    # etcdctl lease timetolive 3f3575c45fa5ff26 --keys
    lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])
    

    List the keys that still exist

    # etcdctl get --prefix ""
    test/key
    hello
    

    After the lease expires, look up the key again: it has been deleted automatically

    # etcdctl lease timetolive 3f3575c45fa5ff26 --keys
    lease 3f3575c45fa5ff26 already expired
    # etcdctl get --prefix ""
    

    Lease renewal:

    Create a lease and bind a key-value pair as before

    # etcdctl lease grant 30
    lease 3f3575c45fa5ff2c granted with TTL(30s)
    # etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
    OK
    

    Renew the lease

    # etcdctl lease keep-alive 3f3575c45fa5ff2c
    lease 3f3575c45fa5ff2c keepalived with TTL(30)
    lease 3f3575c45fa5ff2c keepalived with TTL(30)
    lease 3f3575c45fa5ff2c keepalived with TTL(30)
    lease 3f3575c45fa5ff2c keepalived with TTL(30)
    

    Open a new terminal window and check the lease and the key

    # etcdctl lease timetolive 3f3575c45fa5ff2c --keys
    lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
    # etcdctl get --prefix ""
    test/key
    hello
    

    The lease keeps being renewed, so the key has not expired.
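
    The lease behaviour seen above (a key disappears with its lease, and a keep-alive resets the countdown) can be modelled in a few lines. This is a simplified sketch, not how etcd is implemented: leaseStore and its methods are hypothetical, and time is a hand-advanced counter of seconds so that the example is deterministic.

```go
package main

import "fmt"

// leaseStore is a hypothetical in-memory model of keys bound to TTL leases.
type leaseStore struct {
	now    int64             // current time in seconds, advanced by hand
	nextID int64             // last lease ID handed out
	ttl    map[int64]int64   // lease ID -> TTL in seconds
	expiry map[int64]int64   // lease ID -> absolute expiry time
	lease  map[string]int64  // key -> lease ID
	value  map[string]string // key -> value
}

func newLeaseStore() *leaseStore {
	return &leaseStore{
		ttl: map[int64]int64{}, expiry: map[int64]int64{},
		lease: map[string]int64{}, value: map[string]string{},
	}
}

// Grant creates a lease that expires ttl seconds from now.
func (s *leaseStore) Grant(ttl int64) int64 {
	s.nextID++
	s.ttl[s.nextID] = ttl
	s.expiry[s.nextID] = s.now + ttl
	return s.nextID
}

// Put stores a key and attaches it to a lease.
func (s *leaseStore) Put(key, val string, id int64) {
	s.value[key] = val
	s.lease[key] = id
}

// KeepAlive resets a live lease to its full TTL; an expired lease stays expired.
func (s *leaseStore) KeepAlive(id int64) bool {
	exp, ok := s.expiry[id]
	if !ok || s.now >= exp {
		return false
	}
	s.expiry[id] = s.now + s.ttl[id]
	return true
}

// Advance moves the clock forward and deletes keys whose lease has expired.
func (s *leaseStore) Advance(seconds int64) {
	s.now += seconds
	for key, id := range s.lease {
		if s.now >= s.expiry[id] {
			delete(s.lease, key)
			delete(s.value, key)
		}
	}
}

// Get returns the value of a key and whether the key still exists.
func (s *leaseStore) Get(key string) (string, bool) {
	v, ok := s.value[key]
	return v, ok
}

func main() {
	s := newLeaseStore()
	id := s.Grant(30)
	s.Put("test/key", "hello", id)
	s.Advance(20)
	s.KeepAlive(id) // renewed at t=20, so the lease now runs until t=50
	s.Advance(20)
	_, alive := s.Get("test/key")
	fmt.Println("t=40 key exists:", alive)
	s.Advance(20) // no further renewal
	_, alive = s.Get("test/key")
	fmt.Println("t=60 key exists:", alive)
}
```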

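    golang + grpc + etcd: service discovery, the ultimate hands-on!

    The code is on GitHub: GitHub repository

    Code directory: /go_server/src/lib/discovery/

    The overall flow:

    The server registers itself with etcd, that is, it writes its own service information into etcd.

    The client, roughly:

    1. Fetch the list of server addresses from etcd, watch the list for changes, and keep it up to date.
    2. Push the address list into the resolver.ClientConn of the grpc resolver.
    3. grpc creates the connections and dispatches requests according to the load-balancing policy.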
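
    Step 1 boils down to applying a stream of put and delete events to a local address list. A minimal sketch, assuming the address is the part of the key after a known prefix; the event type and applyEvent are hypothetical names, not part of the etcd client:

```go
package main

import (
	"fmt"
	"strings"
)

type eventType int

const (
	eventPut eventType = iota
	eventDelete
)

// event is a hypothetical, simplified watch event: what happened to which key.
type event struct {
	Type eventType
	Key  string
}

// applyEvent returns the address list after one event. The address is the
// part of the key that follows prefix. The input slice is not modified.
func applyEvent(addrs []string, prefix string, ev event) []string {
	addr := strings.TrimPrefix(ev.Key, prefix)
	switch ev.Type {
	case eventPut:
		for _, a := range addrs {
			if a == addr {
				return addrs // already known, nothing to do
			}
		}
		return append(addrs[:len(addrs):len(addrs)], addr)
	case eventDelete:
		for i, a := range addrs {
			if a == addr {
				// copy into a fresh slice so the caller's list stays intact
				return append(addrs[:i:i], addrs[i+1:]...)
			}
		}
	}
	return addrs
}

func main() {
	prefix := "/etcd/test/greet_server/"
	var addrs []string
	addrs = applyEvent(addrs, prefix, event{eventPut, prefix + "10.0.0.1:5000"})
	addrs = applyEvent(addrs, prefix, event{eventPut, prefix + "10.0.0.2:5000"})
	addrs = applyEvent(addrs, prefix, event{eventDelete, prefix + "10.0.0.1:5000"})
	fmt.Println(addrs) // only 10.0.0.2:5000 is left
}
```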

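    The module consists of 7 files:

    • config.go, the configuration.
    • discovery.go, initialization.
    • register.go, service registration.
    • resolver.go, which resolves the service addresses registered in etcd and sets up grpc load balancing.
    • util.go, shared helpers.
    • wrapper.go, the wrappers exposed to callers.
    • ctx.go, contexts that carry a timeout.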

    config.go

    package config
    
    import "time"
    
    // etcd
    const (
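        Timeout        = 15 * time.Second // dial timeout of the etcd client
        Expires        = 10               // lease TTL, in seconds
        TickerInterval = 5                // interval of the registration check, in seconds
        // scheme
        Scheme = "etcd"
        // prefix format of the keys stored in etcd: /scheme/authority/endpoint/
        DirFormat = "/%s/%s/%s/"
        // target format required by a custom grpc resolver: scheme://authority/endpoint
        // here scheme selects the resolver, authority carries the environment name, and endpoint carries the service name
        TargetFormat = "%s://%s/%s"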
    )
    
    // server name
    const (
        GreetServer = "greet_server"
    )
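
    To make the two formats concrete, here is a small sketch of how a key and a dial target come out of them, and how an address is recovered from a key. The constants mirror config.go; the helper names serviceKey, dialTarget and addrFromKey are made up for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// These mirror Scheme, DirFormat and TargetFormat in config.go.
const (
	scheme       = "etcd"
	dirFormat    = "/%s/%s/%s/"
	targetFormat = "%s://%s/%s"
)

// serviceKey builds the etcd key for one server instance.
func serviceKey(env, name, host string) string {
	return fmt.Sprintf(dirFormat, scheme, env, name) + host
}

// dialTarget builds the target string handed to grpc.Dial.
func dialTarget(env, name string) string {
	return fmt.Sprintf(targetFormat, scheme, env, name)
}

// addrFromKey strips the directory prefix from a key, leaving "ip:port".
func addrFromKey(key, env, name string) string {
	return strings.TrimPrefix(key, fmt.Sprintf(dirFormat, scheme, env, name))
}

func main() {
	key := serviceKey("test", "greet_server", "192.168.31.71:52963")
	fmt.Println(key)                                      // /etcd/test/greet_server/192.168.31.71:52963
	fmt.Println(dialTarget("test", "greet_server"))       // etcd://test/greet_server
	fmt.Println(addrFromKey(key, "test", "greet_server")) // 192.168.31.71:52963
}
```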
    

    discovery.go

    package discovery
    
    import (
        "fmt"
        "go_server/src/lib/discovery/config"
        "go_server/src/lib/logger"
        "strings"
    
        "go.etcd.io/etcd/clientv3"
    )
    
    var (
        client *clientv3.Client
    )
    
    // Init initializes the etcd client; etcdAddr is a ";"-separated list of endpoints
    func Init(etcdAddr string) error {
        var err error
        if client == nil {
            // build the etcd client
            client, err = clientv3.New(clientv3.Config{
                Endpoints:   strings.Split(etcdAddr, ";"),
                DialTimeout: config.Timeout,
            })
            if err != nil {
                logger.Error("failed to connect to etcd: %s\n", err)
                fmt.Printf("failed to connect to etcd: %s\n", err)
                return err
            }
        }
        return nil
    }
    

    register.go

    package discovery
    
    import (
        "context"
        "errors"
        "fmt"
        "go_server/src/lib/discovery/config"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "go.etcd.io/etcd/clientv3"
    )
    
    // Service is the object a server uses to register itself
    type Service struct {
        Name string // service name
        Host string // {ip}:{port}
        Env  string // environment the service belongs to
    
        Key string // key under which the service is stored in etcd
    }
    
    var service *Service
    
    func (s *Service) register() error {
        if s.Env == "" {
            return errors.New("env is null")
        }
        s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
        ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
        go func() {
            for {
                resp, err := client.Get(context.Background(), s.Key)
                if err != nil {
                    fmt.Printf("failed to get the service address: %s\n", err)
                } else if resp.Count == 0 { // not registered yet, or the lease has expired
                    err = s.keepAlive()
                    if err != nil {
                        fmt.Printf("failed to keep the registration alive: %s\n", err)
                    }
                }
                <-ticker.C
            }
        }()
        return nil
    }
    
    // keepAlive creates a lease, binds the key to it, and keeps renewing it
    func (s *Service) keepAlive() error {
        // create the lease
        leaseResp, err := client.Grant(context.Background(), config.Expires)
        if err != nil {
            fmt.Printf("failed to create the lease: %s\n", err)
            return err
        }
    
        // register the service address in etcd
        _, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
        if err != nil {
            fmt.Printf("failed to register the service: %s\n", err)
            return err
        }
    
        // renew the lease
        ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
        if err != nil {
            fmt.Printf("failed to renew the lease: %s\n", err)
            return err
        }
    
        // drain the channel returned by KeepAlive; range exits once the channel
        // is closed, whereas a bare "for { <-ch }" would spin on a closed channel
        go func() {
            for range ch {
            }
        }()
        return nil
    }
    
    // unRegister removes the registration
    func (s *Service) unRegister() {
        if client != nil {
            _, _ = client.Delete(context.Background(), s.Key)
        }
    }
    
    // WaitForClose blocks until a termination signal arrives, then unregisters and exits
    func WaitForClose() {
        ch := make(chan os.Signal, 1)
        // SIGKILL cannot be caught, so it is not listed here
        signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
        sig := <-ch
        if service != nil {
            service.unRegister()
        }
        if i, ok := sig.(syscall.Signal); ok {
            os.Exit(int(i))
        } else {
            os.Exit(0)
        }
    }
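
    One detail about draining the keep-alive channel is worth a closer look. In Go, a receive on a closed channel returns immediately with the zero value, so a loop that receives unconditionally never blocks again once the channel is closed, while a range loop simply ends. The sketch below shows this with a plain int channel; drain is a hypothetical helper, and the etcd client is not involved.

```go
package main

import "fmt"

// drain consumes messages until the channel is closed and reports how many it saw.
func drain(ch <-chan int) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	fmt.Println(drain(ch)) // the range loop stops at close: 3

	// A receive on the closed, empty channel does not block.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```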
    

    resolver.go

    package discovery
    
    import (
        "context"
        "fmt"
        "go_server/src/lib/discovery/config"
        "strings"
    
        "go.etcd.io/etcd/clientv3"
        "google.golang.org/grpc"
        "google.golang.org/grpc/resolver"
    )
    
    // EtcdResolver is the resolver backed by etcd
    type EtcdResolver struct {
        dir        string
        clientConn resolver.ClientConn
    }
    
    func Resolver(env string, name string) *grpc.ClientConn {
        // register the etcd resolver
        r := &EtcdResolver{}
        resolver.Register(r)
        target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
        // connect to the servers (load balancing: round robin); this calls r.Build() synchronously
        dialOpts := []grpc.DialOption{
            // round-robin balancer built into grpc
            // (WithBalancerName is deprecated in grpc-go in favor of WithDefaultServiceConfig)
            grpc.WithBalancerName("round_robin"),
            grpc.WithInsecure(),
            grpc.WithDefaultCallOptions(
                grpc.MaxCallRecvMsgSize(1024 * 1024 * 16),
            ),
        }
        conn, err := grpc.Dial(target, dialOpts...)
        if err != nil {
            fmt.Println("failed to connect to the server:", err)
        }
        return conn
    }
    
    func (r *EtcdResolver) Scheme() string {
        return config.Scheme
    }
    
    // Build builds the resolver; grpc.Dial() calls it synchronously
    func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
        r.clientConn = clientConn
        r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
        go r.watch()
        return r, nil
    }
    
    // watch tracks changes to the service address list under one key prefix in etcd
    func (r *EtcdResolver) watch() {
        // initialize the service address list
        var addrList []resolver.Address
    
        resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
        if err != nil {
            fmt.Println("failed to get the service address list:", err)
        } else {
            for i := range resp.Kvs {
                addr := strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)
                fmt.Println(addr)
                addrList = append(addrList, resolver.Address{Addr: addr})
            }
        }
    
        r.clientConn.NewAddress(addrList)
    
        // watch the service address list for changes
        rch := client.Watch(context.Background(), r.dir, clientv3.WithPrefix())
        for n := range rch {
            for _, ev := range n.Events {
                addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
                switch ev.Type {
                case clientv3.EventTypePut:
                    if !exists(addrList, addr) {
                        addrList = append(addrList, resolver.Address{Addr: addr})
                        r.clientConn.NewAddress(addrList)
                    }
                case clientv3.EventTypeDelete:
                    if s, ok := remove(addrList, addr); ok {
                        addrList = s
                        r.clientConn.NewAddress(addrList)
                    }
                }
            }
        }
    }
    
    // exists reports whether addr is already in the list
    func exists(l []resolver.Address, addr string) bool {
        for i := range l {
            if l[i].Addr == addr {
                return true
            }
        }
        return false
    }
    
    // remove deletes addr from the list in place (order is not preserved)
    func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
        for i := range s {
            if s[i].Addr == addr {
                s[i] = s[len(s)-1]
                return s[:len(s)-1], true
            }
        }
        return nil, false
    }
    
    // Close is a no-op
    func (r *EtcdResolver) Close() {}
    
    // ResolveNow is a no-op; updates arrive through the etcd watch
    func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
    

    util.go

    package discovery
    
    import (
        "fmt"
        "net"
    )
    
    // getIntranetIP returns the first non-loopback IPv4 address of this machine
    func getIntranetIP() (ip string) {
        if addrs, err := net.InterfaceAddrs(); err == nil {
            for _, address := range addrs {
                if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
                    if ipnet.IP.To4() != nil {
                        ip = ipnet.IP.String()
                        break
                    }
                }
            }
        }
        return
    }
    
    // getListener listens on a port chosen by the OS and returns this machine's address in ip:port form
    func getListener() (listener net.Listener, host string, err error) {
        host = "0.0.0.0:0"
        listener, err = net.Listen("tcp", host)
        if err == nil {
            addr := listener.Addr().String()
            _, portString, _ := net.SplitHostPort(addr)
            host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
        }
        return
    }
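
    The port-0 trick used by getListener can be tried in isolation: asking to listen on port 0 lets the operating system pick a free port, and the listener then reports which one it got. A minimal sketch, bound to the loopback interface so that it runs anywhere; listenOnFreePort is a hypothetical name.

```go
package main

import (
	"fmt"
	"net"
)

// listenOnFreePort asks the OS for any free TCP port on the loopback
// interface and returns the listener together with the chosen port.
func listenOnFreePort() (net.Listener, string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, "", err
	}
	_, port, err := net.SplitHostPort(l.Addr().String())
	if err != nil {
		l.Close()
		return nil, "", err
	}
	return l, port, nil
}

func main() {
	l, port, err := listenOnFreePort()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer l.Close()
	fmt.Println("listening on port", port) // the port differs from run to run
}
```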
    

    wrapper.go

    package discovery
    
    import (
        "fmt"
        "go_server/src/lib/discovery/config"
        "go_server/src/lib/proto/greet"
    
        "google.golang.org/grpc"
    )
    
    func GreetRegister(env string, server greet.GreetServer) error {
        listener, host, err := getListener()
        if err != nil {
            fmt.Println("failed to listen on the network:", err)
            return err
        }
        fmt.Println("host:", host)
        srv := grpc.NewServer()
        // register the service before serving: grpc rejects registration once Serve has started
        greet.RegisterGreetServer(srv, server)
        go srv.Serve(listener)
        service = &Service{Name: config.GreetServer, Host: host, Env: env}
        err = service.register()
        if err != nil {
            fmt.Println(err)
            return err
        }
        return nil
    }
    
    func GreetResolve(env string) greet.GreetClient {
        return greet.NewGreetClient(Resolver(env, config.GreetServer))
    }
    

    ctx.go

    package discovery
    
    import (
        "context"
        "time"
    )
    
    // Context1s returns a context that times out after 1s
    func Context1s() (ctx context.Context, cancel context.CancelFunc) {
        return context.WithTimeout(context.TODO(), time.Second)
    }
    

    Let's test it. The test files are all in the GitHub repository as well:

    We need a test proto, a server, and a client. Straight to the code again:

    greet.proto

    syntax = "proto3";
    
    
    option go_package = "src/lib/proto/greet";
    
    service Greet {
      rpc Morning(GreetRequest)returns(GreetResponse){}
      rpc Night(GreetRequest)returns(GreetResponse){}
    }
    
    message GreetRequest {
      string name = 1;
    }
    
    message GreetResponse {
      string message = 1;
      string from = 2;
    }
    

    server main.go

    package main
    
    import (
        "context"
        "flag"
        "fmt"
        "go_server/src/lib/discovery"
        proto "go_server/src/lib/proto/greet"
    )
    
    var (
        Flag     = flag.String("flag", "a", "flag")
        EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
        Env      = flag.String("Env", "test", "env")
    )
    
    // GreetServer implements the rpc service interface
    type GreetServer struct{}
    
    func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
        fmt.Printf("Morning called: %s\n", req.Name)
        return &proto.GreetResponse{
            Message: "Good morning, " + req.Name,
            From:    *Flag,
        }, nil
    }
    
    func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
        fmt.Printf("Night called: %s\n", req.Name)
        return &proto.GreetResponse{
            Message: "Good night, " + req.Name,
            From:    *Flag,
        }, nil
    }
    
    func main() {
        flag.Parse()
        err := discovery.Init(*EtcdAddr)
        if err != nil {
            fmt.Println(err)
            return
        }
        err = discovery.GreetRegister(*Env, &GreetServer{})
        if err != nil {
            fmt.Println(err)
            return
        }
        discovery.WaitForClose()
    }
    

    client main.go

    package main
    
    import (
        "flag"
        "fmt"
        "go_server/src/lib/discovery"
        proto "go_server/src/lib/proto/greet"
        "time"
    )
    
    var (
        EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
        Env      = flag.String("Env", "test", "env")
    )
    
    func main() {
        flag.Parse()
        err := discovery.Init(*EtcdAddr)
        if err != nil {
            fmt.Println(err)
            return
        }
        c := discovery.GreetResolve(*Env)
        ticker := time.NewTicker(1 * time.Second)
        for range ticker.C {
            fmt.Println("Calling Morning...")
            ctx, cancel := discovery.Context1s()
            resp1, err := c.Morning(
                ctx,
                &proto.GreetRequest{Name: "Jinfeng"},
            )
            cancel()
            if err != nil {
                fmt.Println("Morning call failed:", err)
                return
            }
            fmt.Printf("Morning response: %s, from: %s\n", resp1.Message, resp1.From)
    
            fmt.Println("Calling Night...")
            ctx, cancel = discovery.Context1s()
            resp2, err := c.Night(
                ctx,
                &proto.GreetRequest{Name: "Jinfeng"},
            )
            cancel()
            if err != nil {
                fmt.Println("Night call failed:", err)
                return
            }
            fmt.Printf("Night response: %s, from: %s\n", resp2.Message, resp2.From)
        }
    }
    

    Let's run it. Start 3 servers, and you can see that 3 instances are now registered in etcd.

    # etcdctl get --prefix ""
    /etcd/test/greet_server/192.168.31.71:52963
    192.168.31.71:52963
    /etcd/test/greet_server/192.168.31.71:52969
    192.168.31.71:52969
    /etcd/test/greet_server/192.168.31.71:52973
    192.168.31.71:52973
    

    Client calls

    ➜  client git:(main) ✗ go run .
    192.168.31.71:52963
    192.168.31.71:52969
    192.168.31.71:52973
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: c
    Calling Night...
    Night response: Good night, Jinfeng, from: a
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: b
    Calling Night...
    Night response: Good night, Jinfeng, from: c
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: a
    Calling Night...
    Night response: Good night, Jinfeng, from: b
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: c
    

    Shut down one server

    Morning response: Good morning, Jinfeng, from: a
    Calling Night...
    Night response: Good night, Jinfeng, from: b
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: a
    Calling Night...
    Night response: Good night, Jinfeng, from: b
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: a
    Calling Night...
    Night response: Good night, Jinfeng, from: b
    

    Start it again

    Morning response: Good morning, Jinfeng, from: a
    Calling Night...
    Night response: Good night, Jinfeng, from: b
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: c
    Calling Night...
    Night response: Good night, Jinfeng, from: a
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: b
    Calling Night...
    Night response: Good night, Jinfeng, from: c
    Calling Morning...
    Morning response: Good morning, Jinfeng, from: a
    Calling Night...
    Night response: Good night, Jinfeng, from: b
    

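    This round only uses grpc's built-in simple round robin for load balancing. When I find the time, I'll add other methods such as consistent hashing!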
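
    For readers curious about what consistent hashing would look like, here is a rough sketch of a hash ring with virtual nodes. It is not taken from the repository and is not wired into grpc: the ring type, newRing and pick are hypothetical names, and CRC32 is used only because it ships with the standard library.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring is a hypothetical consistent-hash ring with virtual nodes.
type ring struct {
	hashes []uint32          // sorted positions of all virtual nodes
	owner  map[uint32]string // position -> real node
}

// newRing places `replicas` virtual nodes on the ring for every real node.
func newRing(replicas int, nodes ...string) *ring {
	r := &ring{owner: map[uint32]string{}}
	for _, n := range nodes {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(n + "#" + strconv.Itoa(i)))
			r.hashes = append(r.hashes, h)
			r.owner[h] = n
		}
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

// pick returns the node owning the first position at or after the key's hash,
// wrapping around to the start of the ring when necessary.
func (r *ring) pick(key string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0
	}
	return r.owner[r.hashes[i]]
}

func main() {
	r := newRing(50, "a", "b", "c")
	for _, user := range []string{"user-1", "user-2", "user-3"} {
		fmt.Println(user, "->", r.pick(user))
	}
}
```

    Unlike round robin, the same key always lands on the same server, and removing one server only moves the keys that server owned.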

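    Service discovery is now in place, so next we can build a simple recommender system and get the whole flow running!

    The plan is to start with a recommender system that has only simple recall (candidate retrieval), and then gradually optimize the whole system.

    Let's go, folks!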


