美文网首页
RabbitMQ部分API详解(含demo)

RabbitMQ部分API详解(含demo)

作者: 程序员大黑鱼 | 来源:发表于2019-04-14 11:20 被阅读0次

    RabbitMQ部分API详解(含demo)

    demo配置文件rabbitmq.properties

    host=192.168.221.138
    port=5672
    username=guest
    password=guest
    

    读取配置文件内容

    public class RabbitConfig {
        public static String host ;
        public static int port ;
        public static String username;
        public static String password ;
    
        static{
            try {
                ClassLoader classLoader=RabbitConfig.class.getClassLoader();
                InputStream is=classLoader.getResourceAsStream("rabbitmq.properties");
                Properties properties=new Properties();
                properties.load(is);
                host = properties.getProperty("host");
                port = Integer.parseInt(properties.getProperty("port"));
                username =properties.getProperty("username");
                password =properties.getProperty("password");
            }catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    创建连接工厂

    public class RabbitUtil {
        public static Connection GetRabbitConnection() {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(RabbitConfig.host);
            factory.setPort(RabbitConfig.port);
            factory.setUsername(RabbitConfig.username);
            factory.setPassword(RabbitConfig.password);
            Connection connection = null;
            try {
                connection = factory.newConnection();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return connection;
        }
    }
    

    创建provider

    public class FanoutProvider {
    
        public static void main(String[] args) throws Exception {
            // 创建一个连接
            Connection connection = RabbitUtil.GetRabbitConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            //声明exchange
            //exchange:exchange名称
            //type:exchange类型
            //durable:exchange是否持久化(不代表消息持久化)
            //autoDelete:已经没有消费者了,服务器是否可以删除该Exchange
            //exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
            channel.exchangeDeclare("zx_fanout", BuiltinExchangeType.FANOUT, true,false,null);
    
            //声明queue
            //queue:queue名称
            //durable:queue是否持久化
            //exclusive:是否为当前连接的专用队列,在连接断开后,会自动删除该队列
            //autodelete:当没有任何消费者使用时,自动删除该队列
            //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
            channel.queueDeclare("zx_queue_f1", true, false, false, null);
            channel.queueDeclare("zx_queue_f2", true, false, false, null);
            channel.queueDeclare("zx_queue_f3", true, false, false, null);
    
            //queue:queue名称
            //exchange:exchange名称
            //routingKey:路由键;用来绑定queue和exchange
            //queueBind(String queue, String exchange, String routingKey)
            channel.queueBind("zx_queue_f1","zx_fanout","");
            channel.queueBind("zx_queue_f2","zx_fanout","");
            channel.queueBind("zx_queue_f3","zx_fanout","");
    
            //开启confirm机制
            // (即rabbitmq-server收到生产端的消息,会给生产端发送一个确认,如果没法送确认,生产端重新发送消息到server中)
            channel.confirmSelect();
    
            for (int i = 1 ; i <= 100 ;i++){
                String message = "fanout:广播的第 "+ i +" 条消息";
    
                //exchange
                //routingKey:路由键
                //mandatory:true=如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false=出现上述情形broker会直接将消息扔掉
                //immediate:true=如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。false=当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
                //BasicProperties:消息的基本属性,例如路由头等(MessageProperties.PERSISTENT_TEXT_PLAIN:消息的持久化)
                //basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
                channel.basicPublish("zx_fanout","",false,false,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));
                System.out.println(message);
            }
    
            // 关闭连接
            channel.close();
            connection.close();
        }
    }
    

    创建consumer

    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            // 创建一个连接
            Connection conn = RabbitUtil.GetRabbitConnection();
            // 创建通道
            Channel channel = conn.createChannel();
            //声明exchange
            //exchange:exchange名称
            //type:exchange类型
            //durable:exchange是否持久化(不代表消息持久化)
            //autoDelete:已经没有消费者了,服务器是否可以删除该Exchange
            //exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
            channel.exchangeDeclare("zx_fanout", BuiltinExchangeType.FANOUT,true,false,null);
    
            //声明queue
            //queue:queue名称
            //durable:queue是否持久化
            //exclusive:是否为当前连接的专用队列,在连接断开后,会自动删除该队列
            //autodelete:当没有任何消费者使用时,自动删除该队列
            //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
            channel.queueDeclare("zx_queue_f1", true, false, false, null);
    
            //queue:queue名称
            //exchange:exchange名称
            //routingKey:路由键;用来绑定queue和exchange
            //queueBind(String queue, String exchange, String routingKey)
            channel.queueBind("zx_queue_f1","zx_fanout","");
            
            //uack的消息最多有几条
            channel.basicQos(200);
    
            //消息消费
            //queue:绑定队列名称
            //autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
            //basicConsume(String queue, boolean autoAck, Consumer callback)
            channel.basicConsume("zx_queue_f1",false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        String message = new String(body,"utf-8");
                        System.out.println(Consumer1.class.getSimpleName()+"--接收消息--"+message);
                    } catch (UnsupportedEncodingException e) {
    
                        //deliveryTag:该消息的唯一ID
                        //multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
                        //requeue:被拒绝的是否重新入队列
                        //channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);
                        channel.basicNack(envelope.getDeliveryTag(),false,true);
    
                        //deliveryTag:该消息的index
                        //requeue:被拒绝的是否重新入队列
                        //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
                        //channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
                    } finally {
    
                        //basicAck(long deliveryTag, boolean multiple)
                        //deliveryTag:该消息的唯一ID
                        //multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
        }
    }
    

    demo的github地址

    https://github.com/AndeZhaoX/rabbitmq-test.git

    相关文章

      网友评论

          本文标题:RabbitMQ部分API详解(含demo)

          本文链接:https://www.haomeiwen.com/subject/hdujwqtx.html