美文网首页
用 java 盘 RabbitMq !

用 java 盘 RabbitMq !

作者: 一只小安仔 | 来源:发表于2019-05-30 18:30 被阅读0次

    一段时间没有更新了,今天来划划水吧!今天准备盘盘 rabbitmq ,首先我们得知道它是个啥!

    就不百度百科了,说我理解的吧,rabbitmq是一套开源的消息队列服务,它的同类型产品有Kafka (apache的), ActiveMQ, RocketMQ (阿里的)等。当然这些产品都有自己的特点,没有谁好谁坏,如何选型视场景而定。这里盘rabbitmq,因为中小型企业用的多。

    那什么是消息队列呢?我们可以把消息队列比作是一个存放消息的容器,这个容器以队列的形式呈现。队列嘛,跟食堂排队打饭一样,先排队的先打饭。而“消息”指的是在两台计算机间传送的数据单位。比方说你给别人在qq上发送一条数据,这个数据就是一个消息。

    你不禁要问,这玩意有啥用?官方回答是给分布式系统解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。可怕,还是完全不知道是干啥的吧。大佬们都喜欢拿这么一张图扯。

    同步处理

    上面这个图呢就是同步处理的意思(不要被他左上角的字蒙骗。)意思就是你在一个网站上注册一个账号的过程,这个过程包含三件事,注册信息写入数据库、给你邮箱发送注册成功的信息, 给你手机发送短信。发短信必须在发完邮件之后执行。如果这三件事做完后才给你浏览器发送注册成功的页面时,这个过程你会等150ms。你可能觉得这个时间已经很快了,丝毫不影响啥,可勤奋的程序员们老想着把这个时间再缩短,他们认为这还是太慢了,因为可能还有其他情况耗费一些时间,比如网络再故意延迟一会了,那可就让人难受了。下面这个图呢就是能再优化这个时间的异步处理啦。

    异步处理

    这个图呢把发邮件和短信做成并行的,同时开始执行,就是说发短信不用等发完邮件才执行。

    接下来说解耦, 下面这个图就是两个系统耦合了,库存系统直接调用订单系统的接口。这样订单系统接口一变,也要去改库存系统的代码。要是这两个系统分别是两拨人去开发,一方随便改改接口,说不定两拨人能打起来。


    耦合

    下面就是解耦场景,库存系统直接跟消息队列打交道,要是订单系统随便改接口代码,消息队列会打死他的,放心!就不需要库存系统这帮人出手了,你说要是库存系统和消息队列吵起来了咋整,放心,消息队列是别人牛逼的人写的,它俩吵起来,八成都会是库存系统做错了!


    解耦

    流量削锋一般在秒杀,团抢活动这些场景出现。双十一都不陌生,突然在这天访问量剧增。使用消息队列来抗住。


    image

    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

    异步消息,解耦,流量削锋这些消息队列的作用说完了(当然还有作用,自行百度吧,大佬们),该盘代码了。

    使用之前呢需要安装rabbitmq 这里不细说,因为比较简单。给个windows上安装的博客 : windows上rabbitmq安装参考
    ),唯一注意的是使用rabbitmq需要erlang语言环境,就像java应用需要jdk环境一样。玩linux的敲几句命令就完事了,就不用给教程了吧(手动滑稽)。

    接下来会慢慢更新 rabbitmq 五种模式的测试代码。源码存放于junan的码云仓库

    第一种 简单队列


    简单队列

    简单队列存在一个生产者,一个消费者,一个队列。
    首先需要建立普通maven项目,导入下依赖,只需要这一个依赖即可。

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.4.3</version>
            </dependency>
    

    然后需要个connection工具类,它作用嘛用于和rabbitmq的建立连接。

    package com.junan.rabbitmqTest.utils;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <p>
     *         rabbitmq 工具类产生 rabbitmq 连接
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-29
     */
    public class ConnectionsUtil {
        //设置 rabbitmq 服务ip地址
        private static String host = "localhost";
        //设置 rabbitmq 服务端口
        private static Integer port = 5672;
        //设置 rabbitmq 服务登录用户名
        private static String username = "admin";
        //设置 rabbitmq 服务虚拟主机名
        private static String virtualHost = "/test";
        //设置 rabbitmq 服务登录密码
        private static String password = "chenjunan";
        //通过这个方法获取连接
        public static Connection getConnection(){
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
            try {
                return factory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return null;
        }
    
    }
    

    使用这个工具类是对应修改static属性即可。然后建立生产者。

    package com.junan.rabbitmqTest.simple;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <p>
     *      简单消息队列生产者
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-29
     */
    public class Producer {
        //给队列取的名字
        private static final  String QUEUE_NAME = "rabbitmq_simple";
    
        public static void main(String[] args) {
            //从工具类获取连接
            Connection connection = ConnectionsUtil.getConnection();
            try {
                //从连接获取通道
                Channel channel = connection.createChannel();
                //声明队列(具体参数请查api)
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //从通道发送 字节消息
                channel.basicPublish("", QUEUE_NAME, null, "hello rabbitmq".getBytes());
                //关闭通道和连接
                channel.close();
                connection.close();
                System.out.println("<==  已发送一条消息!  ==>");
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    

    接下来是消费者。

    package com.junan.rabbitmqTest.simple;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * <p>
     *              简单消息队列消费者
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-29
     */
    public class Consumer {
    
        //给队列取的名字
        private static final String QUEUE_NAME = "rabbitmq_simple";
    
        public static void main(String[] args) {
            //从工具类获取连接
            Connection connection = ConnectionsUtil.getConnection();
            try {
                //从连接获取通道
                Channel channel = connection.createChannel();
                //声明队列(具体参数请查api)
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //创建消费者(这里只是创建)
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    //重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        super.handleDelivery(consumerTag, envelope, properties, body);
                        //打印消息内容
                        System.out.println(new String(body));
                    }
                };
                //这里开始消费 ,需要把创建的消费者传入
                channel.basicConsume(QUEUE_NAME, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    先运行消费者Consumer的main方法。再运行生产者Producer中的main方法发送一条消息,看Consumer消费者能不能收到,反正我是收到了(嘻嘻)。

    producer
    consumer

    以上代码基本使用了rabbitmq的简单模式用生产者给消费者发送了一条消息。

    第二种 work 模式


    work

    这种模式和简单模式的区别就是可以有多个消费者进行消费,我这里使用两个消费者演示。具体参考代码
    这次的生产者一共生产50条消息共两个消费者消费。

    package com.junan.rabbitmqTest.work;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * <p>
     *          工作消息队列生产者
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-30
     */
    @SuppressWarnings("all")
    public class Producer {
        //给队列取的名字
        private static final  String QUEUE_NAME = "rabbitmq_work";
    
        public static void main(String[] args) {
    
            Connection connection = ConnectionsUtil.getConnection();
            Channel channel = null;
            try {
                channel = connection.createChannel();
                channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);
                //发送50个消息
                for (int i = 0; i < 50; i++) {
                    String msg = " hello   " + i;
                    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                    Thread.sleep(100);
                }
                System.out.println("<==========   生产者已发送 50 条消息!   ==========>");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    if(channel != null)
                        channel.close();
                    if(connection != null)
                        connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    以下是两个消费者。

    package com.junan.rabbitmqTest.work;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * <p>
     *          工作队列消费者一号
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-30
     */
    @SuppressWarnings("all")
    public class Consumer1 {
        //给队列取的名字
        private static final String QUEUE_NAME = "rabbitmq_work";
    
        public static void main(String[] args) {
    
            Connection connection = ConnectionsUtil.getConnection();
            Channel channel = null;
            try {
                channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                Consumer consumer = new DefaultConsumer(channel) {
                    //重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        super.handleDelivery(consumerTag, envelope, properties, body);
                        System.out.println("<== consumer1 ==>  " + new String(body));
                        try {
                             //这里每次循环休息200ms, 让两个消费者休息时间不同, 看他的运行结果
                            Thread.sleep(200);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                channel.basicConsume(QUEUE_NAME, true, consumer);
                System.out.println("<==========   消费者一号启动!   ==========>");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    package com.junan.rabbitmqTest.work;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * <p>
     *          工作队列消费者二号
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-30
     */
    @SuppressWarnings("all")
    public class Consumer2 {
        //给队列取的名字
        private static final String QUEUE_NAME = "rabbitmq_work";
    
        public static void main(String[] args) {
    
            Connection connection = ConnectionsUtil.getConnection();
            Channel channel = null;
            try {
                channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        super.handleDelivery(consumerTag, envelope, properties, body);
                        System.out.println("<== consumer2 ==>  " + new String(body, "utf-8"));
                        try {
                            //这里每次循环休息400ms, 让两个消费者休息时间不同, 看他的运行结果
                            Thread.sleep(400);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                channel.basicConsume(QUEUE_NAME, true, consumer);
                System.out.println("<==========   消费者二号启动!   ==========>");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    先启动两个消费者等待消息,然后启动生产者给消息队列发送消息。

    结果如下:

    消费者1号
    消费者2号

    可以看出我们虽然设置了这两个消费者每次消费休息不同时长。可是从结果看,这两个消费者采用轮询的方式消费这些消息。也就是消息队列给这两个消费者消息很公平,一人一个的给。不管你忙碌还是空闲。这种方式相比简单队列能减轻一部分压力。

    第三种 公平分发
    这种方式能需要消费者手动确认收到消息,rabbitmq才会给他分发下一条消息。注意对比work模式的代码

    生产者 需要对channel设置每次值发送一条消息给消费者

    package com.junan.rabbitmqTest.workFair;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * <p>
     *           公平分发队列生产者
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-30
     */
    @SuppressWarnings("all")
    public class Producer {
        //给队列取的名字
        private static final  String QUEUE_NAME = "rabbitmq_work";
    
        public static void main(String[] args) {
    
            Connection connection = ConnectionsUtil.getConnection();
            Channel channel = null;
            try {
                channel = connection.createChannel();
                //限制每次发送一条消息给消费者
                channel.basicQos(1);
                channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);
                //发送50个消息
                for (int i = 0; i < 50; i++) {
                    String msg = " hello   " + i;
                    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                    Thread.sleep(100);
                }
                System.out.println("<==========   生产者已发送 50 条消息!   ==========>");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    if(channel != null)
                        channel.close();
                    if(connection != null)
                        connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    消费者1号 (二号类似就不粘代码了,) 注意设置限制每次发送一条消息给消费者和设置autoAck为false(basicConsume这个方法的第二个参数)

    package com.junan.rabbitmqTest.workFair;
    
    import com.junan.rabbitmqTest.utils.ConnectionsUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * <p>
     *          公平分发消费者一号
     * </p>
     *
     * @author junan
     * @version 1.0.0
     * @since 19-5-30
     */
    @SuppressWarnings("all")
    public class Consumer1 {
        //给队列取的名字
        private static final String QUEUE_NAME = "rabbitmq_work";
    
        public static void main(String[] args) {
    
            Connection connection = ConnectionsUtil.getConnection();
            try {
                final Channel channel = connection.createChannel();
                //限制每次发送一条消息给消费者
                channel.basicQos(1);
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                Consumer consumer = new DefaultConsumer(channel) {
                    //重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        super.handleDelivery(consumerTag, envelope, properties, body);
                        System.out.println("<== consumer1 ==>  " + new String(body));
                        try {
                            //这里每次循环休息200ms, 让两个消费者休息时间不同, 看他的运行结果
                            Thread.sleep(200);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        //设置返回的确认消息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                //设置不自动确认消息,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
                channel.basicConsume(QUEUE_NAME, false, consumer);
                System.out.println("<==========   消费者一号启动!   ==========>");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    接下来看结果

    消费者1号
    消费者2号

    可以看到这个效果就比较满意了,谁的空闲时间多就能多拿到一下消息。能充分利用消费者的能力。
    ps: autoAck :这是一个boolean参数,等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
    ---------------------------------------------------------------------- 有时间继续更新。

    相关文章

      网友评论

          本文标题:用 java 盘 RabbitMq !

          本文链接:https://www.haomeiwen.com/subject/iulvtctx.html