美文网首页
RabbitMQ中,为何不关闭Connection,主线程一直不

RabbitMQ中,为何不关闭Connection,主线程一直不

作者: AlstonWilliams | 来源:发表于2018-11-01 21:22 被阅读318次

    在Spark中,我们在Driver里放了一个RabbitMQ,接收消息,然后处理。避免一次次地上传Jar包,进行前面的准备操作。

    然而,在实际操作中,发现了一些诡异的事情。

    第一件事情就是,当我们使用yarn -kill命令,将这个Job kill掉之后,我们发现,实际上Driver并没有被kill掉。还是会继续监听那个队列,而且,SparkContext确实是被关闭掉了。

    而正常的情况,应当是yarn -kill以后,Driver被kill掉。

    于是,我猜想,是因为在poll消息的时候,由于有wait存在,使Driver的线程阻塞了,不能接收到ApplicationMaster发送来的kill消息,导致Driver不会停掉。

    然而,在验证这个想法的过程中,我发现,即使不poll消息,Driver还是不会停止。

    代码如下:

    package com.hyper.cdp.label.utils;
    
    import com.hyper.util.CacheConfUtils;
    import com.hyper.util.Log;
    import com.hyper.util.Mapper;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMQAccessor implements Log {
    
        private String host;
        private String port;
        private String username;
        private String password;
        private String vhost;
        private String queue;
    
        private ConnectionFactory connectionFactory;
        private Connection connection;
        private Channel channel;
        private QueueingConsumer queueingConsumer;
    
        public RabbitMQAccessor(String queue) {
            this(CacheConfUtils.get("amqp.host", ""),
                    CacheConfUtils.get("amqp.port", "5672"),
                    CacheConfUtils.get("amqp.username", ""),
                    CacheConfUtils.get("amqp.password", ""),
                    CacheConfUtils.get("amqp.vhost", ""),
                    queue);
        }
    
        public RabbitMQAccessor(String host, String port, String username, String password,
                                String vhost, String queue) {
            this.host = host;
            this.port = port;
            this.username = username;
            this.password = password;
            this.vhost = vhost;
            this.queue = queue;
    
        }
    
        public void init() throws IOException, TimeoutException {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(Integer.valueOf(port));
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(vhost);
    
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
    
            channel.queueDeclare(queue, true, false, false, null);
            queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queue, queueingConsumer);
        }
    
        private QueueingConsumer.Delivery currentDelivery = null;
    
        public String poll() {
            String result = null;
            try {
                currentDelivery = queueingConsumer.nextDelivery();
                if (currentDelivery != null) {
                    byte[] bs = currentDelivery.getBody();
                    result = new String(bs);
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return result;
        }
    
        public Object pollJson(Class targetClass) {
            Object result = null;
    
            String string = poll();
            if (string == null || string.trim().equals("")) return null;
    
            try {
                result = Mapper.readValue(string, targetClass);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            return result;
        }
    
        public void ack() {
            try {
                channel.basicAck(currentDelivery.getEnvelope().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void close() {
            try {
                channel.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws IOException, TimeoutException {
            RabbitMQAccessor rabbitMQAccessor = new RabbitMQAccessor("host",
                    "5672",
                    "a",
                    "b",
                    "/c",
                    "queue");
            rabbitMQAccessor.init();
            System.out.println("------- before close");
            rabbitMQAccessor.close();
    
            System.out.println("------- after close");
        }
    }
    

    就简单地初始化了一下,然后关闭一下,想结束掉这个程序,但是它竟然不给我停。

    所以我就很奇怪。于是猜测是创建Connection时搞得鬼。

    通过断点调试,还真发现是它在捣蛋。

    AMQConnection这个类中,我们能看到,在创建Connection时,有这么一段:

    我们再看MainLoop的实现:

    最重要的是那个while循环。

    我们可以看到,只要_running是true,它就会一直运行。而_running只有当Connection关闭时,才会是false.

    你可能会说,这个是子线程啊。跟主线程不结束有什么关系。

    我们从StackOverflow中可以得知,在Java中,只要存在一个不是daemon的子线程还在运行,主线程就不会退出

    实际上,这个问题很简单。但是一年没碰这种Java的基础知识,好多内容都有点忘记了。所以啊,还是要温故知新啊。

    另外,上面对Spark的那个问题,我的猜测是错误的。

    上面的代码,正确的应该是,在close方法中,加上connection.close()

    相关文章

      网友评论

          本文标题:RabbitMQ中,为何不关闭Connection,主线程一直不

          本文链接:https://www.haomeiwen.com/subject/ifrfxqtx.html