本文是martinfowler.com上关于分布式系统模式的文章。原文地址为:Request Pipeline。如有侵权请联系我。
通过在连接上发送多个请求而不等待前一个请求的响应来提高延迟比。
2020年8月20日
问题
如果请求需要等待前一个请求返回的响应,那么使用单个套接字通道在群集中的服务器之间通信可能会导致性能问题。为了获得更好的吞吐量和延迟,服务器上的请求队列应该足够满,以确保服务器容量得到充分利用。例如,当在一个服务器中使用单个更新队列时,它总是可以接受更多的请求,直到队列填满为止,而它正在处理一个请求。如果一次只发送一个请求,则大部分服务器容量都会被不必要地浪费。
解决方案
节点向其他节点发送请求,而不等待以前请求的响应。这是通过创建两个独立的线程来实现的,一个用于通过网络通道发送请求,另一个用于接收来自网络通道的响应。
图1:请求管道发送方节点通过套接字通道发送请求,而不等待响应。
class SingleSocketChannel…
public void sendOneWay(RequestOrResponse request) throws IOException {
var dataStream = new DataOutputStream(socketOutputStream);
byte[] messageBytes = serialize(request);
dataStream.writeInt(messageBytes.length);
dataStream.write(messageBytes);
}
启动一个单独的线程来读取响应。
class ResponseThread…
class ResponseThread extends Thread implements Logging {
private volatile boolean isRunning = false;
private SingleSocketChannel socketChannel;
public ResponseThread(SingleSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
isRunning = true;
logger.info("Starting responder thread = " + isRunning);
while (isRunning) {
doWork();
}
} catch (IOException e) {
getLogger().error(e); //thread exits if stopped or there is IO error
}
}
public void doWork() throws IOException {
RequestOrResponse response = socketChannel.read();
logger.info("Read Response = " + response);
processResponse(response);
}
响应处理程序可以立即处理响应或将其提交到单个更新队列。
请求管道有两个问题需要处理。
如果请求连续发送而不等待响应,则接受请求的节点可能会不堪重负。由于这个原因,对于一次可以进行的请求有一个上限。任何节点最多可以向其他节点发送最大数量的请求。一旦在没有收到响应的情况下发送了最大的进行中请求,则不会再接受更多请求,发送方将被阻止。限制最大进行中请求的一个非常简单的策略是保持阻塞队列来跟踪请求。队列初始化为可执行的请求数。一旦收到请求的响应,它将从队列中移除,以便为更多请求创建空间。如下面的代码所示,每个套接字连接最多接受5个进行中请求。
class RequestLimitingPipelinedConnection…
private final Map<InetAddressAndPort, ArrayBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
private int maxInflightRequests = 5;
public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
ArrayBlockingQueue<RequestOrResponse> requestsForAddress = inflightRequests.get(to);
if (requestsForAddress == null) {
requestsForAddress = new ArrayBlockingQueue<>(maxInflightRequests);
inflightRequests.put(to, requestsForAddress);
}
requestsForAddress.put(request);
一旦收到响应,请求将从进行中的请求队列中删除。
处理故障和维护订购保证变得很难实现。假设在进行中有两个请求。第一个请求失败并重试,服务器可能在重试的第一个请求到达服务器之前处理了第二个请求。服务器需要某种机制来确保无序请求被拒绝。否则,在失败和重试的情况下,总是有重新排序消息的风险。例如,Raft总是发送每个日志条目预期的前一个日志索引。如果先前的日志索引不匹配,服务器将拒绝该请求。 Kafka可以允许最大通信时间.每个连接的请求数要不止一个,使用幂等生产者实现,它为发送到代理的每个消息批分配一个唯一的标识符。然后,代理可以检查传入请求的序列号,并在请求无序时拒绝该请求。
示例
Kafka鼓励客户机使用请求流水线来提高吞吐量。
网友评论