本文的示例代码参考KafkaBasic
目录
Kafka
搭建
docker run --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -d zookeeper
docker run --name kafka -p 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=192.168.1.76:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.76 --env KAFKA_ADVERTISED_PORT=9092 -v /etc/localtime:/etc/localtime -d wurstmeister/kafka
测试
docker exec -it kafka /bin/bash
cd /opt/kafka_2.11-2.0.0/
# 创建topic
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper zookeeper:2181
# 消息生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# hello kafka
docker exec -it kafka /bin/bash
cd /opt/kafka_2.11-2.0.0/
# 消息消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# hello kafka
Spring
spring init -b 1.5.6.RELEASE -dweb,kafka --build gradle KafkaBasic && cd KafkaBasic
vim ./src/main/resources/application.properties
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=test
Model
vim ./src/main/java/com/example/KafkaBasic/Message.java
package com.example.KafkaBasic;
import java.util.Date;
public class Message {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
Sender
vim ./src/main/java/com/example/KafkaBasic/Sender.java
package com.example.KafkaBasic;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
@Component
public class Sender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public String send() throws JsonProcessingException {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(message);
kafkaTemplate.send("test", json);
return json;
}
}
vim ./src/main/java/com/example/KafkaBasic/SenderConfig.java
package com.example.KafkaBasic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class SenderConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "0");
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
Receiver
vim ./src/main/java/com/example/KafkaBasic/Receiver.java
package com.example.KafkaBasic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class Receiver {
@KafkaListener(topics = {"test"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println(message);
}
}
}
Controller
vim ./src/main/java/com/example/KafkaBasic/TestsController.java
package com.example.KafkaBasic;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/tests")
public class TestsController {
@Autowired
private Sender sender;
@GetMapping
public String index() throws JsonProcessingException {
return sender.send();
}
}
- 测试
./gradlew bootrun
curl localhost:8080/tests | json
{
"id": 1541415087205,
"msg": "3068cc02-8122-459d-b8cf-02ac240ae573",
"sendTime": 1541415087205
}
终端打印:
{"id":1541415087205,"msg":"3068cc02-8122-459d-b8cf-02ac240ae573","sendTime":1541415087205}
网友评论