美文网首页
go和java测试 nats的jetstream

go和java测试 nats的jetstream

作者: cjlynn | 来源:发表于2022-03-02 15:35 被阅读0次

由于 nats-streaming 已被废弃,NATS 改用 JetStream 来存储消息。下面分别使用 Java 和 Go 测试 JetStream。

1、maven:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.13.2</version>
</dependency>

2、测试代码:

先运行go run jspub.go,再运行go run jssub.go

1、go代码 jetstream

注意几点:

//1、AddStream  pub和sub不同
//pub部分
//StreamConfig 的 Subjects 不支持通配符,例如不能写成 Subjects: []string{"ORDERS.*"}
// Publisher side: create the ORDERS stream with the single literal subject
// ORDERS.scratch and work-queue retention.
js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        // Subjects: []string{"ORDERS.*"}, // author's note: the wildcard form is not supported here
        Retention: nats.WorkQueuePolicy,
    })
//sub部分
//StreamConfig 的 Subjects 可以使用通配符,例如 Subjects: []string{"ORDERS.*"}
// Subscriber side: declare the ORDERS stream with the same literal subject and
// work-queue retention as the publisher.
js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        // Author's note: Subjects: []string{"ORDERS.*"} is also accepted on this side.
        Retention: nats.WorkQueuePolicy,
    })

//2、三种订阅方式(选第一种或第二种即可)
/**
第一种:js.Subscribe,订阅主题使用 "ORDERS.*" 或 "ORDERS.scratch" 都可以
第二种:js.SubscribeSync,可以使用 "ORDERS.scratch",不能使用 "ORDERS.*"
第三种:js.PullSubscribe,在本测试中无法收到消息
*/
1、go jetstream pub
package main

import (
    "encoding/json"
    "fmt"
    "github.com/nats-io/nats.go"
    "runtime"
    "strconv"
    "time"
)

// main is the JetStream publisher demo. It recreates the ORDERS stream and the
// MONITOR durable consumer, prints their state as JSON, publishes one
// synchronous message and then 5000 asynchronous messages (one every 500 ms)
// to ORDERS.scratch.
//
// Every error returned by the NATS client is reported on stdout instead of
// being discarded. Failures that make the rest of the demo pointless
// (connecting, creating the JetStream context, creating the stream) stop the
// program; everything else is reported and the demo continues.
func main() {
    // Connect to NATS.
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        fmt.Println("connect error:", err)
        return
    }

    // Create the JetStream context, allowing up to 256 unacknowledged async
    // publishes in flight.
    js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
    if err != nil {
        fmt.Println("jetstream context error:", err)
        nc.Close()
        return
    }

    // Start from a clean slate. These calls fail when the consumer or stream
    // does not exist yet (e.g. on the first run), so failures are only reported.
    if err := js.DeleteConsumer("ORDERS", "MONITOR"); err != nil {
        fmt.Println("delete consumer:", err)
    }
    if err := js.DeleteStream("ORDERS"); err != nil {
        fmt.Println("delete stream:", err)
    }

    if _, err := js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        // Subjects: []string{"ORDERS.*"}, // wildcard alternative; the original note says it did not work in this test — TODO confirm
        Retention: nats.WorkQueuePolicy,
    }); err != nil {
        fmt.Println("add stream error:", err)
        nc.Close()
        return
    }

    // NOTE(review): this update omits Subjects and Retention and caps the
    // stream at 8 bytes. Confirm the intent; the result is now reported
    // instead of being silently dropped.
    if _, err := js.UpdateStream(&nats.StreamConfig{
        Name:     "ORDERS",
        MaxBytes: 8,
    }); err != nil {
        fmt.Println("update stream:", err)
    }

    // Durable consumer that tracks delivery state for the stream.
    if _, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable: "MONITOR",
    }); err != nil {
        fmt.Println("add consumer:", err)
    }

    // Print the current stream and consumer state.
    streamInfo, err := js.StreamInfo("ORDERS")
    dumpJSON("===> StreamInfo ", streamInfo, err)

    consumerInfo, err := js.ConsumerInfo("ORDERS", "MONITOR")
    dumpJSON("===> ConsumerInfo ", consumerInfo, err)

    // Simple synchronous stream publish.
    if _, err := js.Publish("ORDERS.scratch", []byte("hello")); err != nil {
        fmt.Println("publish error:", err)
    }

    // Simple asynchronous stream publishes, throttled to one every 500 ms.
    const total = 5000
    for i := 0; i < total; i++ {
        if _, err := js.PublishAsync("ORDERS.scratch", []byte("hello "+strconv.Itoa(i))); err != nil {
            fmt.Println("async publish error:", err)
        }
        time.Sleep(500 * time.Millisecond)
    }

    // Wait for the outstanding async publishes to be acknowledged.
    select {
    case <-js.PublishAsyncComplete():
    case <-time.After(5 * time.Second):
        fmt.Println("async publishes did not resolve in time")
    }

    // Goexit terminates the main goroutine without returning from main, so the
    // process is not exited here. No defer is used above because Goexit would
    // run it and close the connection.
    runtime.Goexit()
}

// dumpJSON prints v JSON-encoded after label, or reports err / the encoding
// failure instead of printing a misleading "null".
func dumpJSON(label string, v interface{}, err error) {
    if err != nil {
        fmt.Println(label, "error:", err)
        return
    }
    data, err := json.Marshal(v)
    if err != nil {
        fmt.Println(label, "marshal error:", err)
        return
    }
    fmt.Println(label, string(data))
}
2、go jetstream sub
package main

import (
    "encoding/json"
    "fmt"
    "github.com/nats-io/nats.go"
    "log"
    "runtime"
)

// printnatsinfo prints the JSON-encoded info of the ORDERS stream and of its
// MONITOR consumer, as returned by js.StreamInfo and js.ConsumerInfo.
//
// Lookup or encoding failures are logged and the corresponding line is
// skipped, rather than printing "null" for a missing stream or consumer.
func printnatsinfo(js nats.JetStreamContext) {
    if info, err := js.StreamInfo("ORDERS"); err != nil {
        log.Printf("stream info error: %v", err)
    } else if data, err := json.Marshal(info); err != nil {
        log.Printf("stream info marshal error: %v", err)
    } else {
        fmt.Println("===> StreamInfo ", string(data))
    }

    if info, err := js.ConsumerInfo("ORDERS", "MONITOR"); err != nil {
        log.Printf("consumer info error: %v", err)
    } else if data, err := json.Marshal(info); err != nil {
        log.Printf("consumer info marshal error: %v", err)
    } else {
        fmt.Println("===> ConsumerInfo ", string(data))
    }
}

// main is the JetStream subscriber demo. It declares the ORDERS stream and the
// MONITOR durable consumer, prints their state before and after, and then
// subscribes with a callback that prints and acknowledges every message.
//
// Errors from the NATS client are no longer discarded: failing to connect, to
// create the JetStream context or to subscribe is fatal; failures while
// declaring the stream/consumer are logged only, because the publisher may
// already have created them.
func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("connect error: %v", err)
    }
    js, err := nc.JetStream()
    if err != nil {
        log.Fatalf("jetstream context error: %v", err)
    }
    printnatsinfo(js)

    // Author's note: if the subscriber deletes the stream here, the messages
    // already stored in it are lost, so the cleanup stays disabled.
    // js.DeleteConsumer("ORDERS", "MONITOR")
    // js.DeleteStream("ORDERS")

    if _, err := js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        // Subjects: []string{"ORDERS.*"}, // wildcard alternative; the original note says it did not work in this test — TODO confirm
        Retention: nats.WorkQueuePolicy,
    }); err != nil {
        log.Printf("add stream: %v", err)
    }

    // NOTE(review): this update omits Subjects and Retention and caps the
    // stream at 8 bytes. Confirm the intent; the result is now logged instead
    // of being silently dropped.
    if _, err := js.UpdateStream(&nats.StreamConfig{
        Name:     "ORDERS",
        MaxBytes: 8,
    }); err != nil {
        log.Printf("update stream: %v", err)
    }

    if _, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable: "MONITOR",
    }); err != nil {
        log.Printf("add consumer: %v", err)
    }

    printnatsinfo(js)

    // Three subscription styles were tried; use the first or the second.
    //   1. js.Subscribe:     works with both "ORDERS.*" and "ORDERS.scratch".
    //   2. js.SubscribeSync: works with "ORDERS.scratch" but not with "ORDERS.*".
    //   3. js.PullSubscribe: received no messages in the author's test.

    // Style 1: asynchronous callback subscription.
    // js.Subscribe("ORDERS.scratch", ...) works as well.
    if _, err := js.Subscribe("ORDERS.*", func(m *nats.Msg) {
        fmt.Printf("Received a JetStream message: %s\n", string(m.Data))
        if err := m.Ack(); err != nil {
            log.Printf("ack error: %v", err)
        }
    }); err != nil {
        log.Fatalf("subscribe error: %v", err)
    }

    // Style 2: simple sync durable consumer (optional SubOpts at the end).
    // Needs the "time" import when enabled.
    /*sub, _ := js.SubscribeSync("ORDERS.scratch", nats.Durable("MONITOR"), nats.MaxDeliver(3))
    for {
        m, _ := sub.NextMsg(3 * time.Second)
        if m != nil {
            fmt.Printf("<===  Received a JetStream message: %s\n", string(m.Data))
            m.Ack()
        }
    }*/

    // Style 3: pull subscription; received no messages in the author's test.
    // Needs the "time" import when enabled.
    /*subscribe, err := js.PullSubscribe("ORDERS.scratch", "MONITOR")
    for {
        msgs, _ := subscribe.Fetch(3)
        fmt.Println("<=== ", msgs)
        for i, x := range msgs {
            fmt.Printf("message %d = %s\n", i, string(x.Data))
        }
        time.Sleep(time.Duration(3) * time.Second)
    }*/

    // Goexit terminates the main goroutine without returning from main, so the
    // process is not exited here and the subscription callback can keep running.
    runtime.Goexit()
}

2、java代码 pub/sub jetstream

import io.nats.client.*;
import io.nats.client.api.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;

@Slf4j
public class NatsJetStreamTest {

    private Connection nc;
    private JetStream jetStream;
    @Before
    public void pre() throws InterruptedException {
        Options options = new Options.Builder().server("nats://127.0.0.1:4222"))
                .connectionListener(getConnectionListener()).build();
        Nats.connectAsynchronously(options, true);
    }

    private ConnectionListener getConnectionListener() {
        return (conn, type) -> {
            try {
                log.info("nats event {}.", type);
                if (type == ConnectionListener.Events.CONNECTED) {
                    nc = conn;
                    String subject = this.getChannelSubjectName(null);
                    String stream = this.getChannelStreamName(null);
                    this.delConsumer(stream);
                    this.delStream(stream);
                    this.addChannelStream(null);

                    ConsumerConfiguration cc = ConsumerConfiguration.builder().maxAckPending(1).durable("MONITOR").build();
                    PushSubscribeOptions so = PushSubscribeOptions.builder().stream(stream).configuration(cc).build();
                    jetStream = nc.jetStream();
                    Dispatcher dispatcher = nc.createDispatcher();

                    /*jetStream.subscribe(subject, dispatcher, (msg) -> {
                        log.info("Nats receive room Message {}", msg);
                        msg.ack();
                    }, false, so);*/
                    log.info("初始化完成,可以发送数据");
                }
            } catch (Exception e) {
                log.error("ConnectionListener error", e);
            }
        };
    }

    @Test
    public void jetstreamtest() throws InterruptedException, JetStreamApiException, IOException {
        while (jetStream == null) {
            Thread.sleep(1000);
        }
        log.info("开始发送数据");
        int max = 500;
        for (int i = 0; i < max; i++) {
            jetStream.publish(this.getChannelSubjectName(null), ("hello === " + i).getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);
        }
    }

    public String getChannelStreamName(String channelId) {
        return "ORDERS";
    }

    public String getChannelSubjectName(String channelId) {
        return "ORDERS.scratch";
    }

    public void publishToStream(String subject, String message) {
        try {
            nc.jetStream().publish(subject, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | JetStreamApiException e) {
            log.error("nats publishToStream error", e);
        }
    }

    public static StreamInfo getStreamInfoOrNullWhenNotExist(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
        try {
            return jsm.getStreamInfo(streamName);
        }
        catch (JetStreamApiException jsae) {
            if (jsae.getErrorCode() == 404) {
                return null;
            }
            throw jsae;
        }
    }
    public static void printStreamInfo(StreamInfo si) {
        printObject(si, "StreamConfiguration", "StreamState", "ClusterInfo", "Mirror", "subjects", "sources");
    }

    public static void printObject(Object o, String... subObjectNames) {
        String s = o.toString();
        for (String sub : subObjectNames) {
            boolean noIndent = sub.startsWith("!");
            String sb = noIndent ? sub.substring(1) : sub;
            String rx1 = ", " + sb;
            String repl1 = (noIndent ? ",\n": ",\n    ") + sb;
            s = s.replace(rx1, repl1);
        }

        log.info(s);
    }

    public void addChannelStream(String channelId) {
        try {
            String channelSubject = getChannelSubjectName(channelId);
            String channelStream = getChannelStreamName(channelId);
            StreamInfo streamInfo = this.createOrGetChannelStream(channelStream);
            StreamConfiguration configuration = streamInfo.getConfiguration();
            List<String> subjects = configuration.getSubjects();
            if (Ts.hv(subjects) && subjects.contains(channelSubject)) {
                return;
            }
            StreamConfiguration streamConfig = StreamConfiguration.builder(configuration).addSubjects(channelSubject).build();
            nc.jetStreamManagement().updateStream(streamConfig);
        } catch (IOException | JetStreamApiException e) {
            log.error("nats addSubjectToStream error", e);
        }
    }

    public void delStream(String channelStream) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        try {
            jsm.deleteStream(channelStream);
        } catch (JetStreamApiException e) {
            if (e.getErrorCode() != 404) {
                throw e;
            }
        }
    }

    public StreamInfo createOrGetChannelStream(String streamName) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        StreamInfo streamInfo = getStreamInfoOrNullWhenNotExist(jsm, streamName);
        if (!Ts.hv(streamInfo)) {
            StreamConfiguration streamConfig = StreamConfiguration.builder().name(streamName)
//                                .subjects(subject1)
                    .retentionPolicy(RetentionPolicy.WorkQueue)
                    // .maxConsumers(...)
                    // .maxBytes(...)
                    .maxAge(Duration.ofHours(1))
                    // .maxMsgSize(...)
                    .storageType(StorageType.Memory)
                    // .replicas(...)
                    // .noAck(...)
                    // .template(...)
//                                 .discardPolicy(...)
                    .build();
            streamInfo = jsm.addStream(streamConfig);
        }
        printStreamInfo(streamInfo);
        return streamInfo;
    }

    public void delConsumer(String channelStream) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        try {
            List<String> consumerNames = jsm.getConsumerNames(channelStream);
            if (Ts.hv(consumerNames)) {
                log.info("Nats stream {} has consumers need del.", channelStream);
                for (String consumerName : consumerNames) {
                    jsm.deleteConsumer(channelStream, consumerName);
                }
            }
        } catch (JetStreamApiException e) {
            if (e.getErrorCode() != 404) {
                throw e;
            }
        }
    }
}

相关文章

  • go和java测试 nats的jetstream

    由于nats-streaming废弃,nats使用jetstream可以存储消息,下面使用java和go分别测试j...

  • nats学习

    初探Nats中间件 一,nats是什么 nats是一个go语言开发的开源的、轻量、高性能的原生消息系统。nats消...

  • Go Micro(4)——基于消息队列NATS构建微服务

    Go Micro(4)——基于消息队列NATS构建微服务 这篇文章我们会讨论基于 NATS 使用 Micro。讨论...

  • 轻量消息中间件NATS与NSQ的介绍和比较

    NATS 1. 语言 server:GO 官方支持client库:GO、C等 2. 设计特点 高效 稳定可用 轻量...

  • go nats安装使用

    目录Nats简介[#Nats%E7%AE%80%E4%BB%8B]一、安装下载[#%E4%B8%80%E3%80%...

  • Laravel 8新功能

    1Laravel Jetstream 版本8引入了Laravel Jetstream,它是具有时尚用户仪表板的框架...

  • go语言测试框架

    go语言内置的测试框架能够完成基本的功能测试,基准测试,和样本测试。 测试框架 go语言测试单元以包为单位组织,包...

  • go中Nats基本使用

    NATS是一个开源的,云原生的消息系统。前面讲过CentOS 7 安装nats server[https://ww...

  • Golang压力测试

    Go Test工具 Go语言中的测试依赖go test命令。编写测试代码和编写普通的Go代码过程是类似的,并不需要...

  • 15. Go极简教程 编写测试

    Go拥有一个轻量级的测试框架,它由 go test 命令和 testing 包构成 hello.go 编写待测试的...

网友评论

      本文标题:go和java测试 nats的jetstream

      本文链接:https://www.haomeiwen.com/subject/bfterrtx.html