美文网首页nsq
Go消息中间件Nsq系列(三)------apps/nsq_to

Go消息中间件Nsq系列(三)------apps/nsq_to

作者: Yangwenliu | 来源:发表于2019-07-04 18:26 被阅读0次

上一篇: Go消息中间件Nsq系列(二)------Nsq目录结构

apps/nsq_to_nsq程序

功能描述: nsq客户端读取(消费)指定topic/channel数据,然后通过均衡策略由生产者再次发送

通过此次nsq_to_nsq程序源码阅读, 可以学习到

  1. goroutine channel 的简单使用方法
  2. flag的自定义, 还有透传技巧
  3. 负载均衡的简单实现, 和三方库 host-poolhttps://github.com/bitly/go-hostpool
  4. 耗时统计三方库 https://github.com/bitly/timer_metrics
  5. signal的简单使用, 和生产者消费者的使用案例

主要实现:

  1. 程序需要读取解析命令行参数
  2. N个消费者(Consumer)通过直连nsqd或者服务发现(nslookup)读取指定topic/channel数据,然后通知到配置的N个生产者(Producer)继续生产
  3. 生产者均衡使用了RoundRobin,HostPool 两种可选策略
  4. 使用timermetrics 去统计生产(publish)消息耗时
  5. 使用了signal对程序的收尾工作

源码实现

程序执行顺序: init( ) -> main()

func init() {
      //  自定义类型 实现 flag.Getter(组合flag.Value)接口
    flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
    flag.Var(&destNsqdTCPAddrs, "destination-nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
    flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
    flag.Var(&topics, "topic", "nsq topic (may be given multiple times)")
    flag.Var(&whitelistJSONFields, "whitelist-json-field", "for JSON messages: pass this field (may be given multiple times)")
}

init() 方法主要是自定义类型入参的映射, 自定义类型需要实现flag.Value接口

func main() {
    var selectedMode int
    // ...  省略
    //  一个技巧用法 实现其他参数透传
    flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
    // ...  省略
    // 命令行参数解析
    flag.Parse()

    if *showVersion {
        fmt.Printf("nsq_to_nsq v%s\n", version.Binary)
        return
    }
    // 一系列 输入参数校验 
    if len(topics) == 0 || *channel == "" {
    // ...  省略

    // 负载均衡策略
    switch *mode {
    case "round-robin": // 轮询的方式
        selectedMode = ModeRoundRobin
    case "hostpool", "epsilon-greedy": // 主机池轮询 --- 贪婪算法
        selectedMode = ModeHostPool
    }
      // 信号源的监听对程序进行收尾工作
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    //多个消费者 maxInFlight 需要配置,否则接收会有问题
    cCfg.MaxInFlight = *maxInFlight

    // 生产者 添加进集合, key为地址
    producers := make(map[string]*nsq.Producer)
    // ... 省略,
       // 给生产者添加 耗时统计
    perAddressStatus := make(map[string]*timer_metrics.TimerMetrics)
       // ... 省略

    // 根据模式选择池子策略 默认的 或者  贪婪算法
    hostPool := hostpool.New(destNsqdTCPAddrs)
    if *mode == "epsilon-greedy" {
        hostPool = hostpool.NewEpsilonGreedy(destNsqdTCPAddrs, 0, &hostpool.LinearEpsilonValueCalculator{})
    }

    var consumerList []*nsq.Consumer

    // 生产者 处理封装
    publisher := &PublishHandler{
        addresses:        destNsqdTCPAddrs,
        producers:        producers,
        mode:             selectedMode,
        hostPool:         hostPool,
        respChan:         make(chan *nsq.ProducerTransaction, len(destNsqdTCPAddrs)),
        perAddressStatus: perAddressStatus,
        timermetrics:     timer_metrics.NewTimerMetrics(*statusEvery, "[aggregate]:"),
    }
    // 多消费者循环添加到 consumerList ,并添加handlermessage添加
    // ... 省略
        
    //  根据生产者个数去开启len个goroutine去异步处理发送结果以及统计 
    for i := 0; i < len(destNsqdTCPAddrs); i++ {
        go publisher.responder()
    }
    // 消费者直连配置的nsqds去连接消费
    for _, consumer := range consumerList {
        err := consumer.ConnectToNSQDs(nsqdTCPAddrs)
    // ... 省略

    // 通过服务发现去建立连接进行消费
    for _, consumer := range consumerList {
    err := consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
        
       // 收到中断信号, consumer 停止消费,释放并退出
    <-termChan // wait for signal
    for _, consumer := range consumerList {
        consumer.Stop()
    }
    for _, consumer := range consumerList {
        <-consumer.StopChan
    }
}

main() 方法主要做了解析命令行参数,根据配置创建producer,consumer,然后producer开启N个goroutine(responder())去异步处理消息与更新统计信息,consumer通过直连或者服务发现形式去进行消费, 最后就是信号源监听程序收尾工作

consumer消息消费..

// 主要是用来处理传递过来的消息
func (ph *PublishHandler) HandleMessage(m *nsq.Message, destinationTopic string) error {
    var err error
    msgBody := m.Body
        // 根据配置进行 消息过滤 主要两个函数
        //  1.  ph.shouldPassMessage(js);
        // 2.   filterMessage(js, msgBody)
        // ... 省略
         // 计时开始, 通过时间差Sub 进行耗时计算
    startTime := time.Now()

        // 根据均衡策略,生产者去发送异步消息
    switch ph.mode {
    case ModeRoundRobin:
        counter := atomic.AddUint64(&ph.counter, 1)
        idx := counter % uint64(len(ph.addresses))
        addr := ph.addresses[idx]
        p := ph.producers[addr]
                // 使用atomic原子操作, 自增 然后进行取模运算 轮询选取producer, 发布异步消息在ph.respChan进行通知
        err = p.PublishAsync(destinationTopic, msgBody, ph.respChan, m, startTime, addr)
    case ModeHostPool:
                // 在主机池里面根据算法获取生产者,然后发送异步消息
        hostPoolResponse := ph.hostPool.Get()
        p := ph.producers[hostPoolResponse.Host()]
        err = p.PublishAsync(destinationTopic, msgBody, ph.respChan, m, startTime, hostPoolResponse)
        if err != nil {
            hostPoolResponse.Mark(err)
        }
    }

    if err != nil {
        return err
    }
       //  禁用自动响应反馈 (就是auto finish)
    m.DisableAutoResponse()
    return nil
}

PublishHandler实现HandleMessage(), consumer的消费也是调用的该方法. 方法中主要就是过滤消息, 然后根据均衡策略算法去获取生产者,然后去发送异步消息, 禁用自动响应,使得异步消息结果在responder()方法中异步处理,
responder() 方法代码如下

func (ph *PublishHandler) responder() {
    var msg *nsq.Message
    var startTime time.Time
    var address string
    var hostPoolResponse hostpool.HostPoolResponse

    // ... 省略
             // 前面判断HostPoolResponse,就是从主机池里面获取的producer发送有无错误

        if success {
            msg.Finish() // 正常消费
        } else {
            msg.Requeue(-1)  // 重新入队列
        }
               // 更新统计状态
        ph.perAddressStatus[address].Status(startTime)
        ph.timermetrics.Status(startTime)
    }
}

responder() 是通过goroutine异步开启的, 主要是异步消息通知进行最终处理. 然后更新统计状态, 可能发生的问题就是producer发送出现故障了,导致消息没正常发送出去,就重新入队列

相关文章

网友评论

    本文标题:Go消息中间件Nsq系列(三)------apps/nsq_to

    本文链接:https://www.haomeiwen.com/subject/czdkcctx.html