美文网首页
RabbitMQ消息队列(一)简单版 说明

RabbitMQ消息队列(一)简单版 说明

作者: 尼尔君 | 来源:发表于2018-09-28 17:53 被阅读0次

    Producer生产者例子

    
    class Producer{
    
    
    
        static ConnectionFactory connectionFactory;
    
        static Connection connection ;
    
        static Channel channel;
    
        static{
            init();
        }
    
        public static void main(String[] args) {
    
    
            send("你好吗");
    
    
        }
        public static void init(){
    
            connectionFactory  = new ConnectionFactory();
    
            connectionFactory.setHost("localhost");
    
            connectionFactory.setUsername("admin");
    
            connectionFactory.setPassword("8954036aa");
    
        }
    
    
        public static void send(String str){
            try {
    
    
                connection= connectionFactory.newConnection();
    
                channel =connection.createChannel();
    
                channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
    
                channel.basicPublish("",Rabbitmq.QUEUE_NAME,null,str.getBytes());
    
    
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
            finally {
                if(channel.isOpen())
                {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
                if(connection!=null)
                {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    }
    
    
    
    
    

    Consumer消费者例子

    class Customer{
    
        static ConnectionFactory connectionFactory;
    
        static Connection connection ;
    
        static Channel channel;
    
    
        public static void main(String[] args) {
            init();
            consume();
        }
    
        public static void init(){
    
            connectionFactory  = new ConnectionFactory();
    
            connectionFactory.setHost("localhost");
    
            connectionFactory.setUsername("admin");
    
            connectionFactory.setPassword("8954036aa");
    
        }
    
        public static void consume(){
            try {
                    connection = connectionFactory.newConnection();
    
                    channel = connection.createChannel();
    
                    channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
    
                    Consumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            System.out.println(new String(body,"utf-8"));
                        }
                    };
    
                channel.basicConsume(Rabbitmq.QUEUE_NAME,consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
        }
    
    
    
    }
    
    
    

    初始化

            #创建连接工厂设置账号密码 url
             connectionFactory  = new ConnectionFactory();
    
            connectionFactory.setHost("localhost");
    
            connectionFactory.setUsername("admin");
    
            connectionFactory.setPassword("8954036aa");
    
    

    queueDeclare

    queueDeclare(String queue,
                boolean durable,
                boolean exclusive,
                Map<String, Object> arguments);
    
    
    #说明
    queue: 队列名称
    
    
    durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
    
    exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景
    
    autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
    
    arguments:
    
    队列中的消息什么时候会自动被删除?
    
    Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间
    
            #AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
    
            #channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
    
    Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
    
    Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
    
    Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
    
    Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
    
    Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
    
    Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
    
    Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
    
    Master locator(x-queue-master-locator)
    
    
    
    

    basicPublish

    
     channel.basicPublish(参数1:(交换机名字),参数2:(队列名字),参数3:(BasicProperties 配置信息),参数4:(发送的消息体))
    
         routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
    
        mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。 
      false:出现上述情形broker会直接将消息扔掉
    immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
    
    BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
    
    

    生产者

    
                #建立连接
                connection= connectionFactory.newConnection();
    
                #创建通道
                channel =connection.createChannel();
    
                #声明一个队列 
                channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
    
                channel.basicPublish("",Rabbitmq.QUEUE_NAME,null,str.getBytes());
    
    
    
    

    消费者

            connection = connectionFactory.newConnection();
    
                    channel = connection.createChannel();
    
                    channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
    
                    Consumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            System.out.println(new String(body,"utf-8"));
                        }
                    };
    
                channel.basicConsume(Rabbitmq.QUEUE_NAME,consumer);
    
    

    basicConsume

       参数1:队列名字
       参数2:消费者(callback)
    

    相关文章

      网友评论

          本文标题:RabbitMQ消息队列(一)简单版 说明

          本文链接:https://www.haomeiwen.com/subject/pxgpoftx.html