美文网首页
Apache Pulsar——producer consumer

Apache Pulsar——producer consumer

作者: 小波同学 | 来源:发表于2022-05-30 00:01 被阅读0次

    一、添加pom.xml依赖

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>
    

    二、producer

    2.1 producer 同步发送

    /**
     * @Author: huangyibo
     * @Date: 2022/5/27 22:41
     * @Description: Pulsar 生产者 同步发送
     */
    
    public class PulsarProduceSync {
    
        public static void main(String[] args) throws PulsarClientException {
            // 1 创建pulsar的客户端对象
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    
            // 2 基于客户端对象进行构建生产者对象
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            // 发送字符串,Schema的类型为:Schema.STRING
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(partitionedTopicName).create();
    
            // 3 发送数据生产
            producer.send("hello Pulsar");
    
            // 4 释放资源
            producer.close();
            pulsarClient.close();
        }
    }
    

    2.2 producer 异步发送

    /**
     * @Author: huangyibo
     * @Date: 2022/5/27 22:43
     * @Description: Pulsar 生产者 异步发送
     */
    
    public class PulsarProduceAsync {
    
        public static void main(String[] args) throws PulsarClientException, InterruptedException {
            // 1 创建pulsar的客户端对象
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    
            // 2 基于客户端对象进行构建生产者对象
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            // 发送字符串,Schema的类型为:Schema.STRING
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(partitionedTopicName).create();
    
            // 3 发送数据生产
            producer.sendAsync("hello Async Pulsar");
    
            // 异步发送, 会先将数据发送到客户端缓存中, 当缓存达到一批后才会进行批量发送
            // 等待一定时间,等消息发送成功了,再关闭客户端
            Thread.sleep(1000);
    
            // 4 释放资源
            producer.close();
            pulsarClient.close();
        }
    }
    

    2.3 producer Schema发送

    /**
     * @Author: huangyibo
     * @Date: 2022/5/27 22:58
     * @Description: Pulsar 生产者 Schema发送
     */
    
    public class PulsarProduceSchema {
    
        public static void main(String[] args) throws PulsarClientException {
            // 1 创建pulsar的客户端对象
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    
            // 2 基于客户端对象进行构建生产者对象
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            // 发送字符串,Schema的类型为:Schema.STRING
            Producer<User> producer = pulsarClient.newProducer(AvroSchema.of(User.class))
                    .topic(partitionedTopicName).create();
    
            // 3 发送数据生产
            User user = new User();
            user.setName("张无忌");
            user.setAge(20);
            producer.send(user);
    
            // 4 释放资源
            producer.close();
            pulsarClient.close();
        }
    }
    

    三、consumer

    3.1 consumer 普通消费方式

    /**
     * @Author: huangyibo
     * @Date: 2022/5/28 0:15
     * @Description: Pulsar 消费者
     */
    
    public class PulsarConsumer {
    
        public static void main(String[] args) throws PulsarClientException {
            // 1 创建pulsar的客户端的对象
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
    
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    
            // 2 基于客户端构建消费者对象
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    // 可以传入多个topic
                    .topic(partitionedTopicName)
                    //可以消费多个topic
                    //.topics()
                    .subscriptionName("consumeTest")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
    
            // 3 循环从消费者读取数据
            while(true) {
                // 接收消息
                Message<String> message = consumer.receive();
    
                try {
                    // 获取消息
                    String msg = message.getValue();
    
                    // 处理数据
                    System.out.println("获取的数据为: " + msg);
    
                    // ack确认操作,下次重启从ack的position开始消费数据
                    consumer.acknowledge(message);
                } catch(PulsarClientException e) {
                    e.printStackTrace();
                    //消息消费失败
                    consumer.negativeAcknowledge(message);
                }
            }
        }
    }
    

    3.2 consumer Schema方式

    /**
     * @Author: huangyibo
     * @Date: 2022/5/28 0:23
     * @Description: Pulsar 消费者 Schema方式
     */
    
    public class PulsarConsumerSchema {
    
        public static void main(String[] args) throws PulsarClientException {
            // 1 创建pulsar的客户端的对象
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
    
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    
            // 2 基于客户端构建消费者对象
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class))
                    // 可以传入多个topic
                    .topic(partitionedTopicName)
                    .subscriptionName("consumeTest")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
    
            // 3 循环从消费者读取数据
            while(true) {
                // 接收消息
                Message<User> message = consumer.receive();
    
                try {
                    // 获取消息
                    User user = message.getValue();
    
                    // 处理数据
                    System.out.println(user);
    
                    // ack确认操作,下次重启从ack的position开始消费数据
                    consumer.acknowledge(message);
                } catch(PulsarClientException e) {
                    e.printStackTrace();
                    //消息消费失败
                    consumer.negativeAcknowledge(message);
                }
            }
        }
    }
    

    3.3 consumer 批量消费方式

    /**
     * @Author: huangyibo
     * @Date: 2022/5/28 0:27
     * @Description: Pulsar 消费者 批量消费方式
     */
    
    public class PulsarConsumerBatch {
    
        public static void main(String[] args) throws PulsarClientException {
            // 1 创建pulsar的客户端的对象
            String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
    
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    
            // 2 基于客户端构建消费者对象
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    // 可以传入多个topic
                    .topic(partitionedTopicName)
                    //可以消费多个topic
                    //.topics()
                    .subscriptionName("consumeTest")
                    //设置批量读取数据
                    .batchReceivePolicy(
                        BatchReceivePolicy.builder()
                            // 1M
                            .maxNumBytes(1024 * 1024)
                            //最大消费消息条数
                            .maxNumMessages(100)
                            //等待时间
                            .timeout(2000, TimeUnit.MILLISECONDS)
                            .build()
                    ).subscribe();
    
            // 3 循环从消费者读取数据
            while(true) {
                // 接收消息
                Messages<String> messages = consumer.batchReceive();
    
                messages.forEach(message -> {
                    try {
                        // 获取消息
                        String msg = message.getValue();
                        // 处理数据
                        System.out.println("获取的数据为: " + msg);
    
                        // ack确认操作,下次重启从ack的position开始消费数据
                        consumer.acknowledge(message);
                    } catch (PulsarClientException e) {
                        e.printStackTrace();
                        //消息消费失败
                        consumer.negativeAcknowledge(message);
                    }
                });
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Apache Pulsar——producer consumer

          本文链接:https://www.haomeiwen.com/subject/prhnprtx.html