美文网首页
pulsar 使用笔记

pulsar 使用笔记

作者: 走在成长的道路上 | 来源:发表于2020-11-27 15:55 被阅读0次

安装

# 下载 tar 文件
wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz
tar xvfz apache-pulsar-2.6.2-bin.tar.gz
cd apache-pulsar-2.6.2

# 下载 offloader 数据卸载器
wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-offloaders-2.6.2-bin.tar.gz
tar xvfz apache-pulsar-offloaders-2.6.2-bin.tar.gz
mv apache-pulsar-offloaders-2.6.2/offloaders offloaders

# 启动单机模式
./bin/pulsar standalone

# 启动 presto 查询器
./bin/pulsar sql-worker run

# 执行查询
./bin/pulsar sql
# 获取所有catalogs
presto> show catalogs;

# 获取 pulsar 下的所有 schema 信息
presto> show schemas in pulsar;

# 查询默认 public/default 模式下的表
presto> show tables in pulsar."public/default";

# 查询某个数据
presto> select a.field1 as name, count(1) as c from pulsar."public/default"."test-topic" a group by field1 order by c desc, name asc limit 10;

默认会启动 pulsar://localhost:6650http://localhost:8080 两个默认端口

安装 pulsar-manager 管理后台

# 安装 pulsar-manager 管理
# 拉取 pulsar-manager 管理镜像
docker pull apachepulsar/pulsar-manager:latest

# 启动 pulsar-manager 管理
docker run -it \
    -p 9527:9527 -p 7750:7750 \
    -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
    apachepulsar/pulsar-manager:latest

通过 http://172.xxx.xxx.75:9527/ 访问 pulsar-manager 后台页面,默认用户为 pulsar/pulsar
pulsar-manager 中添加 service-url 填入 http://localhost:8080 即可

修改默认密码, 具体操作如下:

CSRF_TOKEN=$(curl http://backend-service:7750/pulsar-manager/csrf-token)
curl \
    -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
    -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
    -H 'Content-Type: application/json' \
    -X PUT http://backend-service:7750/pulsar-manager/users/superuser \
    -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'

测试吞吐量

# 生产者
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://172.xxx.xxx.75:6650").build();
Producer<Foo> producer = pulsarClient.newProducer(AvroSchema.of(Foo.class)).topic("persistent://public/default/test-topic").create();

for(int c = 0; c < 100000; c++) {
    for (int i = 0; i < 1000; i++) {
        Foo foo = new Foo();
        foo.setField1(i);
        foo.setField2("foo" + i);
        foo.setField3(System.currentTimeMillis());
        producer.newMessage().value(foo).send();
    }
}
producer.close();

# 消费者
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*-topic");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
        .topicsPattern(allTopicsInNamespace)
        .subscriptionName("subscription-1")
        .subscribe();

Messages<byte[]> batches = null;
try {
    do {
        batches = allTopicsConsumer.batchReceive();
        for (Message<byte[]> msg : batches) {
            System.out.println(AvroSchema.of(Foo.class).decode(msg.getData()));
        }
    } while (true);
} catch (PulsarClientException e) {
    e.printStackTrace();
}
allTopicsConsumer.close();

相关文章

网友评论

      本文标题:pulsar 使用笔记

      本文链接:https://www.haomeiwen.com/subject/qqmxwktx.html