美文网首页
单机测试kafka

单机测试kafka

作者: chen_666 | 来源:发表于2020-09-30 17:04 被阅读0次

    1.安装zookeeper

    安装完成后,修改配置文件名

    mv zoo_sample.cfg zoo.cfg
    

    2.启动zookeeper

    zkServer.sh start
    

    2.安装kafka

    安装完成后,修改/config/server.properties

    # 改成IP地址,用于intelij访问
    host.name=192.168.242.204
    

    3.启动kafka

    ./kafka-server-start.sh ../config/server.properties
    

    4.创建topic

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic
    
    ./kafka-topics.sh --list --zookeeper localhost:2181
    

    5.创建消费者

    ./kafka-console-consumer.sh --zookeeper 192.168.242.204:2181 --topic my_topic --from-beginning
    

    6.创建生产者

    ./kafka-console-producer.sh --broker-list 192.168.242.204:9092 --topic my_topic
    

    7.编写java代码测试消息发送,注意一定要是同步的方式

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaTest {
    
        public  Properties props;
        @Before
        public void init()  {
            props = new Properties();
            props.put("bootstrap.servers", "192.168.242.204:9092");//kafka集群,broker-list
            props.put("acks", "all");
            props.put("retries", 1);//重试次数
            props.put("batch.size", 16384);//批次大小
            props.put("linger.ms", 1);//等待时间
            props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
    
    
        @Test
        public void sendMsg() throws ExecutionException, InterruptedException{
    
            KafkaProducer producer = new KafkaProducer(props);
            for (int i = 0;i<100;i++){
                producer.send(new ProducerRecord<String, String>("my_topic","hello kafka"+i)).get();
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:单机测试kafka

          本文链接:https://www.haomeiwen.com/subject/aixpuktx.html