JMS
JMS(Java Message Service):java消息服务,它是一套API,定义了Java程序访问消息中间件的接口,包括了消息的创捷,发送和接收.大名鼎鼎的ActiveMQ便是Apache出品的关于JMS的一种实现,还有阿里巴巴的RocketMQ.
在java消息服务中,一共有三种角色,producer(生产者),broker(中间件),consumer(消费者),生产者生产消息到中间件,消费者获取消息进行消费.其示意图如下:
JMS示意图
JMS的规范
既然说JMS是一套规范,一套接口,那么我们可以在javax.jms包里面找到这些接口
jms
这套规范定义了连接工厂(ConnectionFactory),连接(Connection),会话(Session),目的地(Destination),生产者(Producer),消费者(Consumer)等对象
JMS
连接工厂(ConnectionFactory)
客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROCKER_URL);
JMS连接(Connection)
连接对象封装了与生产者与JMS中间件之间的虚拟连接,使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息,JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。
Connection connection = connectionFactory.createConnection();
//使用的时候一定要先打开它,
connection.start();
//使用中
...
//创建完连接后,需要在程序使用结束后关闭它:
connection.close();
JMS 会话(Session)
Session是一个单线程上下文,用于生产和消费消息,可以创建出消息生产者和消息消费者。
Session对象实现了Session接口,在创建完连接后,我们可以使用它创建Session。
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
createSession中第一个参数表示是否支持事务,true为支持,false为不支持。
第二个参数表示事务类型,当为AUTO_ACKNOWLEDGE时 ,接收到消息(receive 或 onMessage运行完毕,成功返回时),即为消费成功,然后broker从队列里移除该数据。不关心该数据有没有正确被处理成我们想要的结果;Session设置为CLIENT_ACKNOWLEDGE 时,必须手动调用acknowledge 方法才为消费成功,然后从队列里移除该条数据。
目的地(Destination)
目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题。
//创建一个topic
Destination destination = session.createTopic("GiftMsg");
//创建一个queue
Destination destination = session.createQueue("GiftMsg");
JMS消息生产者
消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者;
//创建生产者
MessageProducer messageProducer = session.createProducer(destination);
//生产者发送消息
TextMessage textMessage = session.createTextMessage("Hello World");
messageProducer.send(textMessage);
JMS消息消费者
消息消费者由Session创建,用于接受目的地发送的消息。消费者实现MessageConsumer接口,我们可以为目的地、队列或话题创建消费者;
//创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//消费者用receive消费消息
TextMessage textMessage = (TextMessage) consumer.receive();
//消费者用监听器消费信息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
}
});
Queue模式 和 Topic模式
我们可以分为点对点模式(Queue)与订阅模式(Topic),二者比较如下:
类型 | 点对点模式 | 订阅模型 |
---|---|---|
消费消息的消费者数量 | 无论有多少消费者监听消息,当中只有一个消费者能处理消息 | 所有监听消息的消费者都能处理该消息 |
获取消息方式 | pull(消费者先发个请求询问broker是否有消息,如果有消息broker会把消息发给中间件) | push(无须消费者询问,broker会主动把消息发送订阅者) |
持久化 | 消息会被保存到DB中,broker重启,消息犹在,如果没有消费者消费,消息就不会消失.只有当消息被消费,broker才会对消息进行进一步处理,比如删除等 | 消息不会落地,一到broker就会被发送出去,随后消息消失 |
完整性 | 每一条信息都能被消费 | 如果没有监听者进行监听,消息就会消失 |
接受策略 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 |
receive 和 onMessage
如果为false,表示使用同步。当consumer使用receive()获取消息时,那么session将会把消息添加到consumer的本地queue,然后唤醒receive等待;当consumer使用messageListener异步侦听消息时,将会调用其onMessage()方法直到方法执行完毕,然后返回。
同步和异步
JMS本身就是异步机制,即生产者发送消息以后,可以不管消费者是否接收到消息,从而继续工作,但是根据消费者接收信息的方式,可以分成同步接收和异步接收.
同步接收(pull)
消费者主动接收消息,即不断从中间件中拉取和轮询消息,如果消息队列为空,消费者会发生阻塞.
异步接收(push)
消费者设置监听器,当中间件有消息的时候,会发广播给各个消费者,而不用消费者定期去拉取。消费者在消费消息的过程中,还可以去做其他事,不会发生阻塞。
以下有些东西纯属臆想:
ack机制
不管是queue还是topic,每个消费者启动后,都会在中间件那里注册一个“会话”,用来存放与该消费者相关的数据(消费者与各个中间件的“会话”保持联系)。消费者消费完消息以后,会发送一个ack给中间件,告诉它我已经消费完这个消息了,然后中间件就会删掉总表或者会话当中对应的消息。当中间件收到消息的时候,这里我猜,queue肯定是会放在一个总表里面,然后由总表再随机发放到queue的某个“会话当中”,而topic我感觉会放在各个topic的“会话”中。当消息被push或者pull出去以后,消息的状态可能就会发生变化,比如由原来的0变成了1,当消费者返回ack后,这个消息就会被删掉。如果是queue,还会把总表的消息状态改变一下。
optimizeAcknowledge
String brokerUrl = "tcp://localhost:61616?" +
"jms.optimizeAcknowledge=true" +
"&jms.optimizeAcknowledgeTimeOut=30000" +
"&jms.redeliveryPolicy.maximumRedeliveries=6";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
optimizeAcknowledge是针对消费者而言的,当optimizeAcknowledge=false的时候,表示逐个确认,中间件会发放一系列的消息给消费者,消费者每消费完一个就会发送ack给中间件;如果是等于true,即批量确认,消费者会消费满一定量的消息以后,再批量确认这批消息。
prefetchSize
那么这个一定量是多少呢?那就是prefetchSize,prefetchSize的值可以如下设置:
String queueName = "test-queue?customer.prefetchSize=100";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queueName);
prefetchSize的值不仅针对消费者,也针对中间件当针对消费者时,即上面谈到的消费满prefetchSize个消息后发送确认;当针对中间件的时候,中间件会推送消息给消费者,当推送出去且还没消费的消息满prefetchSize个的时候,中间件便不会再向消费者推送消息,当prefetchSize = 0的时候,broker不会向消费者推送消息,即消费者想要获取消息,有且只能通过pull来拉取。那么客户端是怎么判断是否达到prefetchSize的呢?我想这就跟我们之前说的“会话”,只要中间件检测一下会话中的状态=1的消息的的数量即可。
activeMQ的参数
alwaysSyncSend/useAsyncSend
jms.alwaysSyncSend=false&jms.useAsyncSend=true
用来设定producer(或者session)发送消息的方式:同步/异步。
网友评论