美文网首页
同步队列生产与消费实现

同步队列生产与消费实现

作者: 长孙俊明 | 来源:发表于2019-10-13 11:19 被阅读0次
package com.example.activemq.syncqueue;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 简单生产者
 */
public class Producer {
    public static void main(String[] args) {
        new ProducerThread("tcp://47.106.100.211:61616", "sync-queue").start();
    }

    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;

        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;

            try {
                // 1、创建连接工厂
                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

                // 2、创建连接
                conn = connectionFactory.createConnection();
                conn.start(); // 一定要start

                // 3、创建会话(可以创建一个或者多个session)
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // 4、创建消息发送目标 (Topic or Queue)
                Destination destination = session.createQueue(destinationUrl);

                // 5、用目的地创建消息生产者
                MessageProducer producer = session.createProducer(destination);
                // 设置递送模式(持久化 / 不持久化)
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.setPriority(7);

                // 6、创建一条文本消息
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
                        + System.currentTimeMillis();
                TextMessage message = session.createTextMessage(text);

                // 7、通过producer 发送消息
                System.out.println("Sent message: " + text);
                producer.send(message);

                // 8、 清理、关闭连接
                session.close();
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

package com.example.activemq.syncqueue;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 简单消费者
 */
// http://activemq.apache.org/consumer-features.html
public class Consumer {
    public static void main(String[] args) {
        new ConsumerThread("tcp://47.106.100.211:61616", "sync-queue").start();
    }
}

class ConsumerThread extends Thread {

    String brokerUrl;
    String destinationUrl;

    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn;
        Session session;
        MessageConsumer consumer;

        try {
            // brokerURL
            // http://activemq.apache.org/connection-configuration-uri.html
            // 1、创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

            // 2、创建连接对象
            conn = connectionFactory.createConnection();
            conn.start(); // 一定要启动

            // 3、创建会话(可以创建一个或者多个session)
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 4、创建消息消费目标(Topic or Queue)
            Destination destination = session.createQueue(destinationUrl);

            // 5、创建消息消费者 http://activemq.apache.org/destination-options.html
            consumer = session.createConsumer(destination);

            // 6、接收消息(没有消息就持续等待)
            Message message = consumer.receive();

            if (message instanceof TextMessage) {
                System.out.println("收到文本消息:" + ((TextMessage) message).getText());
            } else {
                System.out.println(message);
            }

            consumer.close();
            session.close();
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

相关文章

  • 队列

    队列: 先进先出 栈: 先进后出 1.使用Queue实现生产者与消费者解耦 可以使用队列来实现线程间的同步 生产者...

  • 同步队列生产与消费实现

  • Java - SynchronousQueue学习使用

    SynchronousQueue叫同步队列,它与其他队列的不同之处是生产put与消费take都是阻塞操作,当生产的...

  • 生产者和消费者的Java实现方式

    引言 生产者与消费者问题是典型的多线程同步问题。生产者与消费者分别是两个角色,需要维护一个公共队列,生产者向队列中...

  • SynchronousQueue

    这个队列没有容量,一条也没有。基于生产者-消费者模式,可实现同步阻塞的功能。生产者生产数据后,如果没有消费者进行消...

  • 多进程、多线程、生成器实现生产者消费者模型

    多线程实现 多线程实现生产者消费者模型的逻辑十分简单,生产者与消费者之间通过队列来进行通讯,所以生产者不用等待消费...

  • 生产者-消费者

    产品:Product类工厂:生产线队列 LinkedList ; 两个同步方法:生产produce() 消费 c...

  • 生产者消费者

    生产者/消费者模式(阻塞队列) 生产者消费者模型的实现

  • 2018-07-27

    17.7.queue- 同步队列类源代码:Lib / queue.py 该queue模块实现了多生产者,多消费者队...

  • 并发编程05--Java中的锁(Lock接口和队列同步器)

    Java中的锁Lock接口队列同步器队列同步器的接口与示例队列同步器的实现分析同步队列独占式同步状态获取与释放共享...

网友评论

      本文标题:同步队列生产与消费实现

      本文链接:https://www.haomeiwen.com/subject/iouhmctx.html