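Exchanger is built on top of Transporter. Its focus is on defining the data structures used for data transfer: with Transporter as the transport mechanism, it implements Request-Response message-exchange semantics.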
1.HeaderExchanger
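HeaderExchanger is the Exchanger implementation shown here; the concrete client and server it returns are HeaderExchangeClient and HeaderExchangeServer.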
public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
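
Both connect and bind wrap the caller's handler twice before handing it to the Transporter: DecodeHandler on the outside, HeaderExchangeHandler inside it, and the business handler at the core. The following is a minimal sketch of that decorator nesting only. SimpleHandler, the trace list, and the three factory methods are hypothetical stand-ins invented for illustration; they are not Dubbo's ChannelHandler API.

```java
import java.util.ArrayList;
import java.util.List;

final class HandlerChainSketch {
    // Hypothetical, simplified stand-in for a channel handler (not Dubbo's interface).
    interface SimpleHandler {
        void received(Object message, List<String> trace);
    }

    // Innermost layer: the business handler supplied by the caller.
    static SimpleHandler business() {
        return (msg, trace) -> trace.add("business:" + msg);
    }

    // Plays the role of HeaderExchangeHandler: exchange-level work, then delegate.
    static SimpleHandler exchange(SimpleHandler next) {
        return (msg, trace) -> {
            trace.add("exchange");
            next.received(msg, trace);
        };
    }

    // Plays the role of DecodeHandler: decoding work, then delegate.
    static SimpleHandler decode(SimpleHandler next) {
        return (msg, trace) -> {
            trace.add("decode");
            next.received(msg, trace);
        };
    }

    // Builds the same nesting order as the source and records which layer runs when.
    static List<String> run(Object message) {
        SimpleHandler chain = decode(exchange(business()));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("hello")); // prints [decode, exchange, business:hello]
    }
}
```

The outermost wrapper sees an incoming message first, so the order of construction reads inside-out while the order of execution reads outside-in.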
2.HeaderExchangeClient
When HeaderExchangeClient actually sends data, the work is done by an ExchangeChannel.
That channel is a HeaderExchangeChannel, which wraps the Client.
public class HeaderExchangeClient implements ExchangeClient {
    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);
    private final Client client;
    private final ExchangeChannel channel;
    // needHeartbeat is not used in this excerpt.
    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // Wrap the transport-level Client in an exchange-level channel.
        this.channel = new HeaderExchangeChannel(client);
    }
    // Every request and send call below is delegated to the channel.
    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return channel.request(request);
    }
    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return channel.request(request, timeout);
    }
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        channel.send(message, sent);
    }
}
3.HeaderExchangeChannel
As the code shows, HeaderExchangeChannel still calls the underlying Channel's methods internally, but the data it sends is wrapped in a Request object.
final class HeaderExchangeChannel implements ExchangeChannel {
    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);
    private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";
    // The underlying transport channel that actually sends the data.
    private final Channel channel;
    private volatile boolean closed = false;
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        this.channel = channel;
    }
    // Reuse the wrapper cached on the channel, or create one and cache it while connected.
    static HeaderExchangeChannel getOrAddChannel(Channel ch) {
        if (ch == null) {
            return null;
        }
        HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
        if (ret == null) {
            ret = new HeaderExchangeChannel(ch);
            if (ch.isConnected()) {
                ch.setAttribute(CHANNEL_KEY, ret);
            }
        }
        return ret;
    }
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        // Request, Response and String messages are sent unchanged.
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            // Any other payload is wrapped in a one-way Request.
            Request request = new Request();
            request.setVersion("2.0.0");
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }
    // Remaining methods are omitted from this excerpt.
}
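
To make the idea of layering Request-Response semantics over a one-way send more concrete, here is a rough, self-contained sketch. It is not Dubbo's code: Req, Resp, the pending map, and the Consumer used as the transport are all hypothetical names chosen for illustration, and matching responses to requests by id with a CompletableFuture is an assumption about one common way to do this, not something the excerpt above shows.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

final class ExchangeSketch {
    // Hypothetical envelopes; they only mirror the idea of a Request/Response pair.
    static final class Req {
        final long id;
        final boolean twoWay;
        final Object data;
        Req(long id, boolean twoWay, Object data) {
            this.id = id;
            this.twoWay = twoWay;
            this.data = data;
        }
    }

    static final class Resp {
        final long id;
        final Object result;
        Resp(long id, Object result) {
            this.id = id;
            this.result = result;
        }
    }

    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();
    // Stand-in for the lower layer: a one-way "send" with no notion of replies.
    private final Consumer<Object> transport;

    ExchangeSketch(Consumer<Object> transport) {
        this.transport = transport;
    }

    // One-way send: envelopes and strings pass through, anything else is wrapped.
    void send(Object message) {
        if (message instanceof Req || message instanceof Resp || message instanceof String) {
            transport.accept(message);
        } else {
            transport.accept(new Req(ids.incrementAndGet(), false, message));
        }
    }

    // Two-way request: register a future under the request id, then send.
    CompletableFuture<Object> request(Object data) {
        Req req = new Req(ids.incrementAndGet(), true, data);
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(req.id, future);
        transport.accept(req);
        return future;
    }

    // Called when a response arrives; completes the future with the matching id.
    void received(Resp resp) {
        CompletableFuture<Object> future = pending.remove(resp.id);
        if (future != null) {
            future.complete(resp.result);
        }
    }
}
```

A caller would construct it with any one-way sink, for example `new ExchangeSketch(list::add)` in a test, call `request(...)`, and later feed the peer's reply into `received(...)` to complete the returned future.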
4.Heartbeat check
HeaderExchangeServer and HeaderExchangeClient also implement heartbeat checking, on the server side and the client side respectively.
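
The heartbeat code itself is not shown here, so the following is only a sketch of the kind of decision a periodic heartbeat check might make. The Action enum, the check method, and all parameter names are assumptions made for illustration. The actual rules in HeaderExchangeServer and HeaderExchangeClient may differ.

```java
final class HeartbeatSketch {
    enum Action { NONE, SEND_HEARTBEAT, CLOSE_OR_RECONNECT }

    // All values are in milliseconds; names and thresholds are illustrative assumptions.
    static Action check(long now, long lastRead, long lastWrite, long heartbeat, long timeout) {
        // Nothing has been read for the whole timeout window: treat the peer as gone.
        if (now - lastRead > timeout) {
            return Action.CLOSE_OR_RECONNECT;
        }
        // The link has been idle in either direction for a heartbeat interval: probe it.
        if (now - lastRead > heartbeat || now - lastWrite > heartbeat) {
            return Action.SEND_HEARTBEAT;
        }
        return Action.NONE;
    }
}
```

In such a design the check would run on a timer (for instance via a ScheduledExecutorService), with a server typically closing a dead channel and a client typically reconnecting.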