美文网首页
RabbitMQ-中间件

RabbitMQ-中间件

作者: 通灵路耳 | 来源:发表于2020-06-25 06:24 被阅读0次

安装Erlang语言

RabbitMQ 是基于 Erlang 语言开发的,安装Erlang语言

百度网盘:
链接:https://pan.baidu.com/s/1yMiGsaN0V-50v4fL7TlL0A 
提取码:29vx

1、otp_win64_18.1安装,配置path环境变量
2、cmd测试:erl
图片.png

安装RabbitMQ

百度网盘:
链接:https://pan.baidu.com/s/1yMiGsaN0V-50v4fL7TlL0A 
提取码:29vx

1、rabbitmq-server-3.6.5.exe安装
2、运行cmd,配置插件:"E:\system\RabbitMQ\RabbitMQ\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
3、重启:net stop RabbitMQ && net start RabbitMQ
4、访问:http://127.0.0.1:15672/
5、账号:guest    密码:guest
图片.png

模式

协议:与ActiveMQ不一样, Rabbitmq 使用的是一种叫做 AMQP 的协议来通信,这种模式可以解决复杂的业务需求
模式:与ActiveMQ不同,ActiveMQ消息放到队列等待消费者获取,RabbitMQ拿到
消息交给交换机,交换机通过策略决定发给哪个队列

fanout模板代码

1、pom.xml

 <dependencies>
    <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
     </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>4.3.1</version>
    </dependency>
   </dependencies>

2、判断RabbitMQ是否启动

package com.llhc;
import javax.swing.JOptionPane;
import cn.hutool.core.util.NetUtil;
public class RabbitMQUtil {
    public static void main(String[] args) {
        checkServer();
    }
    public static void checkServer() {
        if(NetUtil.isUsableLocalPort(15672)) {
            JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
            System.exit(1);
        }
    }
}


3、消息发送

package com.llhc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 * 消息生成者
 */
public class TestProducer {
    public final static String EXCHANGE_NAME="fanout_exchange";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtil.checkServer();
         
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ相关信息
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
         
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
         
        for (int i = 0; i < 100; i++) {
            String message = "direct 消息 " +i;
            //发送消息到队列中
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("发送消息: " + message);
             
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

4、消息接收

package com.llhc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.hutool.core.util.RandomUtil;
public class TestDriectCustomer {
    public final static String EXCHANGE_NAME="fanout_exchange";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        //为当前消费者取随机名
        String name = "consumer-"+ RandomUtil.randomString(5);
         
        //判断服务器是否启动
        RabbitMQUtil.checkServer();
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
        channel.queueBind(queueName,EXCHANGE_NAME,"");
         
        System.out.println(name +" 等待接受消息");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(queueName, true, consumer);
    }
}

direct模板代码

1、pom.xml

 <dependencies>
    <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
     </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>4.3.1</version>
    </dependency>
   </dependencies>

2、判断RabbitMQ是否启动

package com.llhc;
import javax.swing.JOptionPane;
import cn.hutool.core.util.NetUtil;
public class RabbitMQUtil {
    public static void main(String[] args) {
        checkServer();
    }
    public static void checkServer() {
        if(NetUtil.isUsableLocalPort(15672)) {
            JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
            System.exit(1);
        }
    }
}

3、生产者

package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 * 消息生成者
 */
public class TestDriectProducer {
    public final static String QUEUE_NAME="direct_queue";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtil.checkServer();
         
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ相关信息
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
         
        for (int i = 0; i < 100; i++) {
            String message = "direct 消息 " +i;
            //发送消息到队列中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("发送消息: " + message);
             
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

4、接收者

package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.hutool.core.util.RandomUtil;
 
public class TestDriectCustomer {
    private final static String QUEUE_NAME = "direct_queue";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        //为当前消费者取随机名
        String name = "consumer-"+ RandomUtil.randomString(5);
         
        //判断服务器是否启动
        RabbitMQUtil.checkServer();
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //声明要关注的队列
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        System.out.println(name +" 等待接受消息");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

topic模板代码

1、pom.xml

<dependencies>
    <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
     </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>4.3.1</version>
    </dependency>
   </dependencies>

2、判断RabbitMQ是否启动

package com.llhc;
import javax.swing.JOptionPane;
import cn.hutool.core.util.NetUtil;
public class RabbitMQUtil {
    public static void main(String[] args) {
        checkServer();
    }
    public static void checkServer() {
        if(NetUtil.isUsableLocalPort(15672)) {
            JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
            System.exit(1);
        }
    }
}

3、生产者

package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 * 消息生成者
 */
public class TestProducer {
    public final static String EXCHANGE_NAME="topics_exchange";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtil.checkServer();
         
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ相关信息
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
         
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
         
        String[] routing_keys = new String[] { "usa.news", "usa.weather",   
                "europe.news", "europe.weather" };   
        String[] messages = new String[] { "美国新闻", "美国天气",   
                "欧洲新闻", "欧洲天气" };   
         
        for (int i = 0; i < routing_keys.length; i++) {
            String routingKey = routing_keys[i];
            String message = messages[i];
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message   
                    .getBytes());   
            System.out.printf("发送消息到路由:%s, 内容是: %s%n ", routingKey,message);
             
        }
 
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

4、接收usa*

package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.hutool.core.util.RandomUtil;
 
public class TestCustomer4USA {
    public final static String EXCHANGE_NAME="topics_exchange";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        //为当前消费者取名称
        String name = "consumer-usa";
         
        //判断服务器是否启动
        RabbitMQUtil.checkServer();
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //接受 USA 信息
         
        channel.queueBind(queueName, EXCHANGE_NAME, "usa.*");           
        System.out.println(name +" 等待接受消息");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(queueName, true, consumer);
    }
}

5、接收 *.news

package cn.how2j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.hutool.core.util.RandomUtil;
 
public class TestCustomer4News {
    public final static String EXCHANGE_NAME="topics_exchange";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        //为当前消费者取名称
        String name = "consumer-news";
         
        //判断服务器是否启动
        RabbitMQUtil.checkServer();
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //交换机声明(参数为:交换机名称;交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //获取一个临时队列
        String queueName = channel.queueDeclare().getQueue();
        //接受 USA 信息
         
        channel.queueBind(queueName, EXCHANGE_NAME, "*.news");           
        System.out.println(name +" 等待接受消息");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(name + " 接收到消息 '" + message + "'");
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(queueName, true, consumer);
    }
}

相关文章

网友评论

      本文标题:RabbitMQ-中间件

      本文链接:https://www.haomeiwen.com/subject/wptrfktx.html