美文网首页
1.2.Go如何使用pulsar

1.2.Go如何使用pulsar

作者: Goplayer王布斯 | 来源:发表于2022-03-15 17:51 被阅读0次

    1.首先引入 

    github.com/apache/pulsar-client-go/pulsar

    2.启动pulsar的client

    /*启动一个pulsar  client*/

    defer func() {

      ifr:=recover();r!=nil{

         logger.Fatalf(ctx,"pulsar is down stack[%+v] recover[%+v]",string(debug.Stack()),r)

       }

    }()

    varerrerror

    SunacClient,err=pulsar.NewClient(pulsar.ClientOptions{

      URL:conf.PulsarSetting.Url,

    })

    iferr!=nil{

      logger.Errorf(ctx,"setup pulsar failed to get new client err[%v]",err)

      return

    }

    3.生产者

    funcSendMessage(ctx*gin.Context,clientpulsar.Client,topicstring,msg []byte) {

      /*

       提供生产者方法

       */

       /*

         (1)初始化一个producer设置好主题

       */

      producer,err:=client.CreateProducer(pulsar.ProducerOptions{

         Topic:topic,

      })

      /*

        */

      iferr!=nil{

         logger.Errorf(ctx,"SendMessage Error[%v]",err)

       }

      /*

        */

       //(2)把消息结构体发给pusar

      _,err=producer.Send(context.Background(), &pulsar.ProducerMessage{

         Payload:msg,

         EventTime:time.Now(),

      })

      logger.Infof(ctx,"Pulsar_Published_SendMessage [%v]",string(msg))

      deferproducer.Close()

      iferr!=nil{

         logger.Errorf(ctx,"Failed to publish message",err)

       }

    }

    4.消费者

    funcnewConsumer(ctx*gin.Context,clientpulsar.Client,topicstring) {

      //创建消费者

      defer func() {

         ifr:=recover();r!=nil{

            logger.Fatalf(ctx,"Pulsar is down [stack=%+v] [recover=%+v]",string(debug.Stack()),r)

          }

       }()

      consumer,err:=client.Subscribe(pulsar.ConsumerOptions{

         Topics:[]string{topic},

         SubscriptionName:conf.RcSeverSetting.ServiceName,

         Type:pulsar.Failover,

      })

      iferr!=nil{

         logger.Errorf(ctx,"NewConsumer failed to start pulsar consumer,err[%v]",err)

         return

      }

      deferconsumer.Close()

      //循环的获取pulsar的消息     e

      for{

         msg,err:=consumer.Receive(context.Background())

         iferr!=nil{

            logger.Error(ctx,"newConsumer failed to receive message,err[%v],topic[%s]",err,msg.Topic())

          }

         //根据消息的类型 对消息进行处理

         logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s] msg[%v]",msg.Topic(),string(msg.Payload()))

         switchmsg.Topic() {

         casecommon.RCPULSARFACEADD://人脸添加或更新

            logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s]",msg.Topic())

         casecommon.RCPULSARPASSRECORDADD://通行记录添加

            logger.Infof(ctx,"newConsumer RCPULSARPASSRECORDADD topic[%s]",msg.Topic())

         default:

    logger.Errorf(ctx,"pulsar consumer got an wrong topic,message[%v]",msg)

          }

         iferr!=nil{

            logger.Errorf(ctx,"newConsumer handle got error,msg will nack,topic=[%s],err=[%v]",topic,err)

            consumer.Nack(msg)//When a message is "negatively acked" it will be marked for redelivery after some fixed delay

            continue

         }

         consumer.Ack(msg)

       }

    }

    相关文章

      网友评论

          本文标题:1.2.Go如何使用pulsar

          本文链接:https://www.haomeiwen.com/subject/oqmydrtx.html