介绍kafka
kafka 是一个常用的消息队列组件,广泛的应用在分布式场景中,且具有非常优秀的性能。这篇简单介绍一个kafka的使用示例,对原理后边单独做文章补充,目前手头的资料不多。另外挖个坑,下次准备Redis相关原理和应用的文章,项目中也用的比较多,十分有必要进行更深入的了解。
下载和使用
下载
直接去官网下载,这里只是示例,没关注版本。如果是业务系统,一定要注意不同版本的改动。 取最新的2.12-2.4.0 (截止2020-01-13)。正常下载之后解压
---kafka_2.12-2.4.0
--/bin
--/config
--/logs
--/libs
其中bin目录下是所有的启动和管理命令,config目录下是对应的配置文件。
启动
-
首先启动zookeeper, 使用命令
bin/zookeeper-server-start.sh config/zookeeper.properties
, 如果报错按错误信息修改,可能要改动config/zookeeper.properties
配置文件。我们知道kafka使用zookeeper来管理元数据,确认启动没有报错完成后就可以进行下一步,如果没有启动zookeeper,kafka是无法启动的。 -
启动kafka server。 使用命令
bin/kafka-server-start.sh config/server.properties
。有个疑惑,刚开始将config/producer.properties
作为启动配置,添加了zookeeper地址后也成功了,暂时推测是使用了默认的参数。
代码
创建一个producer , 发送个时间标记,如下:
String topic = "test_topic";
@Test
public void product() throws InterruptedException, ExecutionException {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
while (true) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, LocalDateTime.now().toString() + " from demo producer");
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata);
TimeUnit.SECONDS.sleep(10);
}
}
创建一个consumer , 获取信息并打印出来,如下:
@Test
public void consumer() throws InterruptedException {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "3");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Lists.newArrayList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records.records(topic)) {
System.out.println("----------------consumer");
System.out.println(record.value());
}
TimeUnit.SECONDS.sleep(20);
}
}
至此,例子就能够跑通了。
网友评论