美文网首页技术总结
Rabbitmq概念及HelloWorld

Rabbitmq概念及HelloWorld

作者: AlanKim | 来源:发表于2019-02-11 08:55 被阅读1次

    相关概念:

    • Producer:消息产生者

    • Consumer: 消息消费者

    • Broker:消息中间件的服务节点,对于Rabbitmq来说,一个Rabbitmq broker可以认为是一个Rabbitmq的服务实例。

    • Connection,与Broker的tcp长连接,Producer和Consumer都需要建立连接之后才可以使用

    • Channel,建立在Connection基础上,每个线程分配一个channel,类似于NIO的多路复用,节省连接资源。大部分RabbitMQ的操作和核心概念都是基于Channel的,需要特别注意。

    • Queue:队列,RabbitMQ中用于存储消息的容器。而且RabbitMQ中的消息只能存储在Queue中,这点跟kafka不同,kafka只能将消息放在topic中,而kafka中的queue只是topic实际存储文件中的位移标识。

      多个consumer可以消费同一个queue中的消息,这时候消息的处理是互斥的,即一个消息只能被一个consumer处理。

    • Exchange:还是不翻译成中文了,太怪。在producer将消息发到Broker中时,是通过exchange按照一定规则转发到不同的queue中,而不是直接放入queue中。

    • RoutingKey:producer在发送消息给exchange时,一般会指定一个RoutingKey,用来指定这个消息的路由规则。

    • BindingKey:RoutingKey和BindingKey匹配时(注意不是相同,可能是模糊匹配),exchange才会把消息发送到对应的queue中

    Snip20180606_6.png
    exchange类型
    • fanout,会把消息路由到所有绑定的queue中,无视RoutingKey和BindingKey
    • direct,只能将消息路由到RoutingKey和BindingKey完全匹配的queue,queue可以有多个,只要匹配就行
    • topic,可以支持# * 等通配符匹配RoutingKey和BindingKey
    • headers,不常用,不会依赖RoutingKey,而是根据消息内容中的headers属性跟exchange绑定的内容进行匹配,性能较低,不过非常灵活。

    一些关键点

    大部分情况下,按照最简单的方式使用就好了,作为工具书去查询《RabbitMQ实战详解》里面的配置。

    • channel.basicQos是设置一个channel中consumer所能保持的最大未确认消息,也就是说,如果一个channel中的qos值已经到了最大,那么rabbitmq就不会继续往这个channel中push对应的消息。
    • rabbitmq的顺序一致性其实是无法保证的,
      • 比如事务消息或者发送消息确认,当发送失败需要重试时,这一条(批)数据跟其之后的数据在producer端就不一致了
      • 如果producer发送的时候设置了不同的超时时间,并且也设置了死信队列,那么消费者在处理死信队列的时候,也会出现数据顺序与发送顺序不同的情况。
      • 设置优先级,也会导致顺序一致性收到影响。
    • 死信队列、延迟队列、消费优先级、持久化等,查询工具书即可。

    web端管理

    • 使用rabbitmq-plugins enable rabbitmq_management 来启用web管理插件,重启才会生效.
    • 使用rabbitmq-plugins list来查看正在使用的插件。
    • 访问 server_ip:15672可以访问,guest用户无法登陆远程服务器,需要使用上面创建的root:root123 用户/密码来登陆
    HelloWorld

    加入maven依赖:

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.2.0</version>
            </dependency>
    
    producer代码:
    package rabbitmq.server;
    
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMQProducer {
    
        private static final String exchange_name = "exchange_siyu";
    
        private static final String routing_key = "routing_key_siyu";
    
        private static final String queue_name = "queue_siyu";
    
        private static final String rabbitmq_server_ip_addr = "10.199.189.30";
    
        private static final int rabbitmq_server_port = 5672;
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 连接工厂类
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            // 设置连接属性及用户名密码,用户、密码要通过rabbitmqctl设置过权限
            connectionFactory.setHost(rabbitmq_server_ip_addr);
            connectionFactory.setPort(rabbitmq_server_port);
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root123"); // 如果用户名密码不匹配,会连接失败
    
            // 建立连接,一个tcp长连接
            Connection connection = connectionFactory.newConnection();
    
            // 创建信道,主要操作通过channel执行,可以认为channel是虚拟化出来的一个Connection,用于复用
            Channel channel = connection.createChannel();
    
            // 定义路由,direct是point-2-piont的,直接到对应的单个queue中
            channel.exchangeDeclare(exchange_name,"direct",true,false,null);
    
            // 定义queue
            channel.queueDeclare(queue_name,true,false,false,null);
    
            // 通过routingkey 绑定queue和exchange
            channel.queueBind(queue_name,exchange_name,routing_key);
    
    
    
            // 开始发送消息
            String message = "Hello World!!";
    
            /* MessageProperties中预置了一部分消息的参数,比如PERSIST_TEXT_PLAIN,其中的定义如下:
            *
            *
            public static final BasicProperties PERSISTENT_TEXT_PLAIN =
            new BasicProperties("text/plain",
                                null,
                                null,
                                2,
                                0, null, null, null,
                                null, null, null, null,
                                null, null);
            * */
            channel.basicPublish(exchange_name,routing_key, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    
            // 关闭channel和connection
            channel.close();
            connection.close();
    
        }
    
    }
    
    
    Consumer
    package rabbitmq.client;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMQConsumer {
    
        private static final String queue_name = "queue_siyu";
        private static final String rabbitmq_server_ip_address = "10.199.189.30";
        private static final int port = 5672;
    
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Address[] addresses = new Address[]{
                    new Address(rabbitmq_server_ip_address,port)
            };
    
            // 长连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root123");
    
            // 这里创建连接跟server端不同,传入了address
            Connection connection = connectionFactory.newConnection(addresses);
    
            // 创建channel
            final Channel channel = connection.createChannel();
    
            channel.basicQos(64);// ?? 设置客户端最多接收未被ack的消息个数
    
            // 创建消费
            final Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("receive msg:" + new String(body));
    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(),false); // 发送ack之后,消息会在queue中被删除
                }
            };
    
            channel.basicConsume(queue_name,consumer);
    
            TimeUnit.SECONDS.sleep(5);
    
            // 如果先关闭connection,再关闭channel,就会抛出异常:
            // com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown;
            // 所以这里一定要注意关闭的顺序
            channel.close();
            connection.close();
    
        }
    
    }
    

    相关文章

      网友评论

        本文标题:Rabbitmq概念及HelloWorld

        本文链接:https://www.haomeiwen.com/subject/cpryjqtx.html