美文网首页
rabbitMq简单入门

rabbitMq简单入门

作者: 太大_453b | 来源:发表于2018-04-25 16:31 被阅读26次

    背景

    RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现

    应用场景

    • 异步处理
    • 应用解耦(服务解耦)

    API

    下载rabbitMQ自己看

    简单实战搭建

    • 生产者
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author create by yingjie.chen on 2018/3/27.
     * @version 2018/3/27 14:54
     */
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("admin");
            factory.setPassword("admin");
            //设置 RabbitMQ 地址
            factory.setHost("172.16.7.45");
            //建立到代理服务器到连接
            Connection conn = factory.newConnection();
            //获得信道
            Channel channel = conn.createChannel();
            //声明交换器
            String exchangeName = "hello-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);
    
    
            String routingKey = "hola";
            //发布消息
            byte[] messageBodyBytes = "quit".getBytes();
            channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
    
            channel.close();
            conn.close();
        }
    }
    
    • 消费者
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     * @author create by yingjie.chen on 2018/3/27.
     * @version 2018/3/27 15:17
     */
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setHost("172.16.7.45");
            //建立到代理服务器到连接
            Connection conn = factory.newConnection();
            //获得信道
            final Channel channel = conn.createChannel();
            //声明交换器
            String exchangeName = "hello-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);
            //声明队列
            String queueName = channel.queueDeclare().getQueue();
            String routingKey = "hola";
            //绑定队列,通过键 hola 将队列和交换器绑定起来
            channel.queueBind(queueName, exchangeName, routingKey);
    
            while(true) {
                //消费消息
                boolean autoAck = false;
                String consumerTag = "";
                channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        System.out.println("消费的路由键:" + routingKey);
                        System.out.println("消费的内容类型:" + contentType);
                        long deliveryTag = envelope.getDeliveryTag();
                        //确认消息
                        channel.basicAck(deliveryTag, false);
                        System.out.println("消费的消息体内容:");
                        String bodyStr = new String(body, "UTF-8");
                        System.out.println(bodyStr);
    
                    }
                });
            }
        }
    }
    

    原理

    1. Client 用过EXCHANGE 指定routingkey 就能找到消息所在的队列Queue
    2. Client 订阅就是监控绑定的的队列,有消息会直接取到,消费者处理完返回信息原理也是一样,通过不一样的队列(一般都是queue后面加个queueName_xxxx)

    相关文章

      网友评论

          本文标题:rabbitMq简单入门

          本文链接:https://www.haomeiwen.com/subject/qleylftx.html