美文网首页
ActiveMQ学习

ActiveMQ学习

作者: 黄二的NPE | 来源:发表于2018-07-29 17:48 被阅读9次
    • JMS

    JMS(Java Message Service):java消息服务,它是一套API,定义了Java程序访问消息中间件的接口,包括了消息的创捷,发送和接收.大名鼎鼎的ActiveMQ便是Apache出品的关于JMS的一种实现,还有阿里巴巴的RocketMQ.
    在java消息服务中,一共有三种角色,producer(生产者),broker(中间件),consumer(消费者),生产者生产消息到中间件,消费者获取消息进行消费.其示意图如下:


    JMS示意图
    • JMS的规范

    既然说JMS是一套规范,一套接口,那么我们可以在javax.jms包里面找到这些接口


    jms

    这套规范定义了连接工厂(ConnectionFactory),连接(Connection),会话(Session),目的地(Destination),生产者(Producer),消费者(Consumer)等对象


    JMS

    连接工厂(ConnectionFactory)
    客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROCKER_URL);
    

    JMS连接(Connection)
    连接对象封装了与生产者与JMS中间件之间的虚拟连接,使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息,JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。

    Connection connection = connectionFactory.createConnection();
    //使用的时候一定要先打开它,
    connection.start();
    //使用中
    ...
    //创建完连接后,需要在程序使用结束后关闭它:
    connection.close();
    

    JMS 会话(Session)
    Session是一个单线程上下文,用于生产和消费消息,可以创建出消息生产者和消息消费者。
    Session对象实现了Session接口,在创建完连接后,我们可以使用它创建Session。

    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    

    createSession中第一个参数表示是否支持事务,true为支持,false为不支持。
    第二个参数表示事务类型,当为AUTO_ACKNOWLEDGE时 ,接收到消息(receive 或 onMessage运行完毕,成功返回时),即为消费成功,然后broker从队列里移除该数据。不关心该数据有没有正确被处理成我们想要的结果;Session设置为CLIENT_ACKNOWLEDGE 时,必须手动调用acknowledge 方法才为消费成功,然后从队列里移除该条数据。

    目的地(Destination)
    目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题。

    //创建一个topic
    Destination destination = session.createTopic("GiftMsg");
    //创建一个queue
    Destination destination = session.createQueue("GiftMsg");
    

    JMS消息生产者
    消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者;

    //创建生产者
    MessageProducer messageProducer = session.createProducer(destination);
    //生产者发送消息
    TextMessage textMessage = session.createTextMessage("Hello World");
    messageProducer.send(textMessage);
    

    JMS消息消费者
    消息消费者由Session创建,用于接受目的地发送的消息。消费者实现MessageConsumer接口,我们可以为目的地、队列或话题创建消费者;

    //创建消费者
    MessageConsumer consumer  = session.createConsumer(destination);
    //消费者用receive消费消息
    TextMessage textMessage = (TextMessage) consumer.receive();
    //消费者用监听器消费信息
     consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        TextMessage textMessage = (TextMessage)message;
                    }
     });
    
    • Queue模式 和 Topic模式

    我们可以分为点对点模式(Queue)与订阅模式(Topic),二者比较如下:

    类型 点对点模式 订阅模型
    消费消息的消费者数量 无论有多少消费者监听消息,当中只有一个消费者能处理消息 所有监听消息的消费者都能处理该消息
    获取消息方式 pull(消费者先发个请求询问broker是否有消息,如果有消息broker会把消息发给中间件) push(无须消费者询问,broker会主动把消息发送订阅者)
    持久化 消息会被保存到DB中,broker重启,消息犹在,如果没有消费者消费,消息就不会消失.只有当消息被消费,broker才会对消息进行进一步处理,比如删除等 消息不会落地,一到broker就会被发送出去,随后消息消失
    完整性 每一条信息都能被消费 如果没有监听者进行监听,消息就会消失
    接受策略 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器
    • receive 和 onMessage

    如果为false,表示使用同步。当consumer使用receive()获取消息时,那么session将会把消息添加到consumer的本地queue,然后唤醒receive等待;当consumer使用messageListener异步侦听消息时,将会调用其onMessage()方法直到方法执行完毕,然后返回。

    • 同步和异步

    JMS本身就是异步机制,即生产者发送消息以后,可以不管消费者是否接收到消息,从而继续工作,但是根据消费者接收信息的方式,可以分成同步接收和异步接收.
    同步接收(pull)
    消费者主动接收消息,即不断从中间件中拉取和轮询消息,如果消息队列为空,消费者会发生阻塞.
    异步接收(push)
    消费者设置监听器,当中间件有消息的时候,会发广播给各个消费者,而不用消费者定期去拉取。消费者在消费消息的过程中,还可以去做其他事,不会发生阻塞。

    以下有些东西纯属臆想:

    • ack机制

    不管是queue还是topic,每个消费者启动后,都会在中间件那里注册一个“会话”,用来存放与该消费者相关的数据(消费者与各个中间件的“会话”保持联系)。消费者消费完消息以后,会发送一个ack给中间件,告诉它我已经消费完这个消息了,然后中间件就会删掉总表或者会话当中对应的消息。当中间件收到消息的时候,这里我猜,queue肯定是会放在一个总表里面,然后由总表再随机发放到queue的某个“会话当中”,而topic我感觉会放在各个topic的“会话”中。当消息被push或者pull出去以后,消息的状态可能就会发生变化,比如由原来的0变成了1,当消费者返回ack后,这个消息就会被删掉。如果是queue,还会把总表的消息状态改变一下。

    optimizeAcknowledge
    String brokerUrl = "tcp://localhost:61616?" +   
                       "jms.optimizeAcknowledge=true" +   
                       "&jms.optimizeAcknowledgeTimeOut=30000" +   
                       "&jms.redeliveryPolicy.maximumRedeliveries=6";  
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);  
    

    optimizeAcknowledge是针对消费者而言的,当optimizeAcknowledge=false的时候,表示逐个确认,中间件会发放一系列的消息给消费者,消费者每消费完一个就会发送ack给中间件;如果是等于true,即批量确认,消费者会消费满一定量的消息以后,再批量确认这批消息。

    prefetchSize

    那么这个一定量是多少呢?那就是prefetchSize,prefetchSize的值可以如下设置:

    String queueName = "test-queue?customer.prefetchSize=100";  
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    Destination queue = session.createQueue(queueName);  
    

    prefetchSize的值不仅针对消费者,也针对中间件当针对消费者时,即上面谈到的消费满prefetchSize个消息后发送确认;当针对中间件的时候,中间件会推送消息给消费者,当推送出去且还没消费的消息满prefetchSize个的时候,中间件便不会再向消费者推送消息,当prefetchSize = 0的时候,broker不会向消费者推送消息,即消费者想要获取消息,有且只能通过pull来拉取。那么客户端是怎么判断是否达到prefetchSize的呢?我想这就跟我们之前说的“会话”,只要中间件检测一下会话中的状态=1的消息的的数量即可。

    • activeMQ的参数

    alwaysSyncSend/useAsyncSend

    jms.alwaysSyncSend=false&jms.useAsyncSend=true
    用来设定producer(或者session)发送消息的方式:同步/异步。

    相关文章

      网友评论

          本文标题:ActiveMQ学习

          本文链接:https://www.haomeiwen.com/subject/ngkvhftx.html