美文网首页
java操作kafka

java操作kafka

作者: ColorsLee | 来源:发表于2022-08-12 09:20 被阅读0次

1.导包pom

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>

2.配置文件application.yml

  kafka:
    producer:
      batch-size: 30
    consumer:
      group-id: test
    bootstrap-servers: 192.168.72.128:9092

3.生产者

@RestController
public class KafkaProducer {

    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("test/kafka/producer/{msg}")
    public String testKafkaProducer(@PathVariable("msg") String msg){
        kafkaTemplate.send("top_name","key",msg);
        return "ok";
    }
}

4.消费者

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = {"top_name"})
    public void handMessage(ConsumerRecord<?,?> consumerRecord){
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            log.info("message:{}", msg);
        }
    }
}

5.启动类

@SpringBootApplication(scanBasePackages = {"com.example.demo"})
@EnableAspectJAutoProxy(proxyTargetClass = true)
@EnableScheduling
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

6.打开浏览器访问

localhost:7070/test/kafka/producer/kangkang

相关文章

  • java操作kafka

    【准备】1 : zookeeper集群 搭建在224, 225, 2262 : kafka使用3个节点224...

  • java操作kafka

    1.导包pom 2.配置文件application.yml 3.生产者 4.消费者 5.启动类 6.打开浏览器访问

  • kafka集群安装

    环境 操作系统:CentOS 7.5 运行环境:Java 1.8 zookeeper-3.4.12 kafka_2...

  • kafka文章精选

    问答:为什么kafka高吞吐量: 怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别...

  • [Kafka 101-1] Kafka安装使用

    Kafka 101系列文章第2篇:Kafka安装使用。 安装Java Kafka是用Java和Scala写的,Ka...

  • kafka安装

    kafka 集群安装 Kafka常用操作命令

  • java操作kafka生产消费

    前言 kafka版本更新之后,其java调用的API也发生了变化,具体是从2.11.0.9.0之后(不包括0.9....

  • Kafka

    Kafka简述 Kafka is written in Scala and Java.publish-subscr...

  • 使用php连接操作kafka

    使用php连接操作kafka,从安装kafka到引入php扩展来操作kafka。 一、安装 注:需安装JDK 1....

  • filebeat+ELK+kafka集群搭建(三:kafka集群

    搭建kafka集群 Kafka官方文档:http://kafka.apache.org 1. 安装java   E...

网友评论

      本文标题:java操作kafka

      本文链接:https://www.haomeiwen.com/subject/zndtgrtx.html