生产者
根据自定义的规则,将某一类的消息发送到同一个queue中,可以hash与队列的个数取余
public static void main(String[] args) throws UnsupportedEncodingException {
try {
//默认情况下,broker为每一个topic创建4个queue,生产者把要顺序生产的消息一次的发送到同一个队列
DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name");
producer.setNamesrvAddr ("192.168.44.145:9876");
producer.start ();
for (int i = 0; i < 10; i++) {
int orderId = i;
for (int j = 0; j <= 5; j++) {
Message msg =
new Message ("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
("order_" + orderId + " step " + j).getBytes (RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send (msg, new MessageQueueSelector () {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = ( Integer ) arg;
int index = id % mqs.size ();
//返回就是消息发往到queue的id
return mqs.get (index);
}
//orderId就是select的arg变量
}, orderId);
System.out.printf ("%s%n", sendResult);
}
}
producer.shutdown ();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace ();
}
}
消费者
消费者要注意使用的监听器MessageListenerOrderly,处理同一个queue的消息使用的是一个线程,单线程获取一个queue的消息保证了消息的顺序消费
public static void main(String[] args) throws MQClientException {
//顺序消费不是全局顺序,只是分区顺序。要全局顺序只能分配一个queue。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("please_rename_unique_group_name_3");
consumer.setNamesrvAddr ("192.168.44.145:9876");
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe ("OrderTopicTest", "*");
//顺序消费,就需要保证消费端用同一个线程处理一个queue的消息
consumer.registerMessageListener (new MessageListenerOrderly () {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit (true);
for (MessageExt msg : msgs) {
System.out.println ("收到消息内容 " + new String (msg.getBody ())+"消息队列id-"+msg.getQueueId ());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start ();
System.out.printf ("Consumer Started.%n");
}
消费者收到的消息, order_X代表同一类的消息,step X 是该类消息的步骤,
可以看出来,同一类的消息的步骤是按照顺序的,但是不同类之间可能会相互的穿插,证明了顺序消费是局部消费,不是全局消费.如果要实现全局消费的话,topic创建的时候只创建一个queue(默认为4个)
收到消息内容 order_3 step 0消息队列id-3
收到消息内容 order_2 step 0消息队列id-2
收到消息内容 order_0 step 0消息队列id-0
收到消息内容 order_1 step 0消息队列id-1
收到消息内容 order_1 step 1消息队列id-1
收到消息内容 order_3 step 1消息队列id-3
收到消息内容 order_2 step 1消息队列id-2
收到消息内容 order_0 step 1消息队列id-0
收到消息内容 order_3 step 2消息队列id-3
收到消息内容 order_1 step 2消息队列id-1
收到消息内容 order_0 step 2消息队列id-0
收到消息内容 order_2 step 2消息队列id-2
收到消息内容 order_1 step 3消息队列id-1
收到消息内容 order_3 step 3消息队列id-3
收到消息内容 order_2 step 3消息队列id-2
收到消息内容 order_0 step 3消息队列id-0
收到消息内容 order_3 step 4消息队列id-3
收到消息内容 order_1 step 4消息队列id-1
网友评论