美文网首页
kafka简介

kafka简介

作者: 紫玥迩 | 来源:发表于2017-05-17 15:12 被阅读167次

    简介

    消息中间件
    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
    broker
    topic
    partition
    producer
    consumer
    consumer group

    安装

    1. 下载解压
    2. 启动zookeeper
    3. 修改config/server.proprties的:
    log.dirs=E:\data\kafka-logs
    zookeeper.connect=10.129.83.213:2181
    listeners=PLAINTEXT://10.143.47.32:9092
    
    1. 启动命令
      kafka_2.12-0.10.2.1 需要jdk1.8
      kafka_2.11-0.10.0.1 需要jdk1.7
      Kafka Shell基本命令(包括topic的增删改查)
    #win
    .\bin\windows\kafka-server-start.bat .\config\server.properties
    #linux
     bin/kafka-server-start.sh config/server.properties
    #创建一个名为“test”的Topic,只有一个分区和一个备份
    bin/kafka-topics.sh --create --zookeeper 10.129.83.213:2181 --replication-factor 1 --partitions 1 --topic test
    #查看topic
    bin/kafka-topics.sh --list --zookeeper 10.129.83.213:2181
    #查看consumer-groups
    bin/kafka-consumer-groups.sh --list --bootstrap-server 10.143.47.32:9092
    bin/kafka-consumer-groups.sh --bootstrap-server 10.143.47.32:9092 --describe --group myGroup
    #查看消费了多少数据
     bin/kafka-run-class.sh  kafka.tools.ConsumerOffsetChecker --group myGroup  --topic test  --zookeeper 10.129.83.213:2181
    #查看test详细信息
    bin/kafka-topics.sh --describe --topic test --zookeeper 10.129.83.213:2181
    #发送消息
    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 
    #发送消息
    bin/kafka-console-producer.sh --broker-list 10.143.47.32:9092 --topic test 
    #消息内容
    This is a message
    This is another message
    Hello World
    #消费消息
    #消费者线程数必须是小等于topic的partition分区数
    bin/kafka-console-consumer.sh --zookeeper 10.129.83.213:2181 --topic test --from-beginning
    #消费消息
    bin/kafka-console-consumer.sh --bootstrap-server 10.143.47.32:9092 --topic test --from-beginning
    
    Paste_Image.png

    每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
    无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

    基于时间:log.retention.hours=168
    基于大小:log.retention.bytes=1073741824
    

    producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

    指定了 patition,则直接使用;
    未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
    patition 和 key 都未指定,使用轮询选出一个 patition。
    
    1. kafka_2.10-0.10.2.0 需要jdk1.7(springboot 1.5.3集成)
      pom.xml
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>1.5.3.RELEASE</version>
            <exclusions>  
                        <exclusion>  
                            <groupId>org.springframework.boot</groupId>  
                            <artifactId>spring-boot-starter-logging</artifactId>  
                        </exclusion>  
                    </exclusions> 
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <version>1.5.3.RELEASE</version>
            </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.1.RELEASE</version>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-log4j2</artifactId>
                <version>1.5.3.RELEASE</version>
            </dependency>
    

    application.properties

    spring.application.name=springboot-kafka-test
    #kafka
    spring.kafka.bootstrap-servers=10.143.47.32:9092
    spring.kafka.consumer.group-id=myGroup
    #charset
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.template.default-topic=test
    spring.kafka.listener.concurrency=1
    spring.kafka.producer.batch-size=1000
    #log4j2
    logging.config=classpath:log4j2.xml
    

    配置

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    }
    

    生产者

    @Component
    public class MsgProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
        
        public void send(String value){
            System.out.println("send start-----------");
            kafkaTemplate.send("test", value+"1");
            kafkaTemplate.send("test", value+"2");
            System.out.println("send end-----------");
        }
    }
    

    消费者

    @Component
    public class MsgConsumer {
        static Logger subscribelogger = LoggerFactory.getLogger("subscribelogger"); 
        
        @KafkaListener(topics="test")
        public void processMsg1(String s){
            subscribelogger.info("{}|{}","myGroup",s); 
        }
        
        /*@KafkaListener(topics="test")
        public void processMsg(ConsumerRecord<?, ?> record){
            subscribelogger.info("{}|{}","myGroup1",record.value());    
        }*/
    }
    

    参考

    kafka入门经典教程
    kafka教程

    相关文章

      网友评论

          本文标题:kafka简介

          本文链接:https://www.haomeiwen.com/subject/wrvkjttx.html