基本样例
生产者有三种模式 异步发送 同步发送 单项发送
同步发送,当消息发送的时,代码会阻塞知道borker返回信息,如果发送失败这重新发送,可以自定义重试间隔和重试次数
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//定义生产者, 生产者和消费者的group没有任何联系,生产者group是生产者集群,
DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name");
//生产者和消费者都是与nameserver连接,因为nameserver中维护了所有broker信息,包括topic和队列
producer.setNamesrvAddr ("192.168.44.145:9876");
//设置发送消息超时,默认为3秒,单位毫秒
producer.setSendMsgTimeout (1000);
//异步发送消息,重试次数,默认为2次
// producer.setRetryTimesWhenSendAsyncFailed (3);
//同步发送
producer.setRetryTimesWhenSendFailed (5);
//开启生产者
producer.start ();
for (int i = 0; i < 2; i++) {
try {
/**
* 第一个参数为topic
* 第二个参数tag,用与消费者过滤消息用,这种过滤发生在broker,而不是在消费者过滤,这样节省了带宽
* 第三个参数消息体
* message有构造函数中还有key,这个对消费者没有实际意义,是在查找message时可以快速查找
*/
Message msg = new Message ("TopicTest", "TagA" , ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//延时消息,开源版本只能定义下面的时间,不能自定义
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//3级别为10s,就是10s后消息放到队列中
//msg.setDelayTimeLevel(3);
//这种方式为同步发送,消息发送到broker并收到了broker返回的消息,线程到这是阻塞的,保证了消息的可靠性
//消息发送,发送失败重试间隔
SendResult sendResult = producer.send (msg,1000);
System.out.printf ("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace ();
Thread.sleep (1000);
}
}
//关闭生产者
producer.shutdown ();
}
}
异步发送
生产者发送到borker的时候,不会等broker返回结果,代码向下执行,可以修改重试次数和重试间隔时间
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer ("Jodie_Daily_test");
producer.setNamesrvAddr ("192.168.44.145:9876");
producer.start ();
producer.setRetryTimesWhenSendAsyncFailed (3);
int messageCount = 100;
//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
final CountDownLatch countDownLatch = new CountDownLatch (messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message ("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes (RemotingHelper.DEFAULT_CHARSET));
//定义了异步发送的回调函数和重试时间间隔
producer.send (msg, new SendCallback () {
//发送成功回调方法
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown ();
System.out.printf ("%-10d OK %s %n", index, sendResult.getMsgId ());
}
//发送失败回调方法
@Override
public void onException(Throwable e) {
countDownLatch.countDown ();
System.out.printf ("%-10d Exception %s %n", index, e);
e.printStackTrace ();
}
}, 1000);
System.out.println ("消息发送完成");
} catch (Exception e) {
e.printStackTrace ();
}
}
countDownLatch.await (5, TimeUnit.SECONDS);
producer.shutdown ();
}
单向发送
不管发送是否成功,代码向下执行
//简单样例:同步发送消息
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.44.145:9876");
producer.start();
for (int i = 0; i < 20; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步传递消息,消息会发给集群中的一个Broker节点。
producer.sendOneway(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消费者
消费者有两种模式 pull和push(常用)
push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
pull模式:客户端不断的轮询请求服务端,来获取新的消息。
RocketMQ在消费端有push和pull两种模式,pull模式需要我们手动调用consumer拉消息,而push模式则只需要我们提供一个listener即可实现对消息的监听,而实际上,RocketMQ的push模式是基于pull模式实现的,它没有实现真正的push。
Pull模式 在生产中用的不多
关于pull模式可以参考下面大神的文章
https://blog.csdn.net/zhaohongfei_358/article/details/101457563
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
// 1. 创建DefaultMQPullConsumer实例
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
// 2. 设置NameServer
consumer.setNamesrvAddr("192.168.44.145:9876");
consumer.start();
// 3. 获取Topic的所有队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
// 4. 遍历所有队列
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
// 5. 拉取消息,arg1=消息队列,arg2=tag消息过滤,arg3=消息队列,arg4=一次最大拉去消息数量
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
// 6. 将消息放入hash表中,存储该队列的消费进度
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND: // 找到消息,输出
System.out.println(pullResult.getMsgFoundList().get(0));
break;
case NO_MATCHED_MSG: // 没有匹配tag的消息
System.out.println("无匹配消息");
break;
case NO_NEW_MSG: // 该队列没有新消息,消费offset=最大offset
System.out.println("没有新消息");
break SINGLE_MQ; // 跳出该队列遍历
case OFFSET_ILLEGAL: // offset不合法
System.out.println("Offset不合法");
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 7. 关闭Consumer
consumer.shutdown();
}
/**
* 从Hash表中获取当前队列的消费offset
* @param mq 消息队列
* @return long类型 offset
*/
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
/**
* 将消费进度更新到Hash表
* @param mq 消息队列
* @param offset offset
*/
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
Push模式
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("CID_JODIE_1");
consumer.setNamesrvAddr ("192.168.44.145:9876");
consumer.subscribe ("TopicTest", "*");
//设置每次拉去的消息数,默认一次只拉去一条消息,如果使用的时候发现,监听器中msgs还是为1的话,
//是因为每生产一个消息,就推到消费者,所以是msgs长度是1,可以先启动生产者生产消息,
//再启动消费者就会发现可以一次拉去了10条消息
consumer.setConsumeMessageBatchMaxSize (10);
//Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
//非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener (new MessageListenerConcurrently () {
//msgs从broker中拉去消息的个数
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
//返回成功,如果消费失败,返回later,消息会重新放到队列中,可能会被同一个消费组的其他消费者消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start ();
System.out.printf ("Consumer Started.%n");
}
回调函数,返回的ConsumeConcurrentlyStatus.CONSUME_SUCCESS,代表消息成功被消费,
如果返回ConsumeConcurrentlyStatus.RECONSUME_LATER,将消息放回消息队列,但不是原先的消息队列,是%RETRY%CID_JODIE_1, %RETRY%+消费组的名称,注意这里不是Topic名称.
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时间,重试次数越多投递延时就越大。有一个参数messageDelayLevel,这个参数是在服务器端的Broker上配置的,默认是
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
默认是最多可以重试16次
如果重试了16次之后,这条消息还是没有被成功消费,那么就认为这条消息是抢救不过来了,此时,消息队列不会立刻将消息丢弃,于是它被放入了死信队列中,上面重试队列的图中你也可以看到死信队列,死信队列的名称是在原队列名称前加%DLQ%。如果你还是不死心的话,觉得这条消息还能抢救一下,可以开启一个后台线程不断扫描死信队列然后继续重试,也可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费
image.png
网友评论