AMQPConnection
实例初始化
创建Connection时会通过FrameHandlerFacotry创建一个SocketFrameHandler,SocketFrameHandler对Socket进行了封装。
public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
{
checkPreconditions();
this.username = params.getUsername();
this.password = params.getPassword();
this._frameHandler = frameHandler;
this._virtualHost = params.getVirtualHost();
this._exceptionHandler = params.getExceptionHandler();
this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
this.requestedFrameMax = params.getRequestedFrameMax();
this.requestedChannelMax = params.getRequestedChannelMax();
this.requestedHeartbeat = params.getRequestedHeartbeat();
this.shutdownTimeout = params.getShutdownTimeout();
this.saslConfig = params.getSaslConfig();
this.executor = params.getExecutor();
this.threadFactory = params.getThreadFactory();
this._channelManager = null;
this._brokerInitiatedShutdown = false;
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
}
启动连接
初始化WorkService和HeartBeatSender。
创建一个channel0的AMQChannel,这个channel不会被ChannelManager管理。
首先channel0会将一个BlockingRpcContinuation作为当前未完成的Rpc请求,用于接收handshake的响应。
然后channel0会向socket中写入一条只有header的消息作为handshake,header中包含了客户端的版本号。
紧接着会启动主循环线程,主循环会通过SocketFrameHandler从socket接收字节流。此时接收到的第一条数据是服务端响应handshake返回的Connection.Start信息(包含服务端版本、机制、基础信息)。
主循环线程启动后,主线程会阻塞地等待服务端的handshake响应。拿到响应之后会对服务器回传的信息进行比对,然后发送Connection.StartOK的信息去服务端(这个请求也还是阻塞式的),等待服务端回传Connection.Tune(包含最大channel数、最大frame长度和heartbeat间隔)。将这些信息与实例初始化是的设置信息进行对比,初始化ChannelManager
紧接着发送Connection.TuneOk和Connection.Open消息去服务端,完成connection的建立。
Connection > MainLoop > readFrame
消息体
Frame是对AMQP消息的封装:包含frame的type、channel号、消息内容
type|channelNumber|payloadSize|payload|frameEndMarker
Payload包含了消息类型、消息头和消息主题
method|header|body
消息发送和接收
消息的发送和接收都要channel来完成。
创建Channel
通过Connection的ChannelManager来创建Channel,通过指定的ChannelNumber或者由分配器分配。创建好的Channel实例会放入ChannelManager的Map中,key为ChannelNumber。由此可见Channel是Connection唯一的。
public ChannelN createChannel(AMQConnection connection);
public ChannelN createChannel(AMQConnection connection, int channelNumber);
private ChannelN addNewChannel(AMQConnection connection, int channelNumber);
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService);
Channel实例化之后会调用Channel.open方法,发送Channel.Open去服务端(阻塞式),等待服务端响应Channel.OpenOk。
消息发送
Channel.transmit 发送消息,调用AMQCommand.transmit完成发送。
AMQCommand.transmit将消息封装成Frame,通过connection的SocketFrameHandler写入OutpuStream。
消息接收
主循环线程在链接创建完成后会监听socket,从InputStream中读取二进制流封装成Frame。通过Frame中的ChannelNumber从ChannelManager中获取对应的Channel实例处理Frame。
while (_running) {
Frame frame = _frameHandler.readFrame();
if (frame != null) {
_missedHeartbeats = 0;
if (frame.type == AMQP.FRAME_HEARTBEAT) {
// Ignore it: we've already just reset the heartbeat counter.
} else {
if (frame.channel == 0) { // the special channel
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
// If we're still _running, but not isOpen(), then we
// must be quiescing, which means any inbound frames
// for non-zero channels (and any inbound commands on
// channel zero that aren't Connection.CloseOk) must
// be discarded.
ChannelManager cm = _channelManager;
if (cm != null) {
cm.getChannel(frame.channel).handleFrame(frame);
}
}
}
}
} else {
// Socket timeout waiting for a frame.
// Maybe missed heartbeat.
handleSocketTimeout();
}
}
Channel会使用已经准备好的AMQCommand处理Frame,并未下一个Frame准备新的AMQCommand。
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
AMQCommad会使用CommandAssembler依次从Frame的payload中检出对应的Method、Header和Body。如果检出了Body,整个Frame会被检出完成。如过未完成,会进入主循环再次处理直至完成。
public synchronized boolean handleFrame(Frame f) throws IOException
{
switch (this.state) {
case EXPECTING_METHOD: consumeMethodFrame(f); break;
case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;
case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;
default:
throw new AssertionError("Bad Command State " + this.state);
}
return isComplete();
}
Frame被检出完后,会根据Method的类型进入不同的异步处理流程。
Method在channel打开和关闭的情况下会以下的可能:
Channel打开:Basic.Deliver, Basic.Return, Basic.Flow, Basic.Ack, Basic.Nack, Basic.RecoveryOk, Basic.Cancel
Channel关闭:Channel.CloseOk
生产和消费
生产
调用Channel.basicPublish()方法,指定exchange、routingKey等信息,消息属性、消息体。封装成Baisc.Publish,放入AMQCommand,最后调用transmit方法完成发送。参考消息发送
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
BasicProperties useProps = props;
if (props == null) {
useProps = MessageProperties.MINIMAL_BASIC;
}
transmit(new AMQCommand(new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(),
useProps, body));
}
消费
创建QueueingConsumer实例,然后调用Channel.basicConsume方法。
queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume("queue_name", queueingConsumer);
new Thread() {
@Override
public void run() {
while (true) {
try {
final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
new Thread() {
@Override
public void run() {
try{
delivery.getEnvelope();//消息头
delivery.getProperties();//消息属性
delivery.getBody();//消息体
}finally{
//channel.basicAck();
//channel.basicNack()
}
}
}.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
QueueingConsumer实现了Consumer接口。
Channel.basicConsume方法会封装Channel.Consume消息发送到服务端(阻塞式),等待服务端的Channel.ConsumeOk响应(包含了服务端为Consumer分配的ConsumerTag)。然后将QueueingConsumer放入Map中,key为ConsumerTag。consumer是Channel唯一。
当客户端接收到消息,参考消息接收。Basic.Deliver类型的消息(consumerTag、deliveryTag、redelivered、exchange、routingKey)会进入消费处理流程。Channel根据ConsumerTag从Map中获取对应的QueueConsumer实例,由Channel的ConsumerDispatcher通过Connection初始化的WorkService创建新的处理线程,调用QueueConsumer实例的handleDelivery方法。QueueConsumer将消息封装成Delivery对象,放入BlockingQueue中。
消费线程等待新的Delivery(阻塞式),之后创建新的线程完成消息的处理。
网友评论