美文网首页
订阅模式

订阅模式

作者: 寂静的春天1988 | 来源:发表于2019-04-08 00:01 被阅读0次

之前都是一个消息被一个消费者消费。如果想一个消费者被多个消费者消费可以使用订阅模式。
订阅模式:生产者不会将消息直接发送到队列,而是发送到交换机(exchange)上,每个队列绑定到交换机上。
生产者发送到交换机,再到达队列,就能实现一个消息被多个消费者消费。

生产者

package com.demo.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 订阅模式
 * @author yl
 *
 */
public class Send {
    public void send() throws IOException, TimeoutException {
        //得到连接
        Connection con=RabbitMQ.getConnection();
        //创建通道
        Channel channel=con.createChannel();
        //声明交换机
        channel.exchangeDeclare("test_exchage", "fanout");
        //发送消息
        String msg="hello world";
        channel.basicPublish("test_exchage", "", null, msg.getBytes());
        channel.close();
        con.close();
    }
    public static void main(String[] args) throws IOException, TimeoutException {
        new Send().send();
    }
}

消费者1

package com.demo.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Get1 {
    /**
     * 消费者1
     * @throws IOException
     * @throws TimeoutException
     */
    public void get() throws IOException, TimeoutException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("ps_queue1", false, false, false, null);
        //绑定到交换机
        channel.queueBind("ps_queue1", "test_exchage", "");
        //每次最多只发送一个消息
        channel.basicQos(1);
        DefaultConsumer consumer=new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                try {
                    Thread.sleep(2000);
                    System.out.println("消费者1"+msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //手动消息回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }

            };
        };
        //自动应答改成false
        channel.basicConsume("ps_queue1", false, consumer);
    }
    public static void main(String[] args) throws IOException, TimeoutException {
        new Get1().get();
    }
}

消费者2

package com.demo.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Get2 {
    /**
     * 消费者2
     * @throws IOException
     * @throws TimeoutException
     */
    public void get() throws IOException, TimeoutException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("ps_queue2", false, false, false, null);
        //绑定到交换机
        channel.queueBind("ps_queue2", "test_exchage", "");
        //每次最多只发送一个消息
        channel.basicQos(1);
        DefaultConsumer consumer=new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                try {
                    Thread.sleep(2000);
                    System.out.println("消费者2"+msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //手动消息回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }

            };
        };
        //自动应答改成false
        channel.basicConsume("ps_queue2", false, consumer);
    }
    public static void main(String[] args) throws IOException, TimeoutException {
        new Get2().get();
    }
}

运行发现,两个消费者都消费了消息!

相关文章

网友评论

      本文标题:订阅模式

      本文链接:https://www.haomeiwen.com/subject/khesiqtx.html