一、发布与订阅
实际中,redis很少使用发布与订阅来代替MQ角色。
二、使用redis客户端实现
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。一个redis客户端可以订阅任意多大频道channel,一个频道也可以被多个客户端订阅。
1.创建并监听频道
发布2.向创建的频道发送一条消息
订阅三、demo
1.订阅者需要继承抽象类JedisPubSub;重写方法onMessage接收发布者发送的消息
Farmer Worker Programmer2.定义发布者
3.测试类
public class Main {
public static final String CHANNEL_NAME = "MyChannel";
public static final String REDIS_HOST = "localhost";
public static final int REDIS_PORT = 6379;
private final static Logger logger = Logger.getLogger(Main.class);
private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
private final static JedisPool JEDIS_POOL =
new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);
public static void main(String[] args) throws Exception {
PropertyConfigurator.configure("src/log4j.properties");
/*订阅者redis客户端*/
final Jedis farmerJedis = JEDIS_POOL.getResource();
final Jedis workerJedis = JEDIS_POOL.getResource();
final Jedis programmerJedis = JEDIS_POOL.getResource();
/*发布者redis*/
final Jedis publisherJedis = JEDIS_POOL.getResource();
final Farmer farmer = new Farmer();
final Worker worker = new Worker();
final Programmer programmer = new Programmer();
//订阅线程:接收消息,因为Jedis是以阻塞的方式等待发布者消息的,所以每个Jedis客户端必须对应一个独立的线程。不然只会有第一个Jedis接受到消息。
new Thread(new Runnable() {
public void run() {
try {
farmerJedis.subscribe(farmer,CHANNEL_NAME);
logger.info("Subscription ended.");
} catch (Exception e) {
logger.error("Subscribing failed.", e);
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
workerJedis.subscribe(worker,CHANNEL_NAME);
logger.info("Subscription ended.");
} catch (Exception e) {
logger.error("Subscribing failed.", e);
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
programmerJedis.subscribe(programmer,CHANNEL_NAME);
logger.info("Subscription ended.");
} catch (Exception e) {
logger.error("Subscribing failed.", e);
}
}
}).start();
//主线程:发布消息到CHANNEL_NAME频道上
WeatherServer weatherServer = new WeatherServer();
weatherServer.publishMessage(publisherJedis,CHANNEL_NAME,"rain");
farmerJedis.close();
workerJedis.close();
programmerJedis.close();
}
}
网友评论