stream的作用:
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
添加配置项
# Port and name of this demo application
server.port=20001
spring.application.name=sys-tool-kafka-stream
# Topic name, defined once and referenced by both bindings below
kafka.topic=test
# Address of the Kafka broker the application connects to
spring.kafka.bootstrap-servers=localhost:9092
# The output and input bindings both resolve to ${kafka.topic},
# i.e. the same topic is used for producing and consuming
spring.cloud.stream.bindings.output.destination=${kafka.topic}
spring.cloud.stream.bindings.input.destination=${kafka.topic}
功能测试
生产者bean
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
/**
 * Publishes text messages through the {@link Source#OUTPUT} channel that
 * {@code @EnableBinding(Source.class)} binds for this application.
 */
@EnableBinding(Source.class)
public class MessageProducerBean {

    /** Output channel, selected by the {@code Source.OUTPUT} qualifier. */
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    /**
     * Wraps {@code message} as the payload of a new message and sends it to the
     * output channel.
     *
     * <p>The boolean returned by {@code MessageChannel.send} is intentionally not
     * inspected here, so a {@code false} result is not reported to the caller.
     *
     * @param message the text to publish as the message payload
     */
    public void send(String message) {
        messageChannel.send(MessageBuilder.withPayload(message).build());
    }
}
消费者bean
package com.tool.stream.kafka.stream.consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/**
 * Demonstrates three ways of receiving messages from the {@link Sink#INPUT}
 * channel bound by {@code @EnableBinding(Sink.class)}. Each handler prints the
 * received message to standard output.
 */
@EnableBinding(Sink.class)
public class MessageConsumerBean {

    /** Input channel, selected by the {@code Sink.INPUT} qualifier. */
    @Autowired
    @Qualifier(Sink.INPUT)
    private SubscribableChannel subscribableChannel;

    /** The bound {@code Sink} itself; injected but not used by the handlers below. */
    @Autowired
    private Sink sink;

    // Invoked once bean injection has completed.
    // Only one of the following three approaches is needed.

    // Approach 1: subscribe a MessageHandler to the channel programmatically.
    @PostConstruct
    public void init() {
        subscribableChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                Object payload = message.getPayload();
                // Decode byte payloads with an explicit charset instead of the
                // platform default, and tolerate payloads that are not byte[]
                // rather than failing with a ClassCastException.
                String text;
                if (payload instanceof byte[]) {
                    text = new String((byte[]) payload, StandardCharsets.UTF_8);
                } else {
                    text = String.valueOf(payload);
                }
                System.out.println(text);
            }
        });
    }

    // Approach 2: annotate a method as a service activator for the input channel.
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void onMessage(Object message) {
        System.out.println("@ServiceActivator-->" + message);
    }

    // Approach 3: annotate a method as a stream listener for the input channel.
    @StreamListener(Sink.INPUT)
    public void onMessage(String message) {
        System.out.println("@StreamListener-->" + message);
    }
}
消息测试
注意:以上功能依赖 Kafka,测试前需要先启动 Kafka(本例配置的地址为 localhost:9092)。
package com.tool.stream.kafka;
import com.tool.stream.kafka.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Application entry point that also acts as a REST controller for manually
 * testing the producer. Requests mapped under {@code kafkastream/test} hand the
 * {@code msg} value to {@link MessageProducerBean#send(String)}.
 */
@RestController
@RequestMapping("kafkastream")
@SpringBootApplication
public class SysToolKafkaStreamApplication {

    /** Producer used to publish the text received by {@link #send(String)}. */
    @Autowired
    private MessageProducerBean producer;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SysToolKafkaStreamApplication.class, args);
    }

    /**
     * Forwards {@code msg} to the producer bean.
     *
     * @param msg the text to publish
     * @return the literal string {@code "success"} once the producer call returns
     */
    @RequestMapping("test")
    public String send(String msg) {
        producer.send(msg);
        return "success";
    }
}
网友评论