美文网首页
2021-08-04 springboot 2.x redisT

2021-08-04 springboot 2.x redisT

作者: 江江江123 | 来源:发表于2021-08-06 11:13 被阅读0次

起因

在拆分项目时,自然而然的引入消息队列来解耦。例:如果没有消息队列 业务调用可能A->B->A,如果B可以调用A,A又可以调用B自然会形成循环引用,为了解决这个问题 B调A可以改成一次通知,A收到通知处理对应业务员即可。

为什么选redis stream

1 不希望引入其它中间件
2 业务规模目前不大 ,可以试水
3 redis stream 比kafka还快?

和springboot 集成

由于该功能目前比较新,个别功能在最新的springboot中才有,所以选择了spring boot 2.4.8
依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.11</version>
        </dependency>

application.yml

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    time-out: 2592000000
    lettuce:
      shutdown-timeout: 200
      pool:
        max-active: 100
        max-wait: -1
        max-idle: 80
        min-idle: 0
     stream:
        maxLen: 100 #订阅最大消息数,防止消息队列无限增大

conf

1.自定义注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisStreamMqListen {
    String value();

    Class type();
}

2.监听和发送方法封装

@Component
public class RedisStreamMqStartService {
    private static final Logger log = LoggerFactory.getLogger(RedisStreamMqStartService.class);
    private final long dataCenterId = getDataCenterId();

    private final StringRedisTemplate redisTemplate;

    @Value("${spring.application.name:default}")
    private String group;
    @Value("${spring.redis.stream.maxLen:100}")
    long maxLen = 1000;

    public RedisStreamMqStartService(@Qualifier("stringRedisTemplate") StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }


    void listener(String event, Class type, StreamListener streamListener) {
        createGroup(event);
        startSubscription(event, type, streamListener);
    }

    public <V> void coverSend(String event, V val) {
        ObjectRecord<String, V> record = StreamRecords.newRecord()
                .ofObject(val)
                .withId(RecordId.autoGenerate())
                .withStreamKey(event);
        redisTemplate.opsForStream().add(record);
        redisTemplate.opsForStream().trim(event, maxLen, true);
        log.info("event {} send content {}", event, val);
    }

    private void startSubscription(String event, Class type, StreamListener streamListener) {
        RedisConnectionFactory redisConnectionFactory = redisTemplate.getConnectionFactory();
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(type)
                .build();

        StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);
        listenerContainer.receiveAutoAck(
                Consumer.from(group, group + dataCenterId),
                StreamOffset.create(event, ReadOffset.lastConsumed()),
                streamListener);

        listenerContainer.start();
    }

    private void createGroup(String event) {
        try {
            redisTemplate.opsForStream().createGroup(event, group);
        } catch (RedisSystemException e) {
            if (e.getRootCause().getClass().equals(RedisBusyException.class)) {
                log.info("STREAM - Redis group already exists, skipping Redis group creation: order");
            } else if (e.getRootCause().getClass().equals(RedisCommandExecutionException.class)) {
                log.info("STREAM - Stream does not yet exist, creating empty stream: event-stream");
                // TODO: There has to be a better way to create a stream than this!?
                redisTemplate.opsForStream().add(event, Collections.singletonMap("", ""));
                redisTemplate.opsForStream().createGroup(event, group);
            } else throw e;
        }
    }

    private static Long getDataCenterId() {
        try {
            String hostName = Inet4Address.getLocalHost().getHostName();
            int[] ints = StringUtils.toCodePoints(hostName);
            int sums = 0;
            for (int b : ints) {
                sums += b;
            }
            return (long) (sums % 32);
        } catch (UnknownHostException e) {
            // 如果获取失败,则使用随机数备用
            return RandomUtils.nextLong(0, 31);
        }
    }
}

3.项目启动后自动对注解添加监听

@Component
@ConditionalOnBean(RedisStreamMqStartService.class)
public class ListenAnnotation implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(ListenAnnotation.class);


    final RedisStreamMqStartService redisStreamMqStartService;

    public ListenAnnotation(RedisStreamMqStartService redisStreamMqStartService) {
        this.redisStreamMqStartService = redisStreamMqStartService;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null) {
            Map<String, Object> beans = event.getApplicationContext().getBeansWithAnnotation(RedisStreamMqListen.class);
            for (Object bean : beans.values()) {
                RedisStreamMqListen ca = bean.getClass().getAnnotation(RedisStreamMqListen.class);
                redisStreamMqStartService.listener(ca.value(), ca.type(), (StreamListener) bean);
                log.info("event {} start listen", ca.value());
            }

        }
    }

}

具体应用

1.定义的消息体

public class TestMessage1 {
    public String content;
}
public class TestMessage2 {
    public String content;
}

2.发送消息测试

@SpringBootTest
public class ProducerApplicationTest {
    final String event1 = "message1Listener";
    final String event2 = "message2Listener";
    @Autowired
    RedisStreamMqStartService redisStreamMqStartService;

    @Test
    public void sendMsg() throws InterruptedException {
        for (int i = 0; i < 300; i++) {
            if (i % 2 == 1) {
                TestMessage1 testMessage1 = new TestMessage1();
                testMessage1.content = "testMessage1:" + i;
                redisStreamMqStartService.coverSend(event1, testMessage1);
            } else {
                TestMessage2 testMessage2 = new TestMessage2();
                testMessage2.content = "testMessage2:" + i;
                redisStreamMqStartService.coverSend(event2, testMessage2);
            }
            Thread.sleep(1000l);
        }

    }
}

3.实现监听类

@Component
@RedisStreamMqListen(value = "message1Listener", type = TestMessage1.class)
public class Message1Listener implements StreamListener<String, ObjectRecord<String, TestMessage1>> {
    @Override
    public void onMessage(ObjectRecord<String, TestMessage1> message) {
        System.out.println(message.getValue().content);
    }
}
@Component
@RedisStreamMqListen(value = "message2Listener", type = TestMessage2.class)
public class Message2Listener implements StreamListener<String, ObjectRecord<String, TestMessage2>> {
    @Override
    public void onMessage(ObjectRecord<String, TestMessage2> message) {
        System.out.println(message.getValue().content);
    }
}

具体源码:github

备注:目前只是简单的封装了自动载入监听,ack,最大队列数。之后想把这个做成工具类,加入死信,延迟消费等消息中件间功能,可惜实力不允许,慢慢来。

2021-08-09
关于springboot-starter的封装
这次重构主要解决问题:springbootApplication注解默认扫描的是启动类及子目录中所有的类,然而封装为jar包不可能总是在启动类的子包内,所以要依靠引入核心类AutoConfigure,之前的初始加载类的@Component都可以删掉,改为Configuration注入

@Configuration
public class RedisStreamMqAutoConfigure {
    @Value("${spring.application.name:default}")
    private String group;
    @Value("${spring.redis.stream.maxLen:100}")
    long maxLen = 1000;

    @Bean
    @ConditionalOnMissingBean(RedisStreamMqStartService.class)
    public RedisStreamMqStartService redisStreamMqStartService(StringRedisTemplate stringRedisTemplate) {
        return new RedisStreamMqStartServiceImpl(stringRedisTemplate, group, maxLen);
    }

    @Bean
    @ConditionalOnBean(RedisStreamMqStartService.class)
    public ListenAnnotation listenAnnotation(RedisStreamMqStartService redisStreamMqStartService) {
        return new ListenAnnotation(redisStreamMqStartService);
    }
}

这样还有一个新问题就是AutoConfigure何时注入?
1在resourses下新建文件/META-INF/spring.factories
2写入

rg.springframework.boot.autoconfigure.EnableAutoConfiguration=com.github.ynhj123.redismq.stream.autoConfigure.RedisStreamMqAutoConfigure

这样其它项目引入该jar包都会自动装配RedisStreamMqStartService和ListenAnnotation

2022-02-18
最近发现一篇文章分析redis与专业消息中间件的区别写的非常好,拿来分享
把Redis当作队列来用,真的合适吗?

相关文章

网友评论

      本文标题:2021-08-04 springboot 2.x redisT

      本文链接:https://www.haomeiwen.com/subject/nsnuvltx.html