美文网首页
kakfa 测试接入

kakfa 测试接入

作者: 西5d | 来源:发表于2020-01-15 20:05 被阅读0次

    介绍kafka

    kafka 是一个常用的消息队列组件,广泛的应用在分布式场景中,且具有非常优秀的性能。这篇简单介绍一个kafka的使用示例,对原理后边单独做文章补充,目前手头的资料不多。另外挖个坑,下次准备Redis相关原理和应用的文章,项目中也用的比较多,十分有必要进行更深入的了解。

    下载和使用

    下载

    直接去官网下载,这里只是示例,没关注版本。如果是业务系统,一定要注意不同版本的改动。 取最新的2.12-2.4.0 (截止2020-01-13)。正常下载之后解压

     ---kafka_2.12-2.4.0
      --/bin
      --/config
      --/logs
      --/libs
    

    其中bin目录下是所有的启动和管理命令,config目录下是对应的配置文件。

    启动

    1. 首先启动zookeeper, 使用命令 bin/zookeeper-server-start.sh config/zookeeper.properties , 如果报错按错误信息修改,可能要改动config/zookeeper.properties配置文件。我们知道kafka使用zookeeper来管理元数据,确认启动没有报错完成后就可以进行下一步,如果没有启动zookeeper,kafka是无法启动的。

    2. 启动kafka server。 使用命令 bin/kafka-server-start.sh config/server.properties 。有个疑惑,刚开始将config/producer.properties 作为启动配置,添加了zookeeper地址后也成功了,暂时推测是使用了默认的参数。

    代码

    创建一个producer , 发送个时间标记,如下:

     String topic = "test_topic";
     @Test
     public void product() throws InterruptedException, ExecutionException {
         Properties properties = new Properties();
         properties.setProperty("bootstrap.servers", "localhost:9092");
         properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
         while (true) {
             ProducerRecord<String, String> record = new ProducerRecord<>(topic, LocalDateTime.now().toString() + " from demo producer");
             Future<RecordMetadata> future = producer.send(record);
             RecordMetadata metadata = future.get();
             System.out.println(metadata);
             TimeUnit.SECONDS.sleep(10);
         }
     }
    
    

    创建一个consumer , 获取信息并打印出来,如下:

        @Test
        public void consumer() throws InterruptedException {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("group.id", "3");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Lists.newArrayList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
                for (ConsumerRecord<String, String> record : records.records(topic)) {
                    System.out.println("----------------consumer");
                    System.out.println(record.value());
                }
                TimeUnit.SECONDS.sleep(20);
            }
        }
    
    

    至此,例子就能够跑通了。

    相关文章

      网友评论

          本文标题:kakfa 测试接入

          本文链接:https://www.haomeiwen.com/subject/vlcxzctx.html