美文网首页pulsar
Apache Pulsar——Java API操作tenant、

Apache Pulsar——Java API操作tenant、

作者: 小波同学 | 来源:发表于2022-05-29 20:54 被阅读0次

    一、添加pom.xml依赖

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>
    

    二、tenant租户的Java API

    public class PulsarTenant {
    
    
        public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
            String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
    
            // 1.创建pulsar的Admin管理对象
            PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
    
            // 2.基于pulsar的Admin对象进行相关的操作
    
            // 2.1 创建租户
            Set<String> allowedClusters = new HashSet<>();
            allowedClusters.add("pulsar-cluster");
            TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build();
            pulsarAdmin.tenants().createTenant("my-tenant", tenantInfo);
    
            // 2.2 查看当前有那些租户
            List<String> tenants = pulsarAdmin.tenants().getTenants();
            tenants.forEach(System.out::println);
    
            // 2.3 删除租户操作
            pulsarAdmin.tenants().deleteTenant("my-tenant");
    
            // 3.关闭管理对象
            pulsarAdmin.close();
        }
    }
    

    三、namespace命令空间的Java API

    public class PulsarNamespace {
    
        public static void main(String[] args) throws PulsarAdminException, PulsarClientException {
            String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
    
            // 1.创建pulsar的Admin管理对象
            PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
    
            // 2.基于pulsar的Admin对象进行相关的操作
    
            // 2.1 创建名称空间
            String tenant = "my-tenant";
            String namespace = tenant + "/my-ns";
            pulsarAdmin.namespaces().createNamespace(namespace);
    
            // 2.2 获取租户下的名称空间列表
            List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenant);
            namespaces.forEach(System.out::println);
    
            // 2.3 删除名称空间
            pulsarAdmin.namespaces().deleteNamespace(namespace);
    
            // 3.关闭管理对象
            pulsarAdmin.close();
        }
    }
    

    四、topic的Java API

    public class PulsarTopic {
    
        public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
            String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
    
            // 1.创建pulsar的Admin管理对象
            PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
    
            // 2.基于pulsar的Admin对象进行相关的操作
    
            // 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化
            String tenant = "my-tenant";
            String namespace = tenant + "/my-ns";
            String nonPartitionedTopicName = "non-persistent://my-tenant/my-ns/my-non-partitioned-topic";
            String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
    
            // 2.2 创建无分区的topic
            pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);
    
            // 2.3 创建有分区的topic
            pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName,3);
    
            // 2.4 修改有分区的Topic的分区数量
            pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName,6);
    
            // 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic
            List<String> topics = pulsarAdmin.topics().getList(namespace);
            topics.forEach(System.out::println);
    
            // 2.6 查询当前有分区的topic列表
            List<String> partitionedTopicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
            partitionedTopicList.forEach(System.out::println);
    
            // 2.7 查询有分区的Topic,有多少个分区
            int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;
            System.out.println(partitions);
    
            // 2.8 删除无分区的Topic
            pulsarAdmin.topics().delete(nonPartitionedTopicName);
    
            // 2.9 删除有分区的Topic
            pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);
    
    
            // 3.关闭管理对象
            pulsarAdmin.close();
        }
    }
    

    相关文章

      网友评论

        本文标题:Apache Pulsar——Java API操作tenant、

        本文链接:https://www.haomeiwen.com/subject/pocnprtx.html