go micro 源码阅读-Broker

作者: inclee | 来源:发表于2018-05-18 17:30 被阅读14次

作用

从前面的博文可以看出Broker是Service异步通信的基础功能组件。那么好奇的是Broker的代码逻辑到底是怎么样的,如何提供异步通信呢?

整体代码逻辑

type Broker interface {
    Options() Options
    Address() string
    Connect() error ///启动broker服务
    Disconnect() error ///关闭Broker服务
    Init(...Option) error
    Publish(string, *Message, ...PublishOption) error  ///publish topic message
    Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error)  ///注册 topic message 的 subscribe
    String() string
}

从Broker的接口可以看出Broker基于替丁topic 的Pub/Sub的方式提供异步通信。

通过调用Connect 开启Broker

通过Subsribe 注册对某个topic的监听

通过Publish 发布某个topic的消息

通过调用Disconnect关闭Broker

代码解析

创建

func NewBroker(opts ...Option) Broker {
    return newHttpBroker(opts...)
}

通过NewBroker调用newHTTPBroker返回时间Broker接口的httpBroker实例。[关于Option的处理,请参考]

func newHttpBroker(opts ...Option) Broker {
        .......
    h := &httpBroker{
        id:          "broker-" + uuid.NewUUID().String(),
        address:     addr,
        opts:        options,
        r:           reg,
        c:           &http.Client{Transport: newTransport(options.TLSConfig)},  ///用于publish时发送消息
        subscribers: make(map[string][]*httpSubscriber),
        exit:        make(chan chan error),
        mux:         http.NewServeMux(),
    }

    h.mux.Handle(DefaultSubPath, h) ///添加默认路由handler,所有publish过来的method 到h.HTTPServer处理
    return h
}

启动/关闭监听

Connect

启动监听是Connnect函数,感觉这个名字起得很不好,有点迷惑,并不是去连接什么?其实这个函数的功能是创建HttpServer接受Publisher发送来的消息,并且坚定Broker的exit时间,反注册Subscriber。[Run或者Start含义会更清楚一点]

func (h *httpBroker) Connect() error {
    ....
    go http.Serve(l, h.mux) ///启动HTTPServer
    go h.run(l)            ///帧循环
    ....
}
func (h *httpBroker) run(l net.Listener) {
    t := time.NewTicker(registerInterval)
    defer t.Stop()

    for {
        select {
        // heartbeat for each subscriber
        case <-t.C:
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))//TTL时间验证服务状态,如果服务Died,则重新注册他。
                }
            }
            h.RUnlock()
        // received exit signal
        case ch := <-h.exit:
            ch <- l.Close()
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    h.r.Deregister(sub.svc)
                }
            }
            h.RUnlock()
            return
        }
    }
}

DisConnect

那关闭监听 Disconnect做了什么呢?[同样感觉Stop之类的函数名会清楚一点]

func (h *httpBroker) Disconnect() error {
    h.Lock()
    defer h.Unlock()

    if !h.running {
        return nil
    }

    // stop rcache
    rc, ok := h.r.(rcache.Cache)
    if ok {
        rc.Stop()
    }

    // exit and return err
    ch := make(chan error)
    h.exit <- ch
    err := <-ch

    // set not running
    h.running = false
    return err
}

向chan h.exit发送关闭消息,帧循环中会接受到关闭消息,然后进行相应的关闭清理操作。

注意这里发送的关闭消息是err对象,这是一个应该学习的地方,可以知道是正常关闭和异常关闭,如果是异常关闭,可以知道具体错误信息是什么

订阅

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
        .....
    // register service
    /// 当注册一个subscriber的时候实际上注册了一个服务。然后publish通过服务的名称找到这个注册的地址,然后发送消息。
    node := &registry.Node{
        Id:      id,
        Address: addr,
        Port:    port,
        Metadata: map[string]string{
            "secure": fmt.Sprintf("%t", secure),
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {
        version = broadcastVersion
    }

    service := &registry.Service{
        Name:    "topic:" + topic,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // generate subscriber
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    id,
        topic: topic,
        fn:    handler,///等收到publish是的回调。
        svc:   service,
    }

    // subscribe now
    ////注册服务。并且把subscribe append 到 httpBroker.subscribers中
    if err := h.subscribe(subscriber); err != nil {
        return nil, err
    }

    // return the subscriber
    return subscriber, nil
}

func (h *httpBroker) subscribe(s *httpSubscriber) error {
    h.Lock()
    defer h.Unlock()

    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
        return err
    }
    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
    return nil
}

可以看到订阅服务就是注册一个Topic serivce 到 Consul,如果对应Socke的观点我在这个端口(topic)进行监听了,想发消息的就发给我吧。

发布

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
    h.RLock()
    s, err := h.r.GetService("topic:" + topic)///发现相关服务
    if err != nil {
        h.RUnlock()
        return err
    }
    h.RUnlock()

    m := &Message{
        Header: make(map[string]string),
        Body:   msg.Body,
    }

    for k, v := range msg.Header {
        m.Header[k] = v
    }

    m.Header[":topic"] = topic

    b, err := h.opts.Codec.Marshal(m)///对消息进行编码
    if err != nil {
        return err
    }

    pub := func(node *registry.Node, b []byte) {
        scheme := "http"

        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {
            scheme = "https"
        }

        vals := url.Values{}
        vals.Add("id", node.Id)

        uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err == nil {
            io.Copy(ioutil.Discard, r.Body)
            r.Body.Close()
        }
    }

    for _, service := range s {
        // only process if we have nodes
        if len(service.Nodes) == 0 {
            continue
        }

        switch service.Version {
        // broadcast version means broadcast to all nodes
        case broadcastVersion:///广播
            for _, node := range service.Nodes {
                // publish async
                go pub(node, b)
            }
        default:
            // select node to publish to///随机publish一个service
            node := service.Nodes[rand.Int()%len(service.Nodes)]

            // publish async
            go pub(node, b)
        }
    }

    return nil
}

从上面的代码可以肯出,整个逻辑也非常简单

  1. 获取对应topic的server
  2. 编码对应的消息
  3. 按照service的类型把消息通过http post的方式发送出去【异步发送】。

订阅收到 publisher 发送的消息handle处理

对应上面Create的时候启动的HTTPServer,收到post的消息,读取然后解码,根据对应的topic获取httpBroker.handler[string]Handler中的handle进行调用。这个逻辑也是比较简单。

相关文章

网友评论

    本文标题:go micro 源码阅读-Broker

    本文链接:https://www.haomeiwen.com/subject/gdzgdftx.html