kafka入门

作者: 喷气式蜗牛 | 来源:发表于2018-01-11 00:28 被阅读22次

1.环境配置

kafka依赖zookeeper来调度,以及选举leader,因此需要先安装zookeeper

1.1 安装zookeeper

点击下载zookeeper下载合适版本的zookeeper,当前最新的稳定版本是3.4.9创建好数据目录,命名为data,下一步配置用到

$ cd opt/ && tar -zxf zookeeper-3.4.6.tar.gz  && cd zookeeper-3.4.6
$ mkdir data

1.2 配置zookeeper

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

1.3 启动zookeeper

$ bin/zkServer.sh start

相应的停止zookeeper的命令为:

 $ bin/zkServer.sh stop

1.4 启动zookeeper CLI

$ bin/zkCli.sh

1.2 安装kafka

1.2.1 下载并解压

点击下载kafka的压缩包

$ cd opt/
$ tar -zxf kafka_2.11-0.10.1.0.tgz
$ cd kafka_2.11-0.10.1.0

1.3.1 启动和关闭Kafka

启动kafka

$ bin/kafka-server-start.sh config/server.properties

关闭kafka

$ bin/kafka-server-stop.sh config/server.properties

2.测试单broker

我的kafka服务创建在Linux虚拟机上,IP地址为:192.168.61.131(按需替换成自己的IP地址),在这里需要配置server.properties文件,将advertised.host.name设置为虚拟机的IP地址 advertised.host.name=192.168.61.131,否则在宿主机上无法访问虚拟机上面的服务

2.1 使用Shell命令测试topic

2.1.1 创建topic

在命令行界面kafka目录,输入下面命令:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic page_visits

2.1.2 测试发布者

输入以下命令,打开发布消息CLI

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic page_visits

在CLI界面输入,两行测试消息

Hello kafka
你好吗?

2.1.3 测试订阅者

输入一下命令打开订阅者CLI

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --from-beginning --topic page_visits

如果执行正确,会显示刚才发布者发送的两行消息

2.2 使用Java代码创建Client来发布订阅消息

需要先在pom中添加kafka依赖:

     <dependencies>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.9.2</artifactId>
                 <version>0.8.1.1</version>
                 <scope>compile</scope>
                 <exclusions>
                     <exclusion>
                         <artifactId>jmxri</artifactId>
                         <groupId>com.sun.jmx</groupId>
                     </exclusion>
                     <exclusion>
                         <artifactId>jms</artifactId>
                         <groupId>javax.jms</groupId>
                     </exclusion>
                     <exclusion>
                         <artifactId>jmxtools</artifactId>
                         <groupId>com.sun.jdmk</groupId>
                     </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka-clients</artifactId>
                 <version>0.9.0.0</version>
             </dependency>
     
         </dependencies>

2.2.1 创建发布者发布消息

下面一段代码,会每隔3秒中发布一个测试消息

  public class MyProducer {
      private final static String TOPIC = "page_visits";
  
      public static void main(String[] args) throws InterruptedException {
          long events = 100;
          Properties properties = new Properties();
          properties.put("metadata.broker.list", "192.168.61.131:9092");
          properties.put("serializer.class", "kafka.serializer.StringEncoder");
  
          ProducerConfig config = new ProducerConfig(properties);
          Producer<String, String> producer = new Producer<String, String>(config);
          for (long nEvent = 0; nEvent< events; nEvent++){
              SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
              KeyedMessage<String,String> data = new KeyedMessage<String, String>(TOPIC,String.valueOf(nEvent),"Test message from java program " + sdf.format(new Date()));
              Thread.sleep(3000);
              producer.send(data);
          }
          producer.close();
  
  
      }
  }

2.2.2 创建订阅者订阅消息

下面的代码会绑定到虚拟机长的kafka服务,当发布者发布消息时,订阅者会不断地打印发布者发布的消息:

public class MyConsumer {
    private final static String TOPIC = "page_visits";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.61.131:9092");
        properties.put("enable.auto.commit", "true");
        properties.put("group.id", "test");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(TOPIC));
        System.out.println("Subscribe to topic "+TOPIC);
        while (true){
            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
            for(ConsumerRecord<String,String> record: consumerRecords){
                System.out.printf("offset = %d,key = %s,value = %s\n",record.offset(),record.key(),record.value());
            }
        }

    }
}
运行结果

相关文章

  • Kafka视频集

    kafka企业级入门实战完整版 Kafka系列教程 Kafka入门 分布式消息通信Kafka原理剖析 阿里架构师直...

  • 再看kafka——spring boot集成kafka

    之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习...

  • Kafka学习

    MQ入门总结(六)Kafka的原理和使用 Kafka的架构原理,你真的理解吗? 真的,Kafka 入门一篇文章就够...

  • Kafka快速开始

    入门 1.简介 Kafka is a distributed streaming platform,kafka是一...

  • 【kafka】为什么要学习Kafka?

    KAFKA官方文档入门指南 http://ifeve.com/kafka-1 为什么要学习Kafka? http:...

  • (3)kafka的安装部署以及基本操作

    1.kafka 的安装部署 可以去看kafka的快速入门:http://kafka.apache.org/quic...

  • kafka入门

    Apache Kafka 入门 1.kafka简介和产生的背景 什么是 Kafka Kafka 是一款分布式消息发...

  • kafka极简入门(四)--常用配置

    回顾:kafka极简入门(三)--创建topic 前言 kafka针对broker, topic, produce...

  • kafka极简入门(二) --安装

    回顾kafka极简入门(一) --简介 1.单机版kafka安装 kafka需要结合zookeeper使用,所以本...

  • Kafka入门

    Kafka官网:http://kafka.apache.org/入门1.1 介绍Kafka™ 是一个分布式流处理系...

网友评论

    本文标题:kafka入门

    本文链接:https://www.haomeiwen.com/subject/peoanxtx.html