美文网首页MQ
RabbitMQ:四种ExChange用法

RabbitMQ:四种ExChange用法

作者: 请叫wo小爷 | 来源:发表于2017-08-22 14:44 被阅读36次

    摘要:RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。ExChange和Queue之前是多对多的关系。RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

    RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。

    ExChange和Queue之前是多对多的关系。

    RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

    一、fanout

    当向一个fanout发送一个消息时,RoutingKey的设置不起作用。

    消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的;

    一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息。

    ----------------消息生产者----------------

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

    factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

    factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

    factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    // 声明路由名字和类型

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);

    String message = "hello world! ";

    for(int i=0;i<100;i++)

    {

    channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());

    }

    System.out.println("Sent msg finish");

    channel.close();

    connection.close();

    ----------------消息消费者----------------

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

    factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

    factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

    factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    //声明路由名字和类型

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);

    //声明队列

    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    //绑定路由和队列

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);

    System.out.println(" Waiting for msg....");

    Consumer consumer = new DefaultConsumer(channel) {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {

    String message = "";

    try

    {

    message = new String(body, "UTF-8");

    }

    catch (UnsupportedEncodingException e)

    {

    e.printStackTrace();

    }

    catch (Throwable ex)

    {

    ex.printStackTrace();

    }

    System.out.println("Received msg='" + message + "'");

    }

    };

    channel.basicConsume(QUEUE_NAME, true, consumer);

    二、direct

    当向一个direct发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;

    一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。

    ----------------消息生产者----------------

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

    factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

    factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

    factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    // 声明路由名字和类型

    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

    String message = "hello world! ";

    for(int i=0;i<100;i++)

    {

    channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());

    }

    System.out.println("Sent msg is '" + message + "'");

    channel.close();

    connection.close();

    ----------------消息消费者----------------

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

    factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

    factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

    factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    //声明路由名字和类型

    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

    //声明队列

    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    //绑定路由和队列

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

    System.out.println(" Waiting for msg....");

    Consumer consumer = new DefaultConsumer(channel)

    {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

    byte[] body)

    {

    String message = "";

    try

    {

    message = new String(body, "UTF-8");

    }

    catch (UnsupportedEncodingException e)

    {

    e.printStackTrace();

    }

    catch (Throwable ex)

    {

    ex.printStackTrace();

    }

    System.out.println("1 Received msg='" + message + "'");

    }

    };

    channel.basicConsume(QUEUE_NAME, true, consumer);

    三、topic

    当向一个topic发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;

    一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息。

    ----------------消息生产者----------------

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

    factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

    factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

    factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    // 声明路由名字和类型

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);

    String message = "hello world! ";

    // int i=101;

    for (int i = 0; i < 100; i++)

    {

    channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());

    }

    System.out.println("Sent msg is '" + message + "'");

    channel.close();

    connection.close();

    ----------------消息消费者----------------

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机

    factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口

    factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名

    factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    // 声明路由名字和类型

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);

    //声明队列

    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    //绑定路由和队列// 把队列绑定到路由上并指定headers

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

    System.out.println("1 Waiting for msg....");

    Consumer consumer = new DefaultConsumer(channel)

    {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body)

    {

    String message = "";

    try

    {

    message = new String(body, "UTF-8");

    }

    catch (UnsupportedEncodingException e)

    {

    e.printStackTrace();

    }

    catch (Throwable ex)

    {

    ex.printStackTrace();

    }

    System.out.println("1 Received msg='" + message + "'");

    }

    };

    channel.basicConsume(QUEUE_NAME, true, consumer);

    展开全文

    相关文章

      网友评论

        本文标题:RabbitMQ:四种ExChange用法

        本文链接:https://www.haomeiwen.com/subject/lucldxtx.html