安装
# 下载 tar 文件
wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz
tar xvfz apache-pulsar-2.6.2-bin.tar.gz
cd apache-pulsar-2.6.2
# 下载 offloader 数据卸载器
wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-offloaders-2.6.2-bin.tar.gz
tar xvfz apache-pulsar-offloaders-2.6.2-bin.tar.gz
mv apache-pulsar-offloaders-2.6.2/offloaders offloaders
# 启动单机模式
./bin/pulsar standalone
# 启动 presto 查询器
./bin/pulsar sql-worker run
# 执行查询
./bin/pulsar sql
# 获取所有catalogs
presto> show catalogs;
# 获取 pulsar 下的所有 schema 信息
presto> show schemas in pulsar;
# 查询默认 public/default 模式下的表
presto> show tables in pulsar."public/default";
# 查询某个数据
presto> select a.field1 as name, count(1) as c from pulsar."public/default"."test-topic" a group by field1 order by c desc, name asc limit 10;
注
默认会启动 pulsar://localhost:6650
和 http://localhost:8080
两个默认端口
安装 pulsar-manager
管理后台
# 安装 pulsar-manager 管理
# 拉取 pulsar-manager 管理镜像
docker pull apachepulsar/pulsar-manager:latest
# 启动 pulsar-manager 管理
docker run -it \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:latest
注
通过 http://172.xxx.xxx.75:9527/
访问 pulsar-manager
后台页面,默认用户为 pulsar/pulsar
注
在 pulsar-manager
中添加 service-url
填入 http://localhost:8080
即可
注
修改默认密码, 具体操作如下:
CSRF_TOKEN=$(curl http://backend-service:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://backend-service:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
测试吞吐量
# 生产者
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://172.xxx.xxx.75:6650").build();
Producer<Foo> producer = pulsarClient.newProducer(AvroSchema.of(Foo.class)).topic("persistent://public/default/test-topic").create();
for(int c = 0; c < 100000; c++) {
for (int i = 0; i < 1000; i++) {
Foo foo = new Foo();
foo.setField1(i);
foo.setField2("foo" + i);
foo.setField3(System.currentTimeMillis());
producer.newMessage().value(foo).send();
}
}
producer.close();
# 消费者
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*-topic");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(allTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();
Messages<byte[]> batches = null;
try {
do {
batches = allTopicsConsumer.batchReceive();
for (Message<byte[]> msg : batches) {
System.out.println(AvroSchema.of(Foo.class).decode(msg.getData()));
}
} while (true);
} catch (PulsarClientException e) {
e.printStackTrace();
}
allTopicsConsumer.close();
网友评论