美文网首页
RabbitMQ-中间件

RabbitMQ-中间件

作者: 通灵路耳 | 来源:发表于2020-06-25 06:24 被阅读0次

    安装Erlang语言

    RabbitMQ 是基于 Erlang 语言开发的,安装Erlang语言
    
    百度网盘:
    链接:https://pan.baidu.com/s/1yMiGsaN0V-50v4fL7TlL0A 
    提取码:29vx
    
    1、otp_win64_18.1安装,配置path环境变量
    2、cmd测试:erl
    
    图片.png

    安装RabbitMQ

    百度网盘:
    链接:https://pan.baidu.com/s/1yMiGsaN0V-50v4fL7TlL0A 
    提取码:29vx
    
    1、rabbitmq-server-3.6.5.exe安装
    2、运行cmd,配置插件:"E:\system\RabbitMQ\RabbitMQ\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
    3、重启:net stop RabbitMQ && net start RabbitMQ
    4、访问:http://127.0.0.1:15672/
    5、账号:guest    密码:guest
    
    图片.png

    模式

    协议:与ActiveMQ不一样, Rabbitmq 使用的是一种叫做 AMQP 的协议来通信,这种模式可以解决复杂的业务需求
    模式:与ActiveMQ不同,ActiveMQ消息放到队列等待消费者获取,RabbitMQ拿到
    消息交给交换机,交换机通过策略决定发给哪个队列
    

    fanout模板代码

    1、pom.xml
    
     <dependencies>
        <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
         </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.3.1</version>
        </dependency>
       </dependencies>
    
    2、判断RabbitMQ是否启动
    
    package com.llhc;
    import javax.swing.JOptionPane;
    import cn.hutool.core.util.NetUtil;
    public class RabbitMQUtil {
        public static void main(String[] args) {
            checkServer();
        }
        public static void checkServer() {
            if(NetUtil.isUsableLocalPort(15672)) {
                JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
                System.exit(1);
            }
        }
    }
    
    
    3、消息发送
    
    package com.llhc;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
     
    /**
     * 消息生成者
     */
    public class TestProducer {
        public final static String EXCHANGE_NAME="fanout_exchange";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            RabbitMQUtil.checkServer();
             
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ相关信息
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
             
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
             
            for (int i = 0; i < 100; i++) {
                String message = "direct 消息 " +i;
                //发送消息到队列中
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("发送消息: " + message);
                 
            }
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    
    4、消息接收
    
    package com.llhc;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import cn.hutool.core.util.RandomUtil;
    public class TestDriectCustomer {
        public final static String EXCHANGE_NAME="fanout_exchange";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            //为当前消费者取随机名
            String name = "consumer-"+ RandomUtil.randomString(5);
             
            //判断服务器是否启动
            RabbitMQUtil.checkServer();
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //交换机声明(参数为:交换机名称;交换机类型)
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //获取一个临时队列
            String queueName = channel.queueDeclare().getQueue();
            //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
            channel.queueBind(queueName,EXCHANGE_NAME,"");
             
            System.out.println(name +" 等待接受消息");
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(name + " 接收到消息 '" + message + "'");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    direct模板代码

    1、pom.xml
    
     <dependencies>
        <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
         </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.3.1</version>
        </dependency>
       </dependencies>
    
    2、判断RabbitMQ是否启动
    
    package com.llhc;
    import javax.swing.JOptionPane;
    import cn.hutool.core.util.NetUtil;
    public class RabbitMQUtil {
        public static void main(String[] args) {
            checkServer();
        }
        public static void checkServer() {
            if(NetUtil.isUsableLocalPort(15672)) {
                JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
                System.exit(1);
            }
        }
    }
    
    3、生产者
    
    package cn.how2j;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
     
    /**
     * 消息生成者
     */
    public class TestDriectProducer {
        public final static String QUEUE_NAME="direct_queue";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            RabbitMQUtil.checkServer();
             
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ相关信息
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
             
            for (int i = 0; i < 100; i++) {
                String message = "direct 消息 " +i;
                //发送消息到队列中
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("发送消息: " + message);
                 
            }
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    
    4、接收者
    
    package cn.how2j;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
     
    import cn.hutool.core.util.RandomUtil;
     
    public class TestDriectCustomer {
        private final static String QUEUE_NAME = "direct_queue";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            //为当前消费者取随机名
            String name = "consumer-"+ RandomUtil.randomString(5);
             
            //判断服务器是否启动
            RabbitMQUtil.checkServer();
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //声明要关注的队列
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            System.out.println(name +" 等待接受消息");
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(name + " 接收到消息 '" + message + "'");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    topic模板代码

    1、pom.xml
    
    <dependencies>
        <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
         </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.3.1</version>
        </dependency>
       </dependencies>
    
    2、判断RabbitMQ是否启动
    
    package com.llhc;
    import javax.swing.JOptionPane;
    import cn.hutool.core.util.NetUtil;
    public class RabbitMQUtil {
        public static void main(String[] args) {
            checkServer();
        }
        public static void checkServer() {
            if(NetUtil.isUsableLocalPort(15672)) {
                JOptionPane.showMessageDialog(null, "RabbitMQ 服务器未启动 ");
                System.exit(1);
            }
        }
    }
    
    3、生产者
    
    package cn.how2j;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
     
    /**
     * 消息生成者
     */
    public class TestProducer {
        public final static String EXCHANGE_NAME="topics_exchange";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            RabbitMQUtil.checkServer();
             
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ相关信息
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
             
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
             
            String[] routing_keys = new String[] { "usa.news", "usa.weather",   
                    "europe.news", "europe.weather" };   
            String[] messages = new String[] { "美国新闻", "美国天气",   
                    "欧洲新闻", "欧洲天气" };   
             
            for (int i = 0; i < routing_keys.length; i++) {
                String routingKey = routing_keys[i];
                String message = messages[i];
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message   
                        .getBytes());   
                System.out.printf("发送消息到路由:%s, 内容是: %s%n ", routingKey,message);
                 
            }
     
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    
    4、接收usa*
    
    package cn.how2j;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
     
    import cn.hutool.core.util.RandomUtil;
     
    public class TestCustomer4USA {
        public final static String EXCHANGE_NAME="topics_exchange";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            //为当前消费者取名称
            String name = "consumer-usa";
             
            //判断服务器是否启动
            RabbitMQUtil.checkServer();
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //交换机声明(参数为:交换机名称;交换机类型)
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            //获取一个临时队列
            String queueName = channel.queueDeclare().getQueue();
            //接受 USA 信息
             
            channel.queueBind(queueName, EXCHANGE_NAME, "usa.*");           
            System.out.println(name +" 等待接受消息");
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(name + " 接收到消息 '" + message + "'");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    5、接收 *.news
    
    package cn.how2j;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
     
    import cn.hutool.core.util.RandomUtil;
     
    public class TestCustomer4News {
        public final static String EXCHANGE_NAME="topics_exchange";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            //为当前消费者取名称
            String name = "consumer-news";
             
            //判断服务器是否启动
            RabbitMQUtil.checkServer();
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost("localhost");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //交换机声明(参数为:交换机名称;交换机类型)
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            //获取一个临时队列
            String queueName = channel.queueDeclare().getQueue();
            //接受 USA 信息
             
            channel.queueBind(queueName, EXCHANGE_NAME, "*.news");           
            System.out.println(name +" 等待接受消息");
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(name + " 接收到消息 '" + message + "'");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    相关文章

      网友评论

          本文标题:RabbitMQ-中间件

          本文链接:https://www.haomeiwen.com/subject/wptrfktx.html