The test code is as follows:
// Create the producer and point it at the name server.
DefaultMQProducer producer = new DefaultMQProducer("ConsumerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// Send timeout in milliseconds.
producer.setSendMsgTimeout(30000);
// Send a single message synchronously and print the result.
for (int i = 0; i < 1; i++) {
    Message msg = new Message("topicName5", ("ffffffffff " + i).getBytes("UTF-8"));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
    Thread.sleep(1);
}
System.out.println("Producer has sent the message");
producer.shutdown();
Test setup: one mqnamesrv and one broker.
- The main thread and the main method entry point
- new DefaultMQProducer("ConsumerGroup5");
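- Sets namespace, sets producerGroup, and sets defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- Note that the DefaultMQProducer constructor creates the DefaultMQProducerImpl and passes this as an argument, so the two objects hold references to each other and frequently call each other during execution.
- setNamesrvAddr: internally it simply assigns the value to a field.
- The many other property setters work the same way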
- producer.start();
- this.defaultMQProducerImpl.start();
- this.defaultMQProducer.changeInstanceNameToPID();
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
- new MQClientInstance
- mQClientAPIImpl = new MQClientAPIImpl
- defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
- new MQClientInstance
- boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
- this.producerTable.putIfAbsent(group, producer);
- mQClientFactory.start();
- this.mQClientAPIImpl.start();
- this.startScheduledTask();
- this.pullMessageService.start();
- this.rebalanceService.start();
- this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
- this.defaultMQProducerImpl.start();
- new DefaultMQProducer("ConsumerGroup5");
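
The two points in the trace that are easiest to miss are the mutual reference created in the constructor and the putIfAbsent-based registration. Below is a minimal sketch of both, not the RocketMQ source: ProducerFacade, ProducerImpl, ClientRegistry and ProducerStartSketch are hypothetical stand-ins for DefaultMQProducer, DefaultMQProducerImpl and MQClientInstance, and the exception thrown on a failed registration is an assumption made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for DefaultMQProducer: the facade that user code configures.
class ProducerFacade {
    private final String producerGroup;
    private final ProducerImpl impl;
    private String namesrvAddr;

    ProducerFacade(String producerGroup) {
        this.producerGroup = producerGroup;
        // Passing `this` gives the impl a back-reference to the facade,
        // so each object can call the other.
        this.impl = new ProducerImpl(this);
    }

    String getProducerGroup() { return producerGroup; }
    ProducerImpl getImpl() { return impl; }

    // A setter that only assigns a field, like setNamesrvAddr in the trace.
    void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; }
    String getNamesrvAddr() { return namesrvAddr; }

    // start() on the facade only delegates to the impl.
    void start(ClientRegistry registry) { impl.start(registry); }
}

// Hypothetical stand-in for DefaultMQProducerImpl.
class ProducerImpl {
    private final ProducerFacade facade;
    private boolean started;

    ProducerImpl(ProducerFacade facade) { this.facade = facade; }

    ProducerFacade getFacade() { return facade; }
    boolean isStarted() { return started; }

    void start(ClientRegistry registry) {
        // Reads the group from the facade, then registers itself.
        boolean registerOK = registry.registerProducer(facade.getProducerGroup(), this);
        if (!registerOK) {
            // Assumption for this sketch: a duplicate group is treated as an error.
            throw new IllegalStateException(
                "producer group already registered: " + facade.getProducerGroup());
        }
        started = true;
    }
}

// Hypothetical stand-in for the producerTable held by MQClientInstance.
class ClientRegistry {
    private final ConcurrentMap<String, ProducerImpl> producerTable = new ConcurrentHashMap<>();

    boolean registerProducer(String group, ProducerImpl producer) {
        if (group == null || producer == null) {
            return false;
        }
        // putIfAbsent returns null only when the key was not present before,
        // so the first registration for a group wins.
        return producerTable.putIfAbsent(group, producer) == null;
    }
}

class ProducerStartSketch {
    public static void main(String[] args) {
        ClientRegistry registry = new ClientRegistry();
        ProducerFacade producer = new ProducerFacade("ConsumerGroup5");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start(registry);
        System.out.println("back-reference intact: " + (producer.getImpl().getFacade() == producer));
        System.out.println("started: " + producer.getImpl().isStarted());
    }
}
```

In this sketch, a second producer with the same group name fails to register against the same registry, because putIfAbsent keeps the first entry.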