前言
上一节我们使用Redis实现了发布订阅者模式,关于Redis实现的发布订阅模式的特点,我们也了解了
- 基于Redis服务主动推送消息,而非订阅者循环拉取.
- 消息即发即丢(就是消息一发布,就丢失了,不会保存)
那么,实际场景时,我们也会遇到如下的业务场景:
- 需要保存消息,按需取出
- 需要异步延迟处理消息,比如预定订单会先入消息队列,而后统一在一个时间集中处理
此时,发布订阅模式就不太适用了
而生产消费模式便能满足这样的需求
生产消费者模式的特点如下:
- 只有一个消费者将获得消息
- 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
- 每一个成功处理的消息都由接收者签收
本节,我们就适用Redis实现生产消费模式
Redis实现生产这消费者模式
Redis,对于队列模型,我们可以使用list数据结构,通过LPUSH和RPOP来实现一个队列.
Springboot+Redis实现
- 引入redis依赖
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- application.yml
spring:
redis:
port: 6379
database: 0
host: 127.0.0.1
password: 123456
jedis:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
timeout: 5000ms
server:
port: 9999
- redis配置类
package com.mrcoder.sbredisproducerconsumer.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @Description: Redis配置类
*/
@Configuration
@ConditionalOnClass({RedisTemplate.class})
public class RedisConfig {
/**
* Redis操作模板配置
*
* @param connectionFactory
* @return
*/
@Bean
public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
RedisTemplate<byte[], byte[]> template = new RedisTemplate<byte[], byte[]>();
template.setConnectionFactory(connectionFactory);
// 设置key/hashkey序列化
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
// 设置值序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
/**
* 序列化定制
*
* @return
*/
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
Object.class);
// 初始化objectmapper
ObjectMapper mapper = new ObjectMapper();
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
return jackson2JsonRedisSerializer;
}
}
- 定义生产者
package com.mrcoder.sbredisproducerconsumer.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class ProducerService {
@Autowired
RedisTemplate<String, String> redisTemplate;
@Scheduled(initialDelay = 5000, fixedDelay = 30 * 1000)
public void producer() {
String message = "消息 " + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
System.out.println("-----------生产者------------>生产了:" + message);
redisTemplate.opsForList().leftPush("queue", message);
}
}
- 定义消费者
package com.mrcoder.sbredisproducerconsumer.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class ConsumerService {
@Autowired
RedisTemplate<String, String> redisTemplate;
@Scheduled(initialDelay = 5000, fixedDelay = 60 * 1000)
public void consumer() {
String message = redisTemplate.opsForList().rightPop("queue");
System.out.println("-----------消费者------------>消费了:" + message);
}
}
- 入口类
package com.mrcoder.sbredisproducerconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class SbRedisProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SbRedisProducerConsumerApplication.class, args);
}
}
运行
-----------消费者------------>消费了:消息 Mon Sep 02 15:44:42 CST 2019
-----------生产者------------>生产了:消息 20190902155016
-----------生产者------------>生产了:消息 20190902155046
项目地址:
https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-redis-producer-consumer
请关注我的订阅号

网友评论