美文网首页
kafka- windows与springboot2.1.X下使

kafka- windows与springboot2.1.X下使

作者: 七枷琴子 | 来源:发表于2019-11-16 10:59 被阅读0次

    1.kafka的搭建需要依赖zookeeper,现官网最新版已经内置了zookeeper,此处使用内置的ZK.
    先去官网下载kafka,
    kafka参考:https://blog.csdn.net/woshixiazaizhe/article/details/80610432

    image.png

    ,下载后解压目录格式如图


    image.png

    其中config是配置文件,复制一下这个文件夹,放到bin/windows下


    image.png
    2.修改config目录下的zookeeper.properties
    新建一个文件夹用于存放ZK的日志
    image.png

    回到bin/windows目录下,在此目录打开cmd(shift+鼠标右键),输入:

    zookeeper-server-start.bat config\zookeeper.properties
    

    特别注意,此目录层级不能过深和带空格,否则会无法启动

    image.png
    3.此时ZK服务就已经启动了,接下来安装ZK可视化工具ZK-UI,下载地址
    https://github.com/DeemOpen/zkui
    这是一个java工程,依赖maven搭建,mvn install出jar包
    image.png
    复制出这个jar包,和config文件,放到同一个目录
    输入
    java -jar [你的jar包名字]
    例如
    java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar
    

    ZKui启动成功,参考:https://www.jianshu.com/p/8320a6c52f15

    image.png

    网页输入

    账号admin
    密码manager
    
    
    image.png

    4.启动kafka
    修改config下server.properties文件,日志改为你需要的目录,


    image.png

    去kafka的bin/windows目录下

    kafka-server-start.bat config\server.properties
    

    ,kafka启动成功,此时ZKUI界面可以看到节点了


    image.png

    5.kafka可视化管理软件kafkatool,安装后连接到节点即可管理


    image.png

    收到的信息默认是byte类型,可以改成string就可以显示为正常字符串.


    image.png image.png

    6.使用java发送消息
    此处使用的springboot版本是2.1.3,查看依赖的spring版本是5.1.5,因此spring_kafuka包需要对应的版本否则会依赖冲突


    image.png

    去maven仓库中找到2.2.4刚好版本符合,引入2.2.4版本


    image.png
    <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
           <version>2.2.4.RELEASE</version>
    </dependency>
    

    配置配置文件,注意有
    factory.setAutoStartup(autoMark);
    方法,为kafka是否随系统自动启动,不需要的时候关闭即可,这个还挺好用的

    
    
    @Configuration
    @EnableKafka
    public class KafkaConfiguration {
        //写入配置文件中,控制项目启动时是否启用kafka
        private String kafkaListenerFlag = PropertiesUtil.getProperty("application", "datacenter.kafkaListenerFlag", "false");
      //写入配置文件中,控制kafka的IP端口连接
      private String kafkaIpPort = PropertiesUtil.getProperty("application", "datacenter.kafkaIpPort", "localhost:9092");
    
    
        //ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工程类,这里只配置了消费者
        @Bean
        public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            boolean autoMark = Boolean.parseBoolean(kafkaListenerFlag);
            System.out.println("是否启动kafka:" + autoMark);
            factory.setAutoStartup(autoMark);
            return factory;
        }
    
        //根据consumerProps填写的参数创建消费者工厂
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerProps());
        }
    
        //根据senderProps填写的参数创建生产者工厂
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(senderProps());
        }
    
        //kafkaTemplate实现了Kafka发送接收等功能
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
            return template;
        }
    
        //消费者配置参数
        private Map<String, Object> consumerProps() {
            Map<String, Object> props = new HashMap<>();
            //连接地址
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIpPort);
            //GroupID
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka");
            //是否自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            //自动提交的频率
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            //Session超时设置
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            //键的反序列化方式
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            //值的反序列化方式
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
        //生产者配置
        private Map<String, Object> senderProps() {
            Map<String, Object> props = new HashMap<>();
            //连接地址
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIpPort);
            //重试,0为不启用重试机制
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            //控制批处理大小,单位为字节
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            //批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            //生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
            //键的序列化方式
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            //值的序列化方式
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
    }
    
    

    配置监听者

    
    @Component
    public class DemoListener {
    
    
        //声明consumerID为demo,监听topicName为topic.quick.demo的Topic
        @KafkaListener(id = "demo", topics = "topic.quick.demo")
        public void listen(String msgData) {
    
            System.out.println("测试接收kafka: " + msgData + " 时间:" + new Date());
        }
    }
    
    

    模拟发送信息

    @RestController
    @RequestMapping("/test/kafka")
    
    public class KafKaAction extends BaseAction {
    
        @Autowired
        private KafkaTemplate<Integer, String> kafkaTemplate;
    
        @GetMapping(value = "/testKafKa")
        public ModelAndResult getDataBySocket() {
    
            for (int i = 0; i < 20; i++) {
                kafkaTemplate.send("topic.quick.demo", "测试kafka:" + i);
            }
            return new ModelAndResult(new Date());
        }
    }
    
    

    效率还是挺高的


    image.png

    注意:
    使用远程连接的时候,有时候kafka会去解析写在ZK里面的远程计算机的名字,而不是IP,如果没解析到正确的IP,就会报出 Can't resolve address [计算机:9092]的解析错误,网上很多方法都是改本地hosts来解析,个人觉得不靠谱,总不能每台机器都去改本机hosts吧?
    这里可以通过修改zk里面的配置来正确解析,登录到ZKUI界面,找到kafka,进入brokers/ids,将无法解析的计算机名字换成对应的IP地址即可正常解析,不需要改hosts文件


    image.png

    此时的kafka只有一个分区,即使你的消费者开启了多个实例,也只会有一个消费者能接收到信息,我们一般是需要多个消费者功能消费信息吧,这就要增加partitions节点了,就是加分区


    image.png

    输入命令
    windows的话在win下输入

    kafka-topics.bat --zookeeper [zk服务地址]:2181 --alter --partitions [需要加入的节点数] --topic [需要加入的topic]
    例如
    kafka-topics.bat --zookeeper localhost:2181 --alter --partitions 3 --topic topic.quick.demo
    

    加完后,注意此处分区数应该大于消费者数量,不然又要有消费者接不到


    image.png

    再次发送效果则被多消费者消费到
    消费者1:


    image.png

    消费者2:


    image.png

    暂时就这么多,有新内容再更新
    2019年11月16日10:59:30

    相关文章

      网友评论

          本文标题:kafka- windows与springboot2.1.X下使

          本文链接:https://www.haomeiwen.com/subject/hjfsictx.html