1.安装zookeeper
安装完成后,修改配置文件名
mv zoo_sample.cfg zoo.cfg
2.启动zookeeper
zkServer.sh start
2.安装kafka
安装完成后,修改/config/server.properties
# 改成IP地址,用于intelij访问
host.name=192.168.242.204
3.启动kafka
./kafka-server-start.sh ../config/server.properties
4.创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic
./kafka-topics.sh --list --zookeeper localhost:2181
5.创建消费者
./kafka-console-consumer.sh --zookeeper 192.168.242.204:2181 --topic my_topic --from-beginning
6.创建生产者
./kafka-console-producer.sh --broker-list 192.168.242.204:9092 --topic my_topic
7.编写java代码测试消息发送,注意一定要是同步的方式
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaTest {
public Properties props;
@Before
public void init() {
props = new Properties();
props.put("bootstrap.servers", "192.168.242.204:9092");//kafka集群,broker-list
props.put("acks", "all");
props.put("retries", 1);//重试次数
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待时间
props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
@Test
public void sendMsg() throws ExecutionException, InterruptedException{
KafkaProducer producer = new KafkaProducer(props);
for (int i = 0;i<100;i++){
producer.send(new ProducerRecord<String, String>("my_topic","hello kafka"+i)).get();
}
}
}
网友评论