pulsar 简介
Apache Pulsar是一个企业级的分布式消息系统,最初由Yahoo开发并在2016年开源,目前正在Apache基金会下孵化。Plusar已经在Yahoo的生产环境使用了三年多,主要服务于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa以及Yahoo的KV存储。
Pulsar之所以能够称为下一代消息队列,主要是因为以下特性:
-
线性扩展。能够丝滑的扩容到成百上千个节点(Kafka扩容需要占用很多系统资源在节点间拷贝数据,而Plusar完全不用)
-
高吞吐。已经在Yahoo的生产环境中经受了考验,每秒数百万消息
-
低延迟。在大规模的消息量下依然能够保持低延迟(< 5ms)
-
持久化机制。Plusar的持久化机制构建在Apache BookKeeper之上,提供了写与读之前的IO隔离
基于地理位置的复制。Plusar将多地域/可用区的复制作为首要特性支持。用户只需配置好可用区,消息就会被源源不断的复制到其他可用区。当某一个可用区挂掉或者发生网络分区,plusar会在之后不断的重试。
部署方式的多样化。既可以运行在裸机,也支持目前例如Docker、K8S的一些容器化方案以及不同的云厂商,同时在本地开发时也只需要一行命令即可启动整个环境。
Topic支持多种消费模式:exclusive、shared、failover
docker安装
docker指令使用起来较为繁琐,当下有很多可视化docker工具,例如kitematic
,就是一款很好的docker管理工具。
- 新建docker
- 查找资源,新建pulsar
-
启动docker
image.png -
端口设置
image.png
demo小试
直接上代码:
// client 端
package common;
import lombok.Getter;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
public class Client {
@Getter
private PulsarClient client;
public Client() throws PulsarClientException {
client = PulsarClient.builder().serviceUrl("pulsar://localhost:36650").build();
}
public void Close() throws PulsarClientException {
client.close();
}
}
package producer;
import common.Client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
public class MessageProducer {
private Client client;
private Producer<byte[]> producer;
public MessageProducer(String topic) throws PulsarClientException {
client = new Client();
producer = createProducer(topic);
}
private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
return client.getClient()
.newProducer()
.topic(topic)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
}
public void sendMessage(String message) {
producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent\n", msgId);
});
}
public void sendOnce(String message) {
try {
producer.send(message.getBytes());
System.out.printf("Message with content %s successfully sent\n", message);
producer.close();
client.Close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
public void close(Producer<byte[]> producer) {
producer.closeAsync().thenRun(() -> System.out.println("Producer closed\n"));
}
public static void main(String[] args) throws PulsarClientException {
MessageProducer producer = new MessageProducer("topic");
// producer.sendOnce("Hello Pulsar");
for (int i = 0; i < 100; i++) {
producer.sendMessage("Hello Pulsar " + i);
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package consumer;
import common.Client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
private Client client;
private Consumer consumer;
public MessageConsumer(String topic, String subscription) throws PulsarClientException {
client = new Client();
consumer = createConsumer(topic, subscription);
}
private Consumer<byte[]> createConsumer(String topic, String subscription) throws PulsarClientException {
return client.getClient()
.newConsumer()
.topic(topic)
.subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
}
public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
System.out.println("Start pulsar");
CompletableFuture<Message> msg = consumer.receiveAsync();
String result = "topic is: " + msg.get().getTopicName() + ", data is:" + new String(msg.get().getData());
consumer.acknowledge(msg.get());
consumer.close();
client.Close();
return result;
}
public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
do {
CompletableFuture<Message> msg = consumer.receiveAsync();
System.out.printf("Message received: %s\n", new String(msg.get().getData()));
consumer.acknowledge(msg.get());
} while (true);
}
public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
MessageConsumer consumer = new MessageConsumer("topic", "my-sub");
consumer.receiveMessage();
}
}
效果演示
(略)
网友评论