起因
在拆分项目时,自然而然的引入消息队列来解耦。例:如果没有消息队列 业务调用可能A->B->A,如果B可以调用A,A又可以调用B自然会形成循环引用,为了解决这个问题 B调A可以改成一次通知,A收到通知处理对应业务员即可。
为什么选redis stream
1 不希望引入其它中间件
2 业务规模目前不大 ,可以试水
3 redis stream 比kafka还快?
和springboot 集成
由于该功能目前比较新,个别功能在最新的springboot中才有,所以选择了spring boot 2.4.8
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
application.yml
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
time-out: 2592000000
lettuce:
shutdown-timeout: 200
pool:
max-active: 100
max-wait: -1
max-idle: 80
min-idle: 0
stream:
maxLen: 100 #订阅最大消息数,防止消息队列无限增大
conf
1.自定义注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisStreamMqListen {
String value();
Class type();
}
2.监听和发送方法封装
@Component
public class RedisStreamMqStartService {
private static final Logger log = LoggerFactory.getLogger(RedisStreamMqStartService.class);
private final long dataCenterId = getDataCenterId();
private final StringRedisTemplate redisTemplate;
@Value("${spring.application.name:default}")
private String group;
@Value("${spring.redis.stream.maxLen:100}")
long maxLen = 1000;
public RedisStreamMqStartService(@Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
void listener(String event, Class type, StreamListener streamListener) {
createGroup(event);
startSubscription(event, type, streamListener);
}
public <V> void coverSend(String event, V val) {
ObjectRecord<String, V> record = StreamRecords.newRecord()
.ofObject(val)
.withId(RecordId.autoGenerate())
.withStreamKey(event);
redisTemplate.opsForStream().add(record);
redisTemplate.opsForStream().trim(event, maxLen, true);
log.info("event {} send content {}", event, val);
}
private void startSubscription(String event, Class type, StreamListener streamListener) {
RedisConnectionFactory redisConnectionFactory = redisTemplate.getConnectionFactory();
StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(type)
.build();
StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer
.create(redisConnectionFactory, options);
listenerContainer.receiveAutoAck(
Consumer.from(group, group + dataCenterId),
StreamOffset.create(event, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
}
private void createGroup(String event) {
try {
redisTemplate.opsForStream().createGroup(event, group);
} catch (RedisSystemException e) {
if (e.getRootCause().getClass().equals(RedisBusyException.class)) {
log.info("STREAM - Redis group already exists, skipping Redis group creation: order");
} else if (e.getRootCause().getClass().equals(RedisCommandExecutionException.class)) {
log.info("STREAM - Stream does not yet exist, creating empty stream: event-stream");
// TODO: There has to be a better way to create a stream than this!?
redisTemplate.opsForStream().add(event, Collections.singletonMap("", ""));
redisTemplate.opsForStream().createGroup(event, group);
} else throw e;
}
}
private static Long getDataCenterId() {
try {
String hostName = Inet4Address.getLocalHost().getHostName();
int[] ints = StringUtils.toCodePoints(hostName);
int sums = 0;
for (int b : ints) {
sums += b;
}
return (long) (sums % 32);
} catch (UnknownHostException e) {
// 如果获取失败,则使用随机数备用
return RandomUtils.nextLong(0, 31);
}
}
}
3.项目启动后自动对注解添加监听
@Component
@ConditionalOnBean(RedisStreamMqStartService.class)
public class ListenAnnotation implements ApplicationListener<ContextRefreshedEvent> {
private static final Logger log = LoggerFactory.getLogger(ListenAnnotation.class);
final RedisStreamMqStartService redisStreamMqStartService;
public ListenAnnotation(RedisStreamMqStartService redisStreamMqStartService) {
this.redisStreamMqStartService = redisStreamMqStartService;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
Map<String, Object> beans = event.getApplicationContext().getBeansWithAnnotation(RedisStreamMqListen.class);
for (Object bean : beans.values()) {
RedisStreamMqListen ca = bean.getClass().getAnnotation(RedisStreamMqListen.class);
redisStreamMqStartService.listener(ca.value(), ca.type(), (StreamListener) bean);
log.info("event {} start listen", ca.value());
}
}
}
}
具体应用
1.定义的消息体
public class TestMessage1 {
public String content;
}
public class TestMessage2 {
public String content;
}
2.发送消息测试
@SpringBootTest
public class ProducerApplicationTest {
final String event1 = "message1Listener";
final String event2 = "message2Listener";
@Autowired
RedisStreamMqStartService redisStreamMqStartService;
@Test
public void sendMsg() throws InterruptedException {
for (int i = 0; i < 300; i++) {
if (i % 2 == 1) {
TestMessage1 testMessage1 = new TestMessage1();
testMessage1.content = "testMessage1:" + i;
redisStreamMqStartService.coverSend(event1, testMessage1);
} else {
TestMessage2 testMessage2 = new TestMessage2();
testMessage2.content = "testMessage2:" + i;
redisStreamMqStartService.coverSend(event2, testMessage2);
}
Thread.sleep(1000l);
}
}
}
3.实现监听类
@Component
@RedisStreamMqListen(value = "message1Listener", type = TestMessage1.class)
public class Message1Listener implements StreamListener<String, ObjectRecord<String, TestMessage1>> {
@Override
public void onMessage(ObjectRecord<String, TestMessage1> message) {
System.out.println(message.getValue().content);
}
}
@Component
@RedisStreamMqListen(value = "message2Listener", type = TestMessage2.class)
public class Message2Listener implements StreamListener<String, ObjectRecord<String, TestMessage2>> {
@Override
public void onMessage(ObjectRecord<String, TestMessage2> message) {
System.out.println(message.getValue().content);
}
}
具体源码:github
备注:目前只是简单的封装了自动载入监听,ack,最大队列数。之后想把这个做成工具类,加入死信,延迟消费等消息中件间功能,可惜实力不允许,慢慢来。
2021-08-09
关于springboot-starter的封装
这次重构主要解决问题:springbootApplication注解默认扫描的是启动类及子目录中所有的类,然而封装为jar包不可能总是在启动类的子包内,所以要依靠引入核心类AutoConfigure,之前的初始加载类的@Component都可以删掉,改为Configuration注入
@Configuration
public class RedisStreamMqAutoConfigure {
@Value("${spring.application.name:default}")
private String group;
@Value("${spring.redis.stream.maxLen:100}")
long maxLen = 1000;
@Bean
@ConditionalOnMissingBean(RedisStreamMqStartService.class)
public RedisStreamMqStartService redisStreamMqStartService(StringRedisTemplate stringRedisTemplate) {
return new RedisStreamMqStartServiceImpl(stringRedisTemplate, group, maxLen);
}
@Bean
@ConditionalOnBean(RedisStreamMqStartService.class)
public ListenAnnotation listenAnnotation(RedisStreamMqStartService redisStreamMqStartService) {
return new ListenAnnotation(redisStreamMqStartService);
}
}
这样还有一个新问题就是AutoConfigure何时注入?
1在resourses下新建文件/META-INF/spring.factories
2写入
rg.springframework.boot.autoconfigure.EnableAutoConfiguration=com.github.ynhj123.redismq.stream.autoConfigure.RedisStreamMqAutoConfigure
这样其它项目引入该jar包都会自动装配RedisStreamMqStartService和ListenAnnotation
2022-02-18
最近发现一篇文章分析redis与专业消息中间件的区别写的非常好,拿来分享
把Redis当作队列来用,真的合适吗?
网友评论