美文网首页
RabbitMQ消息队列Android的实现方式之接收(二)

RabbitMQ消息队列Android的实现方式之接收(二)

作者: 自由自在_Android | 来源:发表于2019-07-12 11:49 被阅读0次

    上一期说了RabbitMQ发送消息的方法,这部分发一下接收和关闭连接的方法实现,废话少说直接上代码。

    private void subscribe(final Handler handler) {//创建消费者,并监听消费信息
            subscribeThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (isCycle) {//定义轮询方法
                        try {
                            connection = factory.newConnection();//创建新的连接
                            channel = connection.createChannel();//创建通道
                            channel.basicQos(1);//一条一条接收消息
                            String queueName = "queueName" + System.currentTimeMillis();//channel.queueDeclare().getQueue();//随机生成队列名
                            //参数:1、队列名 2、是否永久保留队列 3、是否独占队列 4、连接不存在时是否自动删除队列 5其他属性(构造参数),没有可以null
                            AMQP.Queue.DeclareOk q = channel.queueDeclare(queueName, false, false, true, null);
                            //将队列名、交换机名称、交换机key绑定在一起。
                            channel.queueBind(q.getQueue(), "CalonDirectExchange", "CalonDirectRouting");
                            isCycle = false;//禁止连接循环
                            Consumer consumer = new DefaultConsumer(channel) {//新版本更新的接收方法,无需手写轮询
                                @Override
                                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                    super.handleDelivery(consumerTag, envelope, properties, body);
                                    String mMessage = new String(body, "UTF-8");
                                    //根据自己的需求定义的接收内容
                                    Gson gson = new Gson();
                                    AgvStateBean bean = gson.fromJson(mMessage, AgvStateBean.class);//json转实体
                                    int code = bean.getCode();
                                    Message message = handler.obtainMessage();
                                    Bundle bundle = new Bundle();
                                    bundle.putSerializable("bean", bean);
                                    message.setData(bundle);
                                    switch (code) {
                                        case 2000:
                                            message.what = 2000;
                                            handler.sendMessage(message);
                                            Log.e("AGV", "2000");
                                            break;
                                        case 4000:
                                            message.what = 4000;
                                            handler.sendMessage(message);
                                            Log.e("AGV", "4000");
                                            break;
                                    }
    
                                }
                            };
                            channel.basicConsume(q.getQueue(), true, consumer);
                            Message messageCode = new Message();
                            messageCode.what = 200;//连接成功
                            handler.sendMessage(messageCode);
                            while (!isCycle) {//判断连接是否端口,断开重连
                                if (!connection.isOpen()) {
                                    try {
                                        Thread.sleep(5000); //sleep and then try again沉睡5秒返回循环
                                        isCycle = true;
                                    } catch (InterruptedException e) {
                                        break;
                                    }
                                }
    
                            }
                            /*while (true) {老方法
                                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                                String message = new String(delivery.getBody());
                                Log.d("","[r] " + message);
                                Message msg = handler.obtainMessage();
                                Bundle bundle = new Bundle();
                                bundle.putString("msg", message);
                                msg.setData(bundle);
                                handler.sendMessage(msg);
                            }*/
                        } catch (Exception e1) {
                            Message message = new Message();
                            message.what = 400;
                            Bundle bundle = new Bundle();
                            if (reconnection_count > 5) {//自定义重连次数,到数停止重连
                                String strConn = "尝试重连失败!";
                                bundle.putString("conn_interrupt", strConn);
                                bundle.putBoolean("close_conn", true);
                                message.setData(bundle);
                                handler.sendMessage(message);
                                reconnection_count = 1;
                                return;
                            }
    
                            String strConn = "连接中断,正在尝试第" + reconnection_count + "次重连";
                            bundle.putBoolean("close_conn", false);
                            bundle.putString("conn_interrupt", strConn);
                            message.setData(bundle);
                            handler.sendMessage(message);
                            reconnection_count++;
                            Log.d("", "Connection broken: " + e1.getClass().getName());
                            try {
                                Thread.sleep(5000); //sleep and then try again
                            } catch (InterruptedException e) {
                                break;
                            }
                        }
                    }
                }
            });
            subscribeThread.start();//start thread
        }
    

    断开连接的方法

    public void closeConn() {//关闭连接并暂停子线程
            Thread closeThread = new Thread(() -> {
                try {
                    //判断连接是否存在或连接,否则会崩溃
                    if (connection != null && connection.isOpen()) connection.close();
                    if (channel != null && channel.isOpen()) channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            });
            closeThread.start();
            closeThread.interrupt();
    
            if (publishThread != null) publishThread.interrupt();
            if (subscribeThread != null) subscribeThread.interrupt();
        }
    

    好了,到这里关于Android版RabbitMQ的发送和接收方法就都完成了。代码写的欠妥的地方欢迎各位大神们指正。谢谢大家

    相关文章

      网友评论

          本文标题:RabbitMQ消息队列Android的实现方式之接收(二)

          本文链接:https://www.haomeiwen.com/subject/mekekctx.html