美文网首页javaWeb学习Java 杂谈
利用SynchronousQueue多线程处理ActiveMQ消

利用SynchronousQueue多线程处理ActiveMQ消

作者: 大侠陈 | 来源:发表于2019-04-16 20:42 被阅读14次

当我们想通过多条线程处理activemq中的消息,直觉上会使用固定大小线程池去处理,然而这种方式并不妥当,这么做我们只是将消息从activemq转移到线程池的阻塞队列之中,当线程池开始工作,activemq中的消息快速被消费完毕,而消息所代表的任务却并未真正被处理, 他们被堆积在处理程序的内存中,并陆续由线程中的线程处理。这会产生副作用,此时当处理程序因为某种原因而崩溃,这些待处理的任务都将丢失。

如何实现既能通过多个线程处理任务,又能保证未完成的任务的安全性,此时 SynchronousQueue 就有了用武之地。

我们可以把SynchronousQueue 当作长度为1的阻塞队列,当队列被塞入一个元素,假如这个元素未被消费掉,那么后续的塞入操作将被阻塞。我们可以利用它的这个特性,把它当作是activemq与处理线程之间的缓冲层。在 SynchronousQueue 的一端,我们从activemq中读取一个元素,并将它put进SynchronousQueue 。在另一端,多条线程分别从 SynchronousQueue 中 take 元素进行处理,只有当 SynchronousQueue 中不存在任何元素,也就是线程们将当前的任务都处理完毕,还有一端的从activemq中提取消息的操作才能执行,反之则将被阻塞。 通过这种方式,我们便能保证任务不丢失的同时又能通过多线程处理它们。示例代码如下

初始化一个 SynchronousQueue

private SynchronousQueue<ActiveMQObjectMessage> synchronousQueue = new SynchronousQueue<>();

从activemq中将消息转移至synchronousQueue,一次转移一条,如果上一条未被处理,下一条不能继续

ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
                    .DEFAULT_PASSWORD, brokerUrl);
connection = factory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(dest);
MessageConsumer consumer = session.createConsumer(destination);

while (true) {
  try {
    Message message = consumer.receive();
    if (message instanceof ActiveMQObjectMessage) {
      ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) message;
      synchronousQueue.put(activeMQObjectMessage);
    } else {
      if (message != null) {
        message.acknowledge();
        logger.error("消息格式错误,msg={}",message.toString());
      }
    }
  } catch (JMSException | InterruptedException e) {
    e.printStackTrace();
  }
}

开启多条线程同时处理消息

  Runnable task = () -> {
            while (true) {
                try {
                    ActiveMQObjectMessage activeMQObjectMessage = synchronousQueue.take();
                      //消费消息,处理成功后确认
                       boolean complete = handle(msg);
                        if (complete) {
                            activeMQObjectMessage.acknowledge();
                        }
                } catch ( JMSException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(task);
            thread.setName("log-task-" + i);
            thread.start();
        }

相关文章

网友评论

    本文标题:利用SynchronousQueue多线程处理ActiveMQ消

    本文链接:https://www.haomeiwen.com/subject/ejbiwqtx.html