美文网首页消息中间件-RabbitMQ
Rabbitmq 使用之 Direct(默认模式)

Rabbitmq 使用之 Direct(默认模式)

作者: DarkGown | 来源:发表于2019-07-24 17:17 被阅读0次

1 简介

Direct模式是rabbitmq的默认模式,机制大致可以理解为下图:

image

可以这样理解,交换机是一个车站(exChange),每个人是一个消息体(message),携带一张车票(key),在站台乘车(queue);人要乘车,于是根据票的不同,被车站分发到不同的站台,乘车去不同的地方。

即生产者发送一个指定key的消息,交换机将之转发给所有绑定此key的队列,消费者提取。

知道了流程,就考虑实现,我们订一个目标,写一个支持单发,群发的简单通讯。

show code.

2 生产者

首先创建factory

if (factory == null) {
    factory = new ConnectionFactory();

    factory.setHost(HOST);   //主机地址
    factory.setPort(PART);     // 端口号
    factory.setUsername(USERNAME);   //用户名
    factory.setPassword(PASSWORD);     //密码
    factory.setVirtualHost(VIRTUALHOST);
}

创建connection ,channel,声明一个交换机

try {
    //创建连接
    if (connection == null || !connection.isOpen())
        connection = factory.newConnection();

    //创建通道
    if (channel== null || !sendChannel.isOpen())
        channel= connection.createChannel();

    //声明交换机
    //EXCHANGE_NAME = "exchange-name"
    //EXCHANGE_TYPE = "direct"
    channel.exchangeDeclare(EXCHANGE_NAME,
            EXCHANGE_TYPE,
            true);

    Log.i(TAG, "连接成功");

} catch (Exception e) {
    e.printStackTrace();
    Log.i(TAG, "连接失败" + e);
}

然后发送消息(发送消息只需要建立连接,将指定key的消息发给交换机即可)

try {
    //EXCHANGE_NAME = "exchange-name"
    //KEY = "key-group"
    //MESSAGE = "messageBody"
    sendChannel.basicPublish(EXCHANGE_NAME,
            KEY,
            null, 
            MESSAGE.getBytes());

    Log.i(TAG, "发送成功");

} catch (Exception e) {
    e.printStackTrace();
    Log.e(TAG, "发送失败 " + e);
}

3 消费者

同样的,创建factory

if (factory == null) {
    factory = new ConnectionFactory();

    factory.setHost(HOST);   //主机地址
    factory.setPort(PART);     // 端口号
    factory.setUsername(USERNAME);   //用户名
    factory.setPassword(PASSWORD);     //密码
    factory.setVirtualHost(VIRTUALHOST);
}

创建connection ,channel,声明交换机

这里更重要的是声明队列

try {
    //创建连接
    if (connection == null || !connection.isOpen())
        connection = factory.newConnection();

    //创建通道
    if (receiveChannel == null || !receiveChannel.isOpen())
        receiveChannel = connection.createChannel();

    //声明交换机
    //EXCHANGE_NAME = "exchange-name"
    //EXCHANGE_TYPE = "direct"
    channel.exchangeDeclare(EXCHANGE_NAME,
            EXCHANGE_TYPE,
            true);

    //声明队列
    //QUEUE_NAME = "queue-name"
    receiveChannel.queueDeclare(QUEUE_NAME,
            true, false, false, null);

    //队列绑定交换机(单个key)
    //SINGLE_KEY = "key-single"
    receiveChannel.queueBind(QUEUE_NAME,
            EXCHANGE_NAME,
            SINGLE_KEY);

    //队列绑定交换机(集群key)
    //SINGLE_KEY = "key-group"
    receiveChannel.queueBind(QUEUE_NAME,
            EXCHANGE_NAME,
            GROUP_KEY);

    //接收消息
    receiveChannel.basicConsume(QUEUE_NAME,
            true,
            new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);

                    String message = new String(body);
                    Log.i(TAG, "消息内容 " + message);
                }
            });

    Log.i(TAG, "连接成功");

} catch (Exception e) {
    e.printStackTrace();
    Log.i(TAG, "连接失败" + e);
}

相关文章

网友评论

    本文标题:Rabbitmq 使用之 Direct(默认模式)

    本文链接:https://www.haomeiwen.com/subject/etzxrctx.html