美文网首页
rabbitmq:java编程(使用rabbitmq原生API)

rabbitmq:java编程(使用rabbitmq原生API)

作者: jyjack | 来源:发表于2019-08-11 15:58 被阅读0次

    参考资料

    https://www.rabbitmq.com/getstarted.html

    实战一:一个生产者,一个消费者

    一个生产者,一个消费者
    • 依赖

    添加rabbitmq依赖

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.7.3</version>
            </dependency>
    

    com.rabbitmq:amqp-client使用slf4j日志框架,需要添加一个具体的日志实现,如下所示。(当然也可以使用其他的日志实现,如 log4j等)

            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.26</version>
            </dependency>
    
    • 发送消息

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.Date;
    
    public class Send {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            //连接信息
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection=null;
            Channel channel=null;
            try {
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                for (int i = 1; i <= 100; i++) {
                    String message = "Hello World!  n=" + i + "  Time=" + new Date().toString();
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  //发送消息
                    System.out.println(" [x] Sent '" + message + "'");
                    Thread.sleep(1000);  //sleep 1秒
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                channel.close();
                connection.close();
            }
        }
    }
    
    • 接收消息

    使用 channel.basicConsume 方法可以接收一条消息。

    • 接收消息 (监听方式)

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Receive {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            //连接信息
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
    
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                //消息处理回调
                DeliverCallback deliverCallback = new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        String message = new String(delivery.getBody(), "UTF-8");
                        System.out.println(" [x] Received '" + message + "'");
                    }
                };
                CancelCallback cancelCallback = null;
    
                channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);  //监听队列
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    实战二:多个消费者 (实现对同一队列的并行消费)

    多个消费者

    我们将模拟如下场景,在这个场景下:

    • 生产者:每1秒产生一条消息
    • 消费者,处理每个消息需要2秒。
      在这种场景下,将产生消息堆积,因此使用2个消息者来并行处理消息
    • 发送消息

    对发送消息程序进行改造,使其每秒发送一条消息

                for (int i = 1; i <= 100; i++) {
                    String message = "Hello World!  n=" + i + "  Time=" + new Date().toString();
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  //发送消息
                    System.out.println(" [x] Sent '" + message + "'");
                    Thread.sleep(1000);  //sleep 1秒
                }
    
    • 接收消息(监听方式)

    对接收消息程序进行改造

    • 将消息预取数量设为较小的值,这里设为1 (若预取值较大,虽然有多个消费者,第2个消息费有可能取不到消息,因为这些消息已被第1个消费者预约了)
    • 将自动确认消息,改为手动确认消息 (如果是自动确认消息,一收到消息就自动确认了,即使消息还没处理完,这时又会预取消息)
    • 模拟消息处理耗时 将耗时设置为2秒
           try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                channel.basicQos(1);        //设置预取数量
                //消息处理回调
                DeliverCallback deliverCallback1 = new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        try {
                            Thread.sleep(2000);  //模拟消息处理时间2秒
                        } catch (InterruptedException e) {
                        }
    
                        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //手动确认消息
                        System.out.println(" [X] Received '" + message + "'");
                    }
                };
                CancelCallback cancelCallback = null;
    
                boolean autoAck = false;  //设置为手动确认模式
                channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback1, cancelCallback);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    • 测试1:只运行一个消费者

    这时先运行 Receive,再运行 Send,会发现消息处理速度跟不上消息产生的速度。 (例如发送了10条消息,可能只处理了5条消息)

    • 测试2:运行多个消费者

    运行多个消费者,可以采用如下方法之一:

    • 将Receive复制为Receive2,然后同时运行Receive 及 Receive2
    • 两个cmd窗口下,分别运行Receive: java -cp $CP Receive
    • 在代码中启动两个监听(注意需要使用线程,每个线程启动一个监听。如果在一个线程中启动两个监听,虽然两个监听会轮流处理消息,但速度不会有提升)

    首先启动两个消费者,然后再运行Send ,会发现消息处理的速度基本跟上了消息产生的速度。

    实战三:分发

    image.png
    • 应场景描述:

    将同一消息分发到多个队列,过程如下:

    1. 生产者将消息发送到交换器Exchange(X),交换器X将消息转发到绑定的一个或多个目标队列。
    2. 交换器与目标队列的绑定:可以通过JAVA代码进行绑定;也可以通过rabbitmq的WEB管理页面进行绑定,这种绑定实现了生产者与目标队列的解耦。
    • 说明:交换器不会保存实际的消息。(可以将交换器理解为虚拟队列,该队列指向一个或多个目标队列)

    • 定义交换器

    定义交换器,将设置交换器类型为 fanout,即扇出分发模式

    channel.exchangeDeclare("X1", "fanout");
    
    • 发送消息到交换器

    之前,介绍了向队列发消息的方法:

    channel.basicPublish("", "hello", null, message.getBytes());
    

    现在,介绍向交换器发消息的方法,代码如下:

    channel.basicPublish( "X1", "", null, message.getBytes());
    
    • 绑定队列:通过JAVA代码绑定

    channel.queueBind(queueName1, "X1", "");
    channel.queueBind(queueName2, "X1", "");
    
    • 绑定队列:通过 rabbitmq的WEB管理控制台绑定

    1. 创建队列:
    2. 创建交换器:创建交换器时,Type属性,选择 fanout;Durability属性,选择Transient;
    3. 为交换器绑定队列:
    • 运行

    运行代码,会看到消息被会分发到了两个队列中。

    实战三:关于事务控制

    • 发送消息事务

    事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

    channel.txSelect(); //声明启动事务模式;
    channel.txCommit(); //提交事务;
    channel.txRollback(); //回滚事务;
    

    发送消息事务示例代码:

    try {
        channel.txSelect(); // 声明事务
        // 发送消息
        channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
        channel.txCommit(); // 提交事务
    } catch (Exception e) {
        channel.txRollback();
    } finally {
        channel.close();
        conn.close();
    }
    

    即在发送消息之前,需要声明channel为事务模式,发送消息后提交事务,在异常时回滚事务即可。

    • 发送消息确认

    发送消息使用事务,性能较低。
    在发达大量消息时,建议可以使用消息确认模式。具体可百度。

    • 接收消息(监听方式):应答确认机制

    监听方式接收消息的事务控制,需使用应答确认机制。

    • autoAck=true时(自动应答):消息无论处理成功或失败,都会从队列中移除。
    • autoAck=false时(手动应答):在消息处理完成后,使用channel.basicAck(...)方法手动应答,这时消息会从队列中移除。如果不应答消息不会被移除(当然这时消费者是不会收到下一条消息的)。
    1. 如果是在消息处理过程中,在还未手动应答前,若网络或消费者异常导致连接断开,则消息会回到队列,当有其他消费者时,会再次接收到该消息。
    2. 如果是在消息处理过程中,发生了异常,需要再次重新处理,则需要将消息主动放入队列(队列尾),再手动应答。
    3. actimqmq支持消息延迟投递,rabbitmq并不支持。(如果有异常延迟投递的需求,则需要自行进行特殊处理)

    实战四:关于通配符 & 交换器类型

    activemq在监听队列时,可以使用通配符,实现一个消费者监听多个不同的队列。
    rabbitmq好像在监听队列时,并不支持通配符。那么在rabbitmq中,通配符有什么用途呢?其实是用于交换器,下面进行说明。
    rabbitmq的交换器有多种类型:

    • fanout -分发模式:即交换器会将消息分发给所有绑定的队列
    • direct-路由模式:即交换器只会将消息路由到一个指定的队列,由routing key指定队列名。
    • topic-订阅模式:即交换器会将消息分发给订阅了该消息的队列,由 routing key指定。但这里的routing key可以为队列名,队列名中可以使用通配符,以将某个消息发给一个或多个队列。

    通配符说明:

     通配符 * :表示匹配1个单词
     通配符 # :表示匹配任意个单词
    

    相关文章

      网友评论

          本文标题:rabbitmq:java编程(使用rabbitmq原生API)

          本文链接:https://www.haomeiwen.com/subject/bqjjjctx.html