bootstrap.yml配置
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Broker list used by the Spring Cloud Stream Kafka binder.
          # Resolves to the kafka.brokers property, defaulting to localhost:9092.
          brokers: ${kafka.brokers:localhost:9092}
          # The binder must neither add partitions to existing topics nor create missing topics.
          auto-add-partitions: false
          auto-create-topics: false
  kafka:
    # Bootstrap servers for Spring Kafka; same placeholder and default as the binder above.
    bootstrap-servers: ${kafka.brokers:localhost:9092}
用于管理的Controller
/**
 * REST endpoints under {@code /kafka} for inspecting the cluster and for listing, describing,
 * creating and deleting topics through the Kafka {@link AdminClient}.
 *
 * <p>Every handler opens its own {@code AdminClient} from the injected {@link KafkaAdmin}
 * configuration and closes it (try-with-resources) before returning.
 *
 * <p>NOTE(review): the handlers wait on the admin futures with {@code get()}, so the calling
 * thread blocks until the broker answers. If this application runs on an event-loop based
 * server, confirm that blocking here is acceptable or move the work to a dedicated scheduler.
 */
@RestController
@RequestMapping("kafka")
public class KafkaServiceController {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    /**
     * Describes the cluster: its nodes, current controller, cluster id and authorized operations.
     *
     * @return a {@link Mono} emitting the populated cluster description
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     * @throws ExecutionException if any of the admin requests fails
     */
    @GetMapping("describe-cluster")
    public Mono<KafkaDescribeClusterResult> describeCluster() throws InterruptedException, ExecutionException {
        var result = new KafkaDescribeClusterResult();
        try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            var cluster = adminClient.describeCluster();
            result.setNodes(cluster.nodes().get().stream().map(this::convert).collect(Collectors.toList()));
            result.setController(convert(cluster.controller().get()));
            result.setClusterId(cluster.clusterId().get());
            result.setAuthorizedOperations(cluster.authorizedOperations().get());
        }
        // The result object is always non-null, so Mono.just is sufficient.
        return Mono.just(result);
    }

    /**
     * Lists the topics returned by {@code AdminClient.listTopics()}.
     *
     * @return a {@link Flux} emitting one listing per topic
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     * @throws ExecutionException if the admin request fails
     */
    @GetMapping("topics")
    public Flux<KafkaTopicListing> topics() throws InterruptedException, ExecutionException {
        try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            // The listings are fully materialised into a list before the client is closed.
            var listings = adminClient.listTopics().listings().get().stream()
                    .map(this::convert)
                    .collect(Collectors.toList());
            return Flux.fromIterable(listings);
        }
    }

    /**
     * Describes a single topic.
     *
     * @param name the topic name taken from the request path
     * @return a {@link Mono} emitting the topic description
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     * @throws ExecutionException if the admin request fails
     */
    @GetMapping("topics/{name}")
    public Mono<KafkaTopicDescription> describeTopic(@PathVariable("name") String name)
            throws InterruptedException, ExecutionException {
        try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            var topicNames = new ArrayList<String>(1);
            topicNames.add(name);
            var description = adminClient.describeTopics(topicNames).values().get(name).get();
            return Mono.just(convert(description));
        }
    }

    /**
     * Creates a topic from the request body and waits until the broker confirms the creation.
     *
     * @param kafkaNewTopic the topic settings; replica assignments and configs are honoured when present
     * @return an empty {@link Mono} once the topic has been created
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     * @throws ExecutionException if the admin request fails
     */
    @PostMapping("topics")
    public Mono<Void> createTopic(@RequestBody KafkaNewTopic kafkaNewTopic)
            throws InterruptedException, ExecutionException {
        try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            var newTopics = new ArrayList<NewTopic>(1);
            newTopics.add(convert(kafkaNewTopic));
            adminClient.createTopics(newTopics).values().get(kafkaNewTopic.getName()).get();
        }
        return Mono.empty();
    }

    /**
     * Deletes a topic and waits until the broker confirms the deletion.
     *
     * @param name the topic name taken from the request path
     * @return an empty {@link Mono} once the topic has been deleted
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     * @throws ExecutionException if the admin request fails
     */
    @DeleteMapping("topics/{name}")
    public Mono<Void> deleteTopic(@PathVariable("name") String name) throws InterruptedException, ExecutionException {
        try (var adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            var topicNames = new ArrayList<String>(1);
            topicNames.add(name);
            adminClient.deleteTopics(topicNames).values().get(name).get();
        }
        return Mono.empty();
    }

    /**
     * Copies a Kafka {@link Node} into the REST model.
     *
     * @param node the node to copy; may be {@code null}
     * @return a new {@link KafkaNode}; when {@code node} is {@code null} its fields keep their defaults
     */
    private KafkaNode convert(Node node) {
        var kafkaNode = new KafkaNode();
        if (node == null) {
            return kafkaNode;
        }
        kafkaNode.setId(node.id());
        kafkaNode.setIdString(node.idString());
        kafkaNode.setHost(node.host());
        kafkaNode.setPort(node.port());
        kafkaNode.setRack(node.rack());
        return kafkaNode;
    }

    /**
     * Copies a Kafka {@link TopicListing} into the REST model.
     *
     * @param topicListing the listing to copy; may be {@code null}
     * @return a new {@link KafkaTopicListing}; left at its defaults when the input is {@code null}
     */
    private KafkaTopicListing convert(TopicListing topicListing) {
        var kafkaTopicListing = new KafkaTopicListing();
        if (topicListing == null) {
            return kafkaTopicListing;
        }
        kafkaTopicListing.setName(topicListing.name());
        kafkaTopicListing.setInternal(topicListing.isInternal());
        return kafkaTopicListing;
    }

    /**
     * Copies a Kafka {@link TopicDescription}, including its partitions, into the REST model.
     *
     * @param topicDescription the description to copy; may be {@code null}
     * @return a new {@link KafkaTopicDescription}; left at its defaults when the input is {@code null}
     */
    private KafkaTopicDescription convert(TopicDescription topicDescription) {
        var kafkaTopicDescription = new KafkaTopicDescription();
        if (topicDescription == null) {
            return kafkaTopicDescription;
        }
        kafkaTopicDescription.setName(topicDescription.name());
        kafkaTopicDescription.setInternal(topicDescription.isInternal());
        kafkaTopicDescription.setPartitions(
                topicDescription.partitions().stream().map(this::convert).collect(Collectors.toList()));
        kafkaTopicDescription.setAuthorizedOperations(topicDescription.authorizedOperations());
        return kafkaTopicDescription;
    }

    /**
     * Copies a Kafka {@link TopicPartitionInfo} (leader, replicas and in-sync replicas) into the REST model.
     *
     * @param topicPartitionInfo the partition info to copy; may be {@code null}
     * @return a new {@link KafkaTopicPartitionInfo}; left at its defaults when the input is {@code null}
     */
    private KafkaTopicPartitionInfo convert(TopicPartitionInfo topicPartitionInfo) {
        var kafkaTopicPartitionInfo = new KafkaTopicPartitionInfo();
        if (topicPartitionInfo == null) {
            return kafkaTopicPartitionInfo;
        }
        kafkaTopicPartitionInfo.setPartition(topicPartitionInfo.partition());
        kafkaTopicPartitionInfo.setLeader(convert(topicPartitionInfo.leader()));
        kafkaTopicPartitionInfo.setReplicas(
                topicPartitionInfo.replicas().stream().map(this::convert).collect(Collectors.toList()));
        kafkaTopicPartitionInfo.setIsr(
                topicPartitionInfo.isr().stream().map(this::convert).collect(Collectors.toList()));
        return kafkaTopicPartitionInfo;
    }

    /**
     * Builds the Kafka {@link NewTopic} for a creation request.
     *
     * <p>When explicit replica assignments are supplied they define the partition layout and the
     * partition count / replication factor are not used; otherwise the count and factor are used.
     * Topic-level configs are applied when present. Previously both optional maps were ignored.
     *
     * @param kafkaNewTopic the requested topic settings
     * @return the {@link NewTopic} to pass to the admin client
     */
    private NewTopic convert(KafkaNewTopic kafkaNewTopic) {
        var assignments = kafkaNewTopic.getReplicasAssignments();
        var newTopic = (assignments != null && !assignments.isEmpty())
                ? new NewTopic(kafkaNewTopic.getName(), assignments)
                : new NewTopic(kafkaNewTopic.getName(), kafkaNewTopic.getNumPartitions(),
                        kafkaNewTopic.getReplicationFactor());
        var configs = kafkaNewTopic.getConfigs();
        if (configs != null) {
            newTopic.configs(configs);
        }
        return newTopic;
    }
}
KafkaDescribeClusterResult
import java.util.Collection;
import java.util.Set;
import org.apache.kafka.common.acl.AclOperation;
/**
 * Mutable bean holding the result of a cluster description: the nodes, the controller node,
 * the cluster id and the set of authorized operations.
 */
public class KafkaDescribeClusterResult {

    /** Nodes of the cluster. */
    private Collection<KafkaNode> nodes;

    /** Node acting as controller. */
    private KafkaNode controller;

    /** Cluster identifier. */
    private String clusterId;

    /** Authorized operations reported for the cluster. */
    private Set<AclOperation> authorizedOperations;

    /** @return the cluster nodes, or {@code null} if not set */
    public Collection<KafkaNode> getNodes() {
        return this.nodes;
    }

    /** @param nodes the cluster nodes to store */
    public void setNodes(Collection<KafkaNode> nodes) {
        this.nodes = nodes;
    }

    /** @return the controller node, or {@code null} if not set */
    public KafkaNode getController() {
        return this.controller;
    }

    /** @param controller the controller node to store */
    public void setController(KafkaNode controller) {
        this.controller = controller;
    }

    /** @return the cluster id, or {@code null} if not set */
    public String getClusterId() {
        return this.clusterId;
    }

    /** @param clusterId the cluster id to store */
    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    /** @return the authorized operations, or {@code null} if not set */
    public Set<AclOperation> getAuthorizedOperations() {
        return this.authorizedOperations;
    }

    /** @param authorizedOperations the authorized operations to store */
    public void setAuthorizedOperations(Set<AclOperation> authorizedOperations) {
        this.authorizedOperations = authorizedOperations;
    }
}
KafkaNewTopic
import java.util.List;
import java.util.Map;
/**
 * Mutable bean carrying the settings of a topic to be created: name, partition count,
 * replication factor, optional replica assignments and optional topic configs.
 */
public class KafkaNewTopic {

    /** Topic name. */
    private String name;

    /** Number of partitions. */
    private int numPartitions;

    /** Replication factor. */
    private short replicationFactor;

    /** Replica assignments keyed by an integer (stored as given). */
    private Map<Integer, List<Integer>> replicasAssignments;

    /** Topic configs as string key/value pairs; {@code null} until set. */
    private Map<String, String> configs;

    /** @return the topic name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the topic name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the number of partitions ({@code 0} if not set) */
    public int getNumPartitions() {
        return this.numPartitions;
    }

    /** @param numPartitions the number of partitions to store */
    public void setNumPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** @return the replication factor ({@code 0} if not set) */
    public short getReplicationFactor() {
        return this.replicationFactor;
    }

    /** @param replicationFactor the replication factor to store */
    public void setReplicationFactor(short replicationFactor) {
        this.replicationFactor = replicationFactor;
    }

    /** @return the replica assignments, or {@code null} if not set */
    public Map<Integer, List<Integer>> getReplicasAssignments() {
        return this.replicasAssignments;
    }

    /** @param replicasAssignments the replica assignments to store */
    public void setReplicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
        this.replicasAssignments = replicasAssignments;
    }

    /** @return the topic configs, or {@code null} if not set */
    public Map<String, String> getConfigs() {
        return this.configs;
    }

    /** @param configs the topic configs to store */
    public void setConfigs(Map<String, String> configs) {
        this.configs = configs;
    }
}
KafkaNode
/**
 * Mutable bean describing a Kafka node: numeric id, id as string, host, port and rack.
 */
public class KafkaNode {

    /** Numeric node id. */
    private int id;

    /** Node id in string form. */
    private String idString;

    /** Host name. */
    private String host;

    /** Port number. */
    private int port;

    /** Rack identifier. */
    private String rack;

    /** @return the numeric node id ({@code 0} if not set) */
    public int getId() {
        return this.id;
    }

    /** @param id the numeric node id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the node id as a string, or {@code null} if not set */
    public String getIdString() {
        return this.idString;
    }

    /** @param idString the node id string to store */
    public void setIdString(String idString) {
        this.idString = idString;
    }

    /** @return the host, or {@code null} if not set */
    public String getHost() {
        return this.host;
    }

    /** @param host the host to store */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the port ({@code 0} if not set) */
    public int getPort() {
        return this.port;
    }

    /** @param port the port to store */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the rack, or {@code null} if not set */
    public String getRack() {
        return this.rack;
    }

    /** @param rack the rack to store */
    public void setRack(String rack) {
        this.rack = rack;
    }
}
KafkaTopicDescription
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.acl.AclOperation;
/**
 * Mutable bean describing a topic: name, internal flag, partitions and authorized operations.
 */
public class KafkaTopicDescription {

    /** Topic name. */
    private String name;

    /** Whether the topic is flagged as internal. */
    private boolean internal;

    /** Partition details of the topic. */
    private List<KafkaTopicPartitionInfo> partitions;

    /** Authorized operations reported for the topic. */
    private Set<AclOperation> authorizedOperations;

    /** @return the topic name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the topic name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the internal flag ({@code false} if not set) */
    public boolean isInternal() {
        return this.internal;
    }

    /** @param internal the internal flag to store */
    public void setInternal(boolean internal) {
        this.internal = internal;
    }

    /** @return the partitions, or {@code null} if not set */
    public List<KafkaTopicPartitionInfo> getPartitions() {
        return this.partitions;
    }

    /** @param partitions the partitions to store */
    public void setPartitions(List<KafkaTopicPartitionInfo> partitions) {
        this.partitions = partitions;
    }

    /** @return the authorized operations, or {@code null} if not set */
    public Set<AclOperation> getAuthorizedOperations() {
        return this.authorizedOperations;
    }

    /** @param authorizedOperations the authorized operations to store */
    public void setAuthorizedOperations(Set<AclOperation> authorizedOperations) {
        this.authorizedOperations = authorizedOperations;
    }
}
KafkaTopicListing
/**
 * Mutable bean for one entry of a topic listing: the topic name and its internal flag.
 */
public class KafkaTopicListing {

    /** Topic name. */
    private String name;

    /** Whether the topic is flagged as internal. */
    private boolean internal;

    /** @return the topic name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the topic name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the internal flag ({@code false} if not set) */
    public boolean isInternal() {
        return this.internal;
    }

    /** @param internal the internal flag to store */
    public void setInternal(boolean internal) {
        this.internal = internal;
    }
}
KafkaTopicPartitionInfo
import java.util.List;
/**
 * Mutable bean describing one topic partition: its number, leader node, replicas and
 * in-sync replicas ({@code isr}).
 */
public class KafkaTopicPartitionInfo {

    /** Partition number. */
    private int partition;

    /** Leader node of the partition. */
    private KafkaNode leader;

    /** Replica nodes of the partition. */
    private List<KafkaNode> replicas;

    /** In-sync replica nodes of the partition. */
    private List<KafkaNode> isr;

    /** @return the partition number ({@code 0} if not set) */
    public int getPartition() {
        return this.partition;
    }

    /** @param partition the partition number to store */
    public void setPartition(int partition) {
        this.partition = partition;
    }

    /** @return the leader node, or {@code null} if not set */
    public KafkaNode getLeader() {
        return this.leader;
    }

    /** @param leader the leader node to store */
    public void setLeader(KafkaNode leader) {
        this.leader = leader;
    }

    /** @return the replica nodes, or {@code null} if not set */
    public List<KafkaNode> getReplicas() {
        return this.replicas;
    }

    /** @param replicas the replica nodes to store */
    public void setReplicas(List<KafkaNode> replicas) {
        this.replicas = replicas;
    }

    /** @return the in-sync replica nodes, or {@code null} if not set */
    public List<KafkaNode> getIsr() {
        return this.isr;
    }

    /** @param isr the in-sync replica nodes to store */
    public void setIsr(List<KafkaNode> isr) {
        this.isr = isr;
    }
}
网友评论