美文网首页Spring Boot
Spring Cloud对于Kafka的基本管理

Spring Cloud对于Kafka的基本管理

作者: EasyNetCN | 来源:发表于2020-06-01 11:41 被阅读0次

    bootstrap.yml配置

    # Broker list shared by Spring Cloud Stream and Spring for Apache Kafka;
    # override with the `kafka.brokers` property (defaults to localhost:9092).
    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: ${kafka.brokers:localhost:9092}
              # Disable automatic partition/topic management so topics are only
              # created or expanded explicitly through the admin endpoints.
              auto-add-partitions: false
              auto-create-topics: false
      kafka:
        # Read by KafkaAdmin, which supplies the AdminClient configuration
        # used by the management controller.
        bootstrap-servers: ${kafka.brokers:localhost:9092}
    

    用于管理的Controller

    /**
     * REST controller exposing basic Kafka administration — cluster description
     * and topic listing/describe/create/delete — via the Kafka {@code AdminClient}
     * configured from Spring's {@code KafkaAdmin} properties.
     *
     * NOTE(review): every handler blocks on {@code KafkaFuture.get()} while
     * returning a reactive type. Tolerable for a low-traffic admin tool, but
     * consider offloading to a bounded-elastic scheduler if these endpoints
     * ever see real load.
     */
    @RestController
    @RequestMapping("kafka")
    public class KafkaServiceController {
        // Constructor injection instead of field @Autowired: the dependency is
        // mandatory, the field can be final, and the class is testable without
        // a Spring context.
        private final KafkaAdmin kafkaAdmin;

        public KafkaServiceController(KafkaAdmin kafkaAdmin) {
            this.kafkaAdmin = kafkaAdmin;
        }

        /**
         * Describes the cluster: broker nodes, current controller, cluster id
         * and the ACL operations authorized for the caller.
         */
        @GetMapping("describe-cluster")
        public Mono<KafkaDescribeClusterResult> describeCluster() throws InterruptedException, ExecutionException {
            var result = new KafkaDescribeClusterResult();

            // AdminClient is AutoCloseable; a short-lived client per request.
            try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                var describeClusterResult = adminClient.describeCluster();

                result.setNodes(
                        describeClusterResult.nodes().get().stream().map(this::convert).collect(Collectors.toList()));
                result.setController(convert(describeClusterResult.controller().get()));
                result.setClusterId(describeClusterResult.clusterId().get());
                result.setAuthorizedOperations(describeClusterResult.authorizedOperations().get());
            }

            return Mono.justOrEmpty(result);
        }

        /** Lists every topic visible to the admin client. */
        @GetMapping("topics")
        public Flux<KafkaTopicListing> topics() throws InterruptedException, ExecutionException {
            try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                return Flux.fromIterable(
                        adminClient.listTopics().listings().get().stream().map(this::convert).collect(Collectors.toList()));
            }
        }

        /**
         * Describes a single topic by name.
         *
         * @param name topic name taken from the path
         * @throws ExecutionException if the broker reports an error (e.g. the
         *         topic does not exist)
         */
        @GetMapping("topics/{name}")
        public Mono<KafkaTopicDescription> describeTopic(@PathVariable("name") String name)
                throws InterruptedException, ExecutionException {
            try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                var topicNames = new ArrayList<String>(1);

                topicNames.add(name);

                // Convert the broker's description directly; the original built
                // a throwaway KafkaTopicDescription first and then discarded it.
                return Mono.justOrEmpty(convert(adminClient.describeTopics(topicNames).values().get(name).get()));
            }
        }

        /**
         * Creates a topic from the request body, honoring an explicit replica
         * assignment and per-topic configs when provided.
         */
        @PostMapping("topics")
        public Mono<Void> createTopic(@RequestBody KafkaNewTopic kafkaNewTopic)
                throws InterruptedException, ExecutionException {
            try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                var newTopics = new ArrayList<NewTopic>(1);

                newTopics.add(convert(kafkaNewTopic));

                // Block until the broker confirms creation so errors surface
                // as an ExecutionException on this request.
                adminClient.createTopics(newTopics).values().get(kafkaNewTopic.getName()).get();
            }

            return Mono.empty();
        }

        /** Deletes the named topic, blocking until the broker confirms. */
        @DeleteMapping("topics/{name}")
        public Mono<Void> deleteTopic(@PathVariable("name") String name) throws InterruptedException, ExecutionException {
            try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
                var topicNames = new ArrayList<String>(1);

                topicNames.add(name);

                adminClient.deleteTopics(topicNames).values().get(name).get();
            }

            return Mono.empty();
        }

        /** Maps a Kafka {@code Node} to its JSON-friendly view; tolerates null. */
        private KafkaNode convert(Node node) {
            var kafkaNode = new KafkaNode();

            if (null != node) {
                kafkaNode.setId(node.id());
                kafkaNode.setIdString(node.idString());
                kafkaNode.setHost(node.host());
                kafkaNode.setPort(node.port());
                kafkaNode.setRack(node.rack());
            }

            return kafkaNode;
        }

        /** Maps a Kafka {@code TopicListing} to its JSON-friendly view. */
        private KafkaTopicListing convert(TopicListing topicListing) {
            var kafkaTopicListing = new KafkaTopicListing();

            if (null != topicListing) {
                kafkaTopicListing.setName(topicListing.name());
                kafkaTopicListing.setInternal(topicListing.isInternal());
            }

            return kafkaTopicListing;
        }

        /** Maps a Kafka {@code TopicDescription}, including its partitions. */
        private KafkaTopicDescription convert(TopicDescription topicDescription) {
            var kafkaTopicDescription = new KafkaTopicDescription();

            if (null != topicDescription) {
                kafkaTopicDescription.setName(topicDescription.name());
                kafkaTopicDescription.setInternal(topicDescription.isInternal());
                kafkaTopicDescription.setPartitions(
                        topicDescription.partitions().stream().map(this::convert).collect(Collectors.toList()));
                kafkaTopicDescription.setAuthorizedOperations(topicDescription.authorizedOperations());
            }

            return kafkaTopicDescription;
        }

        /** Maps partition info: leader, replicas and in-sync replicas. */
        private KafkaTopicPartitionInfo convert(TopicPartitionInfo topicPartitionInfo) {
            var kafkaTopicPartitionInfo = new KafkaTopicPartitionInfo();

            if (null != topicPartitionInfo) {
                kafkaTopicPartitionInfo.setPartition(topicPartitionInfo.partition());
                kafkaTopicPartitionInfo.setLeader(convert(topicPartitionInfo.leader()));
                kafkaTopicPartitionInfo.setReplicas(
                        topicPartitionInfo.replicas().stream().map(this::convert).collect(Collectors.toList()));
                kafkaTopicPartitionInfo
                        .setIsr(topicPartitionInfo.isr().stream().map(this::convert).collect(Collectors.toList()));
            }

            return kafkaTopicPartitionInfo;
        }

        /**
         * Builds a {@code NewTopic} from the request payload.
         *
         * BUG FIX: the original always used the (name, numPartitions,
         * replicationFactor) constructor and silently ignored the
         * replicasAssignments and configs fields that KafkaNewTopic carries.
         * An explicit replica assignment now takes precedence, and per-topic
         * configs are applied ({@code NewTopic#configs} accepts null as
         * "no overrides").
         */
        private NewTopic convert(KafkaNewTopic kafkaNewTopic) {
            var newTopic = null != kafkaNewTopic.getReplicasAssignments()
                    ? new NewTopic(kafkaNewTopic.getName(), kafkaNewTopic.getReplicasAssignments())
                    : new NewTopic(kafkaNewTopic.getName(), kafkaNewTopic.getNumPartitions(),
                            kafkaNewTopic.getReplicationFactor());

            return newTopic.configs(kafkaNewTopic.getConfigs());
        }
    }
    

    KafkaDescribeClusterResult

    import java.util.Collection;
    import java.util.Set;
    
    import org.apache.kafka.common.acl.AclOperation;
    
    /**
     * JSON-friendly snapshot of {@code AdminClient#describeCluster()}: the
     * broker nodes, the current controller, the cluster id and the ACL
     * operations authorized for the caller. Plain mutable bean for Jackson.
     */
    public class KafkaDescribeClusterResult {
        /** Brokers that make up the cluster. */
        private Collection<KafkaNode> nodes;

        /** Broker currently acting as cluster controller. */
        private KafkaNode controller;

        /** Unique id of the cluster. */
        private String clusterId;

        /** ACL operations the caller is authorized to perform. */
        private Set<AclOperation> authorizedOperations;

        public Collection<KafkaNode> getNodes() { return nodes; }

        public void setNodes(Collection<KafkaNode> nodes) { this.nodes = nodes; }

        public KafkaNode getController() { return controller; }

        public void setController(KafkaNode controller) { this.controller = controller; }

        public String getClusterId() { return clusterId; }

        public void setClusterId(String clusterId) { this.clusterId = clusterId; }

        public Set<AclOperation> getAuthorizedOperations() { return authorizedOperations; }

        public void setAuthorizedOperations(Set<AclOperation> authorizedOperations) {
            this.authorizedOperations = authorizedOperations;
        }
    }
    

    KafkaNewTopic

    import java.util.List;
    import java.util.Map;
    
    /**
     * Request payload for topic creation. Mirrors Kafka's {@code NewTopic}:
     * either numPartitions/replicationFactor or an explicit replica
     * assignment, plus optional per-topic config overrides.
     */
    public class KafkaNewTopic {
        /** Topic name. */
        private String name;

        /** Partition count (used when no explicit assignment is given). */
        private int numPartitions;

        /** Replication factor (used when no explicit assignment is given). */
        private short replicationFactor;

        /** Optional explicit partition -> broker-ids replica assignment. */
        private Map<Integer, List<Integer>> replicasAssignments;

        /** Optional per-topic config overrides; null means "no overrides". */
        private Map<String, String> configs;

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }

        public int getNumPartitions() { return numPartitions; }

        public void setNumPartitions(int numPartitions) { this.numPartitions = numPartitions; }

        public short getReplicationFactor() { return replicationFactor; }

        public void setReplicationFactor(short replicationFactor) { this.replicationFactor = replicationFactor; }

        public Map<Integer, List<Integer>> getReplicasAssignments() { return replicasAssignments; }

        public void setReplicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
            this.replicasAssignments = replicasAssignments;
        }

        public Map<String, String> getConfigs() { return configs; }

        public void setConfigs(Map<String, String> configs) { this.configs = configs; }
    }
    

    KafkaNode

    /**
     * JSON-friendly view of a Kafka broker {@code Node}: numeric and string
     * forms of the broker id, its host/port, and rack (may be null).
     * Plain mutable bean for Jackson.
     */
    public class KafkaNode {
        /** Numeric broker id. */
        private int id;

        /** Broker id as a string. */
        private String idString;

        /** Hostname of the broker. */
        private String host;

        /** Listener port of the broker. */
        private int port;

        /** Rack identifier; null when the broker has no rack configured. */
        private String rack;

        public int getId() { return id; }

        public void setId(int id) { this.id = id; }

        public String getIdString() { return idString; }

        public void setIdString(String idString) { this.idString = idString; }

        public String getHost() { return host; }

        public void setHost(String host) { this.host = host; }

        public int getPort() { return port; }

        public void setPort(int port) { this.port = port; }

        public String getRack() { return rack; }

        public void setRack(String rack) { this.rack = rack; }
    }
    

    KafkaTopicDescription

    import java.util.List;
    import java.util.Set;
    
    import org.apache.kafka.common.acl.AclOperation;
    
    /**
     * JSON-friendly view of Kafka's {@code TopicDescription}: topic name,
     * internal flag, per-partition details and authorized ACL operations.
     * Plain mutable bean for Jackson.
     */
    public class KafkaTopicDescription {
        /** Topic name. */
        private String name;

        /** True for Kafka-internal topics (e.g. __consumer_offsets). */
        private boolean internal;

        /** Per-partition leader/replica/ISR details. */
        private List<KafkaTopicPartitionInfo> partitions;

        /** ACL operations the caller is authorized to perform on the topic. */
        private Set<AclOperation> authorizedOperations;

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }

        public boolean isInternal() { return internal; }

        public void setInternal(boolean internal) { this.internal = internal; }

        public List<KafkaTopicPartitionInfo> getPartitions() { return partitions; }

        public void setPartitions(List<KafkaTopicPartitionInfo> partitions) { this.partitions = partitions; }

        public Set<AclOperation> getAuthorizedOperations() { return authorizedOperations; }

        public void setAuthorizedOperations(Set<AclOperation> authorizedOperations) {
            this.authorizedOperations = authorizedOperations;
        }
    }
    

    KafkaTopicListing

    /**
     * JSON-friendly view of Kafka's {@code TopicListing}: a topic name plus
     * whether the topic is Kafka-internal. Plain mutable bean for Jackson.
     */
    public class KafkaTopicListing {
        /** Topic name. */
        private String name;

        /** True for Kafka-internal topics. */
        private boolean internal;

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }

        public boolean isInternal() { return internal; }

        public void setInternal(boolean internal) { this.internal = internal; }
    }
    

    KafkaTopicPartitionInfo

    import java.util.List;
    
    /**
     * JSON-friendly view of Kafka's {@code TopicPartitionInfo}: partition
     * index, leader broker, full replica set and in-sync replicas.
     * Plain mutable bean for Jackson.
     */
    public class KafkaTopicPartitionInfo {
        /** Partition index within the topic. */
        private int partition;

        /** Broker currently leading this partition. */
        private KafkaNode leader;

        /** All brokers holding a replica of this partition. */
        private List<KafkaNode> replicas;

        /** Replicas currently in sync with the leader. */
        private List<KafkaNode> isr;

        public int getPartition() { return partition; }

        public void setPartition(int partition) { this.partition = partition; }

        public KafkaNode getLeader() { return leader; }

        public void setLeader(KafkaNode leader) { this.leader = leader; }

        public List<KafkaNode> getReplicas() { return replicas; }

        public void setReplicas(List<KafkaNode> replicas) { this.replicas = replicas; }

        public List<KafkaNode> getIsr() { return isr; }

        public void setIsr(List<KafkaNode> isr) { this.isr = isr; }
    }
    

    相关文章

      网友评论

        本文标题:Spring Cloud对于Kafka的基本管理

        本文链接:https://www.haomeiwen.com/subject/fhmjzhtx.html