美文网首页springboot
SpringBoot整合kafka

SpringBoot整合kafka

作者: 热爱源于追求 | 来源:发表于2019-05-26 20:28 被阅读0次

    解释:

    Kafka是一个分布式的消息存储系统,提供了四大核心接口:
    1.Producer API允许了应用可以向Kafka中的topics发布消息;
    2.Consumer API允许了应用可以订阅Kafka中的topics,并消费消息;
    3.Streams API允许应用可以作为消息流的处理者,比如可以从topicA中消费消息,处理的结果发布到topicB中;
    4.Connector API提供Kafka与现有的应用或系统适配功能,比如与数据库连接器可以捕获表结构的变化;

    Topic —> 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic.
    Producer —> 负责发布消息到Kafka broker.
    Consumer —> 消息消费者,向Kafka broker读取消息的客户端.

    Kafka安装:

    Kafka下载地址:(http://kafka.apache.org/downloads)
    
    解压下载文件目录结构如下: 目录

    Windows启动方式:

    分别启动Zookeeper、Kafka

     \bin\windows\zookeeper-server-start.bat config\zookeeper.properties
     \bin\windows\kafka-server-start.bat config\server.properties
    

    提供kafka服务不需要在本地安装。

    Spring Boot整合Kafaka:

    非注解使用方式:
    pom引入:

    <!--kafka-clients发送消息所需jar包-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    

    添加配置.properties文件:

    #============== kafka ===================
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=localhost:9092
    #=============== provider  =======================
    spring.kafka.producer.retries=0 设置大于0的值,则客户端会将发送失败的记录重新发送
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    # 指定消息key和消息体的编解码方式 UTF-8
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    

    编写一个生产者者:

    package com.zhongway.modules.kafka.provider;
    
    import com.google.common.io.Resources;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.zhongway.modules.kafka.entity.MessageEntity;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Date;
    import java.util.Properties;
    import java.util.concurrent.Future;
    
    /**
     * @author Minko
     */
    public class KafkaSender {
        private static Logger logger = LoggerFactory.getLogger(KafkaSender.class);
        private static KafkaProducer<String, String> producer;
        private Gson gson = new GsonBuilder().create();
    
        static {
            try {
                InputStream props = Resources.getResource("producer.props").openStream();
                Properties properties = new Properties();
                properties.load(props);
                producer = new KafkaProducer<>(properties);
            } catch (IOException e) {
                logger.error("初始化Kafka配置文件失败");
            }
        }
    
        /**
         * 发送消息方法
         *
         * @param topic 主题
         * @param msg 消息体
         */
        public void sendMsg(String topic, String msg) {
            MessageEntity message = new MessageEntity();
            message.setMsg(msg);
            message.setSendTime(new Date());
            logger.info("sendMessage = {}", gson.toJson(message));
            try {
                Future<RecordMetadata> record = producer.send(new ProducerRecord<>(topic, gson.toJson(message)));
                record.get();
            } catch (Exception e) {
                logger.error("sendErrorMessage = {}", gson.toJson(message));
            }
        }
    }
    

    更简单的使用注解方式:

    pom引入:

    spring for kafka对应版本 对应版本
    此处引入2.1.x 其对应kafka-clients版本为所需的1.0.0
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.0.RELEASE</version>
    </dependency>
    

    添加配置文件application.yml:

      kafka:
        bootstrap-servers: localhost:9092 # 指定kafka 代理地址,可以多个
        producer: # 生产者
          retries: 1 # 设置大于0的值,则客户端会将发送失败的记录重新发送
          # 每次批量发送消息的数量
          batch-size: 16384
          buffer-memory: 33554432
          # 指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    编写一个生产者:

    /**
     * 生产者
     * @author Minko
     */
    @Component
    public class KafkaSender {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        /**
         * 发送消息到kafka
         *@param topic 主题
         *@param message 内容体
         */
        public void sendMsg(String topic , String message){
            kafkaTemplate.send(topic ,message);
        }
    }
    

    两种方式外部只需提供 topic 和发送的 json字符串即可 。
    关于定时任务,更新后的renren框架取消了job 中 method,每个定时任务需要实现ITask的 run方法,源码中会获取存入schedule_job表中的bean名称 和 run方法根据cron表达式去执行该方法。
    示例:

    @Component("KafkaSenderTask ")
    public class KafkaSenderTask implements ITask {
        private Logger logger = LoggerFactory.getLogger(getClass());
           /**
         * params 可为空
         * @param params   参数,多参数使用JSON数据
         */
        @Override
        public void run(String params){
            KafkaSender kafkaSender = new KafkaSender();
            kafkaSender.sendMsg("M","工单消息内容");
        }
    }
    

    kafka葵花宝典:传送门=>

    Demo:

    public class GsonTest {
        public static void main(String[] args) {
            List<Map<String, String>> mapList = new ArrayList<>();
            Map map = new HashMap();
            map.put("id", "1");
            map.put("name", "葵花宝典");
            Map map2 = new HashMap();
            map2.put("id", "2");
            map2.put("name", "九阴真经");
            mapList.add(map);
            mapList.add(map2);
            Gson gson = new Gson();
            System.out.println(gson.toJson(mapList));
        }
    }
    打印结果:[{"name":"葵花宝典","id":"1"},{"name":"九阴真经","id":"2"}]
    
                                       ...end

    相关文章

      网友评论

        本文标题:SpringBoot整合kafka

        本文链接:https://www.haomeiwen.com/subject/lwmftctx.html