美文网首页工作生活
pulsar 初步小试

pulsar 初步小试

作者: dayeshisir | 来源:发表于2019-07-04 21:01 被阅读0次

    pulsar 简介

    Apache Pulsar是一个企业级的分布式消息系统,最初由Yahoo开发并在2016年开源,目前正在Apache基金会下孵化。Plusar已经在Yahoo的生产环境使用了三年多,主要服务于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa以及Yahoo的KV存储。

    Pulsar之所以能够称为下一代消息队列,主要是因为以下特性:

    • 线性扩展。能够丝滑的扩容到成百上千个节点(Kafka扩容需要占用很多系统资源在节点间拷贝数据,而Plusar完全不用)

    • 高吞吐。已经在Yahoo的生产环境中经受了考验,每秒数百万消息

    • 低延迟。在大规模的消息量下依然能够保持低延迟(< 5ms)

    • 持久化机制。Plusar的持久化机制构建在Apache BookKeeper之上,提供了写与读之前的IO隔离

    基于地理位置的复制。Plusar将多地域/可用区的复制作为首要特性支持。用户只需配置好可用区,消息就会被源源不断的复制到其他可用区。当某一个可用区挂掉或者发生网络分区,plusar会在之后不断的重试。

    部署方式的多样化。既可以运行在裸机,也支持目前例如Docker、K8S的一些容器化方案以及不同的云厂商,同时在本地开发时也只需要一行命令即可启动整个环境。

    Topic支持多种消费模式:exclusive、shared、failover

    docker安装

    docker指令使用起来较为繁琐,当下有很多可视化docker工具,例如kitematic,就是一款很好的docker管理工具。

    • 新建docker
    image.png
    • 查找资源,新建pulsar
    image.png
    • 启动docker


      image.png
    • 端口设置


      image.png

    demo小试

    直接上代码:

    // client 端
    package common;
    
    import lombok.Getter;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    
    public class Client {
        @Getter
        private PulsarClient client;
    
        public Client() throws PulsarClientException {
            client = PulsarClient.builder().serviceUrl("pulsar://localhost:36650").build();
        }
    
        public void Close() throws PulsarClientException {
            client.close();
        }
    }
    
    package producer;
    
    import common.Client;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClientException;
    
    import java.util.concurrent.TimeUnit;
    
    public class MessageProducer {
        private Client client;
    
        private Producer<byte[]> producer;
    
        public MessageProducer(String topic) throws PulsarClientException {
            client = new Client();
    
            producer = createProducer(topic);
        }
    
        private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
            return client.getClient()
                    .newProducer()
                    .topic(topic)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        }
    
        public void sendMessage(String message) {
            producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
                System.out.printf("Message with ID %s successfully sent\n", msgId);
            });
        }
    
        public void sendOnce(String message) {
            try {
                producer.send(message.getBytes());
                System.out.printf("Message with content %s successfully sent\n", message);
                producer.close();
                client.Close();
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        }
    
        public void close(Producer<byte[]> producer) {
            producer.closeAsync().thenRun(() -> System.out.println("Producer closed\n"));
        }
    
        public static void main(String[] args) throws PulsarClientException {
            MessageProducer producer = new MessageProducer("topic");
    
    //        producer.sendOnce("Hello Pulsar");
    
            for (int i = 0; i < 100; i++) {
                producer.sendMessage("Hello Pulsar " + i);
    
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    package consumer;
    
    import common.Client;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.SubscriptionType;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class MessageConsumer {
        private Client client;
    
        private Consumer consumer;
    
        public MessageConsumer(String topic, String subscription) throws PulsarClientException {
            client = new Client();
    
            consumer = createConsumer(topic, subscription);
        }
    
        private Consumer<byte[]> createConsumer(String topic, String subscription) throws PulsarClientException {
            return client.getClient()
                    .newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
        }
    
        public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
            System.out.println("Start pulsar");
    
            CompletableFuture<Message> msg = consumer.receiveAsync();
    
            String result = "topic is: " + msg.get().getTopicName() + ", data is:" + new String(msg.get().getData());
    
            consumer.acknowledge(msg.get());
            consumer.close();
            client.Close();
    
            return result;
        }
    
        public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
            do {
                CompletableFuture<Message> msg = consumer.receiveAsync();
    
                System.out.printf("Message received: %s\n", new String(msg.get().getData()));
    
                consumer.acknowledge(msg.get());
            } while (true);
        }
    
        public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
            MessageConsumer consumer = new MessageConsumer("topic", "my-sub");
    
            consumer.receiveMessage();
        }
    }
    
    

    效果演示

    (略)

    参考

    相关文章

      网友评论

        本文标题:pulsar 初步小试

        本文链接:https://www.haomeiwen.com/subject/wljdhctx.html