美文网首页
golang如何使用Spring Cloud Stream

golang如何使用Spring Cloud Stream

作者: EasyNetCN | 来源:发表于2020-10-07 22:34 被阅读0次

郑重声明:本文只代表本人观点,不一定与实际情况完全相符。看了大约半天Spring Cloud Stream中与kafka相关的源代码,前后断断续续折腾了一个月,终于可以在golang中通过kafka把消息发送给Spring Cloud Stream并且成功处理。

当Spring Cloud Stream使用@StreamListener中的condition、通过header进行选择的时候,消息头(MessageHeader)需要包含以下三个信息:

id UUID类型

contentType 字符串类型,内容类型,可以为:application/json

spring_json_header_types 各个header值对应的Java类型,内容为JSON字符串。使用golang发送的时候,例如:{"partitionKey":"java.lang.String","scst_partition":"java.lang.Integer","contentType":"java.lang.String"}

注意id必须基于java序列化格式,可以参考:https://www.jianshu.com/p/08fe6ffe26d5

直接上代码:

// testKafka publishes a single ServiceErrorLog record, JSON-encoded, to the
// "log-service-topic" Kafka topic on localhost:9092 using a sarama sync
// producer.
//
// The record carries the headers "id", "contentType", "partitionKey" and
// "spring_json_header_types" so that a Spring Cloud Stream consumer can route
// it by header. On success the partition and offset reported by the broker are
// printed to stdout. Failures while preparing the message or connecting are
// fatal; a failed send is logged and the function returns so that the
// producer and client are still closed.
func testKafka() {
	sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)

	// Prepare the whole message before opening any connection: log.Fatalf
	// exits the process without running deferred calls, so every fatal
	// preparation error must happen while there is nothing to clean up yet.
	key, err := uuid.NewRandom()
	if err != nil {
		log.Fatalf("unable to create uuid: %q", err)
	}

	serviceErrorLog := &ServiceErrorLog{ApplicationName: "test-service", ServerIp: "127.0.0.1", Path: "/", QueryParams: "/QueryParams", Message: "Message", Trace: "Trace", LogTime: time.Now().Format(DATE_TIME_PATTERN)}

	text, err := json.Marshal(serviceErrorLog)
	if err != nil {
		log.Fatalf("unable to marshal service error log: %q", err)
	}

	// JSON map from header name to the Java type of its value.
	springJsonHeaderTypes := []byte(`{"partitionKey":"java.lang.String","scst_partition":"java.lang.Integer","contentType":"java.lang.String"}`)

	message := &sarama.ProducerMessage{
		Topic: "log-service-topic",
		Headers: []sarama.RecordHeader{
			// NOTE(review): UUIDJavaBytes presumably encodes the UUID in Java
			// serialization format, as the consumer side expects — confirm
			// against the utility package.
			{Key: []byte("id"), Value: utility.UUIDJavaBytes(key)},
			{Key: []byte("contentType"), Value: []byte("application/json")},
			{Key: []byte("partitionKey"), Value: []byte("service-error-logs")},
			{Key: []byte("spring_json_header_types"), Value: springJsonHeaderTypes},
		},
		Value: sarama.StringEncoder(text),
	}

	config := sarama.NewConfig()
	// A sync producer requires both result channels to be enabled.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Version = sarama.MaxVersion

	client, err := sarama.NewClient(strings.Split("localhost:9092", ","), config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		// Release the connection explicitly; Fatalf will not run defers.
		if closeErr := client.Close(); closeErr != nil {
			log.Printf("unable to close kafka client: %q", closeErr)
		}
		log.Fatalf("unable to create producer: %q", err)
	}

	// Closing a producer built from an existing client does not close that
	// client, so both are closed here: producer first, then the client.
	defer func() {
		if closeErr := producer.Close(); closeErr != nil {
			log.Printf("unable to close producer: %q", closeErr)
		}
		if closeErr := client.Close(); closeErr != nil {
			log.Printf("unable to close kafka client: %q", closeErr)
		}
	}()

	fmt.Println(message)

	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		// Log and return instead of Fatalf so the deferred cleanup runs.
		log.Printf("unable to send message: %q", err)
		return
	}

	fmt.Println(partition, offset)
}

相关文章

网友评论

      本文标题:golang如何使用Spring Cloud Stream

      本文链接:https://www.haomeiwen.com/subject/upiqpktx.html