美文网首页
十.SpringBoot集成Kafka

十.SpringBoot集成Kafka

作者: __元昊__ | 来源:发表于2019-05-14 16:16 被阅读0次

    首先看一下整体项目代码结构:

    image.png

    首先看一下pom.xml的内容:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the demo application: Spring Boot web app with MyBatis, MySQL, Redis, fastjson and Spring Kafka. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <!-- Inherits dependency and plugin management from the Spring Boot 2.1.4 parent POM. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.travelsky</groupId>
        <artifactId>swagger_mysql_redis_kafka_demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>swagger_mysql_redis_kafka_demo</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
    
            <!-- JDBC driver used by the application at runtime. -->
            <!-- NOTE(review): the MyBatis Generator plugin below declares connector 5.1.21 while this is 5.1.46; confirm the mismatch is intentional. -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    
            <!-- JSON library used by the producer to serialize UserLog messages. -->
            <!-- NOTE(review): 1.2.47 is an old fastjson release; check published security advisories before reusing this version. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <!-- The explicit version here takes precedence over any version managed by the Spring Boot parent. -->
            <!-- NOTE(review): confirm 2.2.6.RELEASE is the intended version for Spring Boot 2.1.4. -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.6.RELEASE</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
    
                <!-- MyBatis Generator: produces MyBatis artifacts from src/main/resources/generatorConfig.xml. -->
                <plugin>
                    <groupId>org.mybatis.generator</groupId>
                    <artifactId>mybatis-generator-maven-plugin</artifactId>
                    <version>1.3.7</version>
                    <!-- Plugin-scoped classpath: the JDBC driver and generator core used while generating. -->
                    <dependencies>
                        <dependency>
                            <groupId>mysql</groupId>
                            <artifactId>mysql-connector-java</artifactId>
                            <version>5.1.21</version>
                        </dependency>
                        <dependency>
                            <groupId>org.mybatis.generator</groupId>
                            <artifactId>mybatis-generator-core</artifactId>
                            <version>1.3.7</version>
                        </dependency>
                    </dependencies>
                    <!-- Binds the generate goal to the package phase. -->
                    <!-- NOTE(review): combined with overwrite=true below, generation presumably reruns on every package build; confirm this is wanted. -->
                    <executions>
                        <execution>
                            <id>Generate MyBatis Artifacts</id>
                            <phase>package</phase>
                            <goals>
                                <goal>generate</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <!-- Write generator progress messages to the build log -->
                        <verbose>true</verbose>
                        <!-- Overwrite existing generated files that have the same name -->
                        <overwrite>true</overwrite>
                        <!-- Location of the generator configuration file -->
                        <configurationFile>src/main/resources/generatorConfig.xml</configurationFile>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    我的 Spring Boot 版本是 2.1.4,整合 Kafka 只需要引入下面这一个 jar 包(spring-kafka)。我试过更低版本的 jar 包都会报错,2.2.6.RELEASE 这个版本没有问题:

    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <!-- Spring for Apache Kafka; the same dependency entry that appears in the full pom.xml. -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.6.RELEASE</version>
            </dependency>
    

    首先定义一个 bean,作为发送消息的载体:

    /**
     * Mutable bean describing one user-log entry.
     *
     * <p>All three properties are plain strings and stay {@code null} until a
     * setter is called. The producer serializes instances of this class to JSON
     * before sending them to Kafka.
     */
    public class UserLog {
        private String username;
        private String userid;
        private String state;

        /** Returns the user name, or {@code null} if it has not been set. */
        public String getUsername() {
            return this.username;
        }

        /** Stores the user name. */
        public void setUsername(String username) {
            this.username = username;
        }

        /** Returns the user id, or {@code null} if it has not been set. */
        public String getUserid() {
            return this.userid;
        }

        /** Stores the user id. */
        public void setUserid(String userid) {
            this.userid = userid;
        }

        /** Returns the state value, or {@code null} if it has not been set. */
        public String getState() {
            return this.state;
        }

        /** Stores the state value. */
        public void setState(String state) {
            this.state = state;
        }

        /**
         * Renders the bean as {@code UserLog{username='..', userid='..', state='..'}};
         * unset properties are printed as the text {@code null}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("UserLog{");
            text.append("username='").append(username).append('\'');
            text.append(", userid='").append(userid).append('\'');
            text.append(", state='").append(state).append('\'');
            text.append('}');
            return text.toString();
        }
    }
    

    定义消息的发送者:

    import com.alibaba.fastjson.JSON;
    import com.travelsky.swagger_mysql_redis_kafka_demo.domain.UserLog;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Spring component that publishes {@link UserLog} entries to Kafka as JSON strings.
     */
    @Component
    public class UserLogProducer {
        /** Name of the Kafka topic the user-log messages are sent to. */
        private static final String TOPIC = "test_kafka";

        /**
         * Template used to send messages. It is typed as {@code <String, String>}
         * rather than left raw so that {@code send} is checked at compile time;
         * the payload sent below is always a JSON string.
         */
        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Builds a {@link UserLog} for the given user id (user name {@code "jhp"},
         * state {@code "0"}), prints it to standard error, and sends its JSON form
         * to the {@code test_kafka} topic.
         *
         * @param userid the user id to put into the log entry
         */
        public void sendLog(String userid) {
            UserLog userLog = new UserLog();
            userLog.setUsername("jhp");
            userLog.setUserid(userid);
            userLog.setState("0");
            System.err.println("发送用户日志数据:" + userLog);
            kafkaTemplate.send(TOPIC, JSON.toJSONString(userLog));
        }
    }
    

    消息的发送直接使用KafkaTemplate模板即可,都封装好了,直接使用

    再看消息的消费者:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    /**
     * Spring component that listens on the {@code test_kafka} topic and prints
     * every non-null record value to standard error.
     */
    @Component
    public class UserLogConsumer {
        /**
         * Invoked for each record received from the {@code test_kafka} topic.
         * Records whose value is {@code null} are ignored.
         *
         * @param consumerRecord the record delivered by the listener container
         */
        @KafkaListener(topics = {"test_kafka"})
        public void consumer(ConsumerRecord<?, ?> consumerRecord) {
            // Wrap the possibly-null value and print it only when one is present.
            Optional.ofNullable(consumerRecord.value())
                    .ifPresent(payload -> System.err.println("消费消息:" + payload));
        }
    }
    

    消费机制是通过监听器实现的,直接使用这个@KafkaListener(topics = {"test_kafka"})注解接口,它可以根据指定的条件进行消息的监听

    再写一个测试接口(Controller 中的方法),用来触发消息发送:

    /**
     * Handles {@code GET /kafka} by asking the producer to send ten user-log
     * messages, using the user ids "0" through "9" in order.
     */
    @GetMapping("/kafka")
    public void testKafka() {
        int nextId = 0;
        while (nextId < 10) {
            userLogProducer.sendLog(String.valueOf(nextId));
            nextId++;
        }
    }
    

    再看一下对应的配置文件:

    #============== kafka ===================
    # Kafka settings for the demo application.
    # Broker list kept as a commented-out copy of the value below:
    #172.24.112.13:9092,172.24.112.14:9092,172.24.112.15:9092
    # Comma-separated host:port pairs used to connect to the Kafka cluster.
    spring.kafka.bootstrap-servers=172.24.112.13:9092,172.24.112.14:9092,172.24.112.15:9092
    # Producer: keys and values are both written as plain strings.
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # Consumer group id used by the listener.
    spring.kafka.consumer.group-id=test-consumer-group
    # Offsets are committed automatically at the interval configured below.
    spring.kafka.consumer.enable-auto-commit=true
    # NOTE(review): a bare number is presumably interpreted as milliseconds (1000 = 1 s); confirm against the Spring Boot docs.
    spring.kafka.consumer.auto-commit-interval=1000
    # Consumer: keys and values are both read as plain strings.
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    这个时候观察控制台打印的消息:

    微信截图_20190514172841.png

    相关文章

      网友评论

          本文标题:十.SpringBoot集成Kafka

          本文链接:https://www.haomeiwen.com/subject/wbxeaqtx.html