美文网首页
Apache Pulsar[5] API demo

Apache Pulsar[5] API demo

作者: QuinnSun | 来源:发表于2020-06-29 14:37 被阅读0次

    生产者

    /**
     * Minimal Apache Pulsar producer demo: connects to a local broker, sends a
     * single message to a fixed topic and prints the elapsed time and the
     * message id returned by the broker.
     */
    public class PulsarProducer {
        /** Service URL of the broker the demo connects to. */
        private static final String LOCAL_CLUSTER_URL = "pulsar://localhost:6650";

        /** Topic the demo publishes to. */
        private static final String TOPIC = "persistent://my-tenant/my-namespace/my-topic";

        /**
         * Sends one message synchronously and reports how long the send took.
         * Both the client and the producer are closed before the method
         * returns, whether or not the send succeeded.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the producer first, then the client,
            // so no connections or client threads are left behind.
            try (PulsarClient client = PulsarClient.builder().serviceUrl(LOCAL_CLUSTER_URL).build();
                 Producer<byte[]> producer = createProducer(client)) {
                String msg = "test send";

                long start = System.currentTimeMillis();
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                MessageId msgId = producer.send(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
            } catch (Exception e) {
                System.err.println(e);
            }
        }

        /**
         * Creates a producer on a newly built client.
         *
         * <p>The client created here stays open for as long as the returned
         * producer is in use; this method does not hand the client back to the
         * caller. If the producer cannot be created, the client is closed
         * before the exception is rethrown.
         *
         * @return a producer for the demo topic
         * @throws Exception if the client or the producer cannot be created
         */
        public static Producer<byte[]> getProducer() throws Exception {
            PulsarClient client = PulsarClient.builder().serviceUrl(LOCAL_CLUSTER_URL).build();
            try {
                return createProducer(client);
            } catch (Exception e) {
                // Do not leak the client when producer creation fails.
                try {
                    client.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        /** Builds the demo producer on the given client; the caller owns the client. */
        private static Producer<byte[]> createProducer(PulsarClient client) throws Exception {
            return client.newProducer()
                    .topic(TOPIC)
                    .producerName("test-producer")
                    .create();
        }
    }
    

    消费者

    /**
     * Minimal Apache Pulsar consumer demo: subscribes to a fixed topic with a
     * failover subscription, prints every received message and acknowledges it.
     */
    public class PulsarConsumerDemo {
        /** Service URL of the broker the demo connects to. */
        private static final String LOCAL_CLUSTER_URL = "pulsar://localhost:6650";

        /**
         * Receives messages in an endless loop until an exception occurs. The
         * client and the consumer are closed when the loop is left.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the consumer first, then the client,
            // once the receive loop terminates with an exception.
            try (PulsarClient client = getClient();
                 // Subscribe the consumer to the given topic and subscription.
                 Consumer<byte[]> consumer = client.newConsumer()
                         .topic("persistent://my-tenant/my-namespace/my-topic")
                         .subscriptionName("my-subscription")
                         .subscriptionType(SubscriptionType.Failover)
                         .subscribe()) {
                while (true) {
                    Message<byte[]> msg = consumer.receive();
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                    String payload = new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.printf("consumer-Message received: %s. \n", payload);
                    // Acknowledge the message so the broker can delete it.
                    consumer.acknowledge(msg);
                }
            } catch (Exception e) {
                // Report failures on stderr, like the producer demo does.
                System.err.println(e);
            }
        }

        /**
         * Builds a client for the local broker. The caller is responsible for
         * closing the returned client.
         *
         * @return a new client connected to the configured service URL
         * @throws Exception if the client cannot be created
         */
        public static PulsarClient getClient() throws Exception {
            return PulsarClient.builder()
                    .serviceUrl(LOCAL_CLUSTER_URL)
                    .build();
        }
    }
    

    相关文章

      网友评论

          本文标题:Apache Pulsar[5] API demo

          本文链接:https://www.haomeiwen.com/subject/lwjwfktx.html