美文网首页
kafka 单机部署

kafka 单机部署

作者: 飞鹰雪玉 | 来源:发表于2020-11-13 19:43 被阅读0次

    一 、部署服务

    1、下载(一定要下载二进制的包,而不是源码的包)
    https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.13-2.5.0.tgz
    2、安装

    mv kafka_2.13-2.5.0.tgz /usr/local
    cd /usr/local
    tar -zxvf kafka_2.13-2.5.0.tgz
    cd kafka_2.13-2.5.0
    

    3 、启动zookeeper(kafka自带的二进制包文件里面就有zookeeper)

    > bin/zookeeper-server-start.sh config/zookeeper.properties
    INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...
    

    4、启动kafka
    另开一个终端窗口

    > bin/kafka-server-start.sh config/server.properties
    INFO Verifying properties (kafka.utils.VerifiableProperties)
    INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    ...
    

    这个时候zookeeper 和kafka 都已经启动
    5 创建一个 topic

    另开一个终端窗口
    创建一个名为“test”的topic,它有一个分区和一个副本:

    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    现在我们可以运行list(列表)命令来查看这个topic:

    > bin/kafka-topics.sh --list --zookeeper localhost:2181
    test
    

    6 、发送一些消息
    另开一个终端窗口

    Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。

    运行 producer,然后在控制台输入一些消息以发送到服务器。

    [root@localhost kafka_2.13-2.5.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    this is a message from liupeng
    this is another message for test
    liuminbo is a good boy
    
    

    7 、启动一个 consumer
    另开一个终端窗口
    Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。

    [root@localhost kafka_2.13-2.5.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >this is a message from liupeng
    >this is another message for test 
    >liuminbo is a good boy
    
    

    所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。

    二、 java程序嵌入

    image.png

    1、引入依赖
    pom.xml

            <!-- kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.5.0</version>
            </dependency>
    

    2、编写produce

    package org.springblade.common.kafka;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    /**
     * Kafka Producer
     *
     * @author liupeng
     * @date 2020-11-12
     */
    @Slf4j
    public class ProduceKafka {
    
        private static final String TOPIC = "test";
        private static final String BROKER_LIST = "localhost:9092";
        private static KafkaProducer<String,String> producer = null;
    
        // 初始化生产者
        static {
            Properties configs = initConfig();
            producer = new KafkaProducer<>(configs);
        }
    
        /**
         * 初始化配置
         */
        private static Properties initConfig(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            return properties;
        }
    
        public static void main(String[] args) {
            //消息实体
            ProducerRecord<String , String> record = new ProducerRecord<>(TOPIC,"message"+"this is a message");
            producer.send(record);
            producer.close();
        }
    
    }
    
    

    3、编写consumer

    package org.springblade.common.kafka;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Properties;
    
    /**
     * Kafka Consumer
     *
     * @author liupeng
     * @date 2020-11-12
     */
    @Slf4j
    public class ConsumerKafka {
    
    
        private static final String BROKER_LIST = "localhost:9092";
        private static KafkaConsumer<String,String> consumer = null;
    
        static {
            Properties configs = initConfig();
            consumer = new KafkaConsumer<>(configs);
        }
    
        private static Properties initConfig(){
            Properties properties = new Properties();
            properties.put("bootstrap.servers",BROKER_LIST);
            properties.put("group.id","0");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("enable.auto.commit", "true");
            properties.setProperty("auto.offset.reset", "earliest");
            return properties;
        }
    
    
        public static void main(String[] args) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10);
                for (ConsumerRecord<String, String> record : records) {
                    log.info(String.valueOf(record));
                }
            }
        }
    
    }
    
    

    运行produce的main函数

    consumer终端窗口可以看见发送的消息


    image.png

    官方文档中文版:
    https://kafka.apachecn.org/uses.html

    相关文章

      网友评论

          本文标题:kafka 单机部署

          本文链接:https://www.haomeiwen.com/subject/opsbbktx.html