一、Spring Cloud Stream介绍
Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。在本文中,我们将通过一些简单的例子来介绍Spring Cloud Stream的概念和构造。
在 SpringBoot 的之中为了方便开发者去整合消息组件,也提供有一系列的处理支持,整合方式无非是在pom.xml中引入jar包,在配置文件定义好配置,然后通过@Autowired注解将RabbitTemplate引入,之后做业务处理即可。只是在 SpringCloud 里面将消息整合的处理操作进行了进一步的抽象操作, 实现了更加简化的消息处理。在整个 SpringCloud 之中支持有 RabbitMQ、Kafka 组件的消息系统。使得我们可以实现消息的生产端和消费端分别使用RabbitMQ、Kafka。
利用 SpringCloudStream 可以实现更加方便的消息系统的整合处理。SpringCloudStream 就是实现了 MDB 功能,同时可以更加简化方便的整合消息组件。
二、Spring Cloud Stream与RMQ整合原理
说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
- Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义通道类型和通道名称,
通道名称
是作为配置用
,通道类型
则决定
了app会使用这一通道进行发送消息
还是从中接收消息
。 - @Output: 输出注解,用于定义发送消息接口
- @Input: 输入注解,用于定义消息的消费者接口
- @StreamListener: 用于定义监听方法的注解
使用Spring Cloud Stream 非常简单,只需要使用好3个注解即可,在实现高性能消息的生成和消费场景非常合适,但是使用Spring Cloud Stream框架有一个非常大的问题就是不能实现可靠性投递
,也就是没法保证消息的100%可靠性,会存在少量消息丢失
问题。
这个原因是因为SpringCloudStream框架为了和Kafka兼顾
所有在实际工作中使用它的目的就是针对高性能的消息通信
的!
三、Spring Cloud Stream与RMQ整合的代码实现
因为Spring Cloud为微服务架构,在领域设计中一般将消息的生产者和消费者分开,此处我们也按2个微服务项目来讨论整合步骤。
针对消息生产者:
1、在pom.xml中添加maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>1.3.4.RELEASE</version>
</dependency>
2、在application.properties或application.yml中添加自定义配置
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
3、创建Barista.java类
package com.bfxy.rabbitmq.stream;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* <B>中文类名:</B><BR>
* <B>概要说明:</B><BR>
* 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
* 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
*/
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
//注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
//注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel logoutput();
// String INPUT_BASE = "queue-1";
// String OUTPUT_BASE = "queue-1";
// @Input(Barista.INPUT_BASE)
// SubscribableChannel input1();
// MessageChannel output1();
}
4、创建生产者发送消息的类
首先给类加上@EnableBinding(Barista.class),将我们之前创建的Barista类初始化,然后 @Autowired private Barista barista; 注入进来,最后写send方法的具体实现。
package com.bfxy.rabbitmq.stream;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Autowired
private Barista barista;
// 发送消息
public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
try{
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = barista.logoutput().send(msg);
System.err.println("--------------sending -------------------");
System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
}catch (Exception e){
System.err.println("-------------error-------------");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return null;
}
}
针对消息的消费端
1、在pom.xml中添加maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>1.3.4.RELEASE</version>
</dependency>
2、在application.properties或application.yml中添加自定义配置
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.11.76:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
消费端有个性化设置,配置比生产端要多
3、创建Barista.java类
package com.bfxy.rabbitmq.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* <B>中文类名:</B><BR>
* <B>概要说明:</B><BR>
* 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
* 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
* @author ashen(Alienware)
*/
public interface Barista {
String INPUT_CHANNEL = "input_channel";
//注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel loginput();
}
4、创建消费者消费消息的类
package com.bfxy.rabbitmq.stream;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受数据:" + message);
System.out.println("消费完毕------------");
channel.basicAck(deliveryTag, false);
}
}
最后新建一个测试类,
package com.bfxy.rabbitmq;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.client.utils.DateUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.bfxy.rabbitmq.stream.RabbitmqSender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private RabbitmqSender rabbitmqSender;
@Test
public void sendMessageTest1() {
for(int i = 0; i < 1; i ++){
try {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("SERIAL_NUMBER", "12345");
properties.put("BANK_NUMBER", "abc");
properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
rabbitmqSender.sendMessage("Hello, I am amqp sender num :" + i, properties);
} catch (Exception e) {
System.out.println("--------error-------");
e.printStackTrace();
}
}
//TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
启动测试类,消费者和生产这款,查看生产者控制台发送数据成功,消费者控制台消费数据成功,rabbitmq管控台消息发送消费成功。
网友评论