toBlocking().firstOrDefault()同步,结果直接返回,而不再经过回调
@Override
public Response execSync(Request request) {
request.setSync(true);
Observable<Response> observable = networkService.exec(request);
return observable.onErrorReturn(new Func1<Throwable, Response>() {
@Override
public Response call(Throwable throwable) {
return new Response.Builder().statusCode(ErrorCode.CODE_ERROR_INNER).error(throwable).build();
}
}).subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).toBlocking().firstOrDefault(defaultErrorResp);
}
实现原理是,借助一个final CountDownLatch latch = new CountDownLatch(1);
一直阻塞直到为0后,当前线程才能返回结果。
流的传递
CallAdapterFactory 使用异步rxjava,call内部使用同步rxjava,关于切换网络框架也就是failover机制,没有采用flatmap,而是在暂存上个流的subscriber ,新开一个流在error 或者 onNext中调用上个流的subscriber相关方法完成上个流。
有failover的概念
对Runable对象可以被同时run多次
@Override
public void run() {
if (startTime == 0) {
startTime = timestamp();
if (timeout > 0) {
scheduleRun(this, timeout);
}
try {
sendQueue.add(this);
} catch (Exception e) {
log("encrypt > sendqueue beyond limit");
TunnelResponse resp = new TunnelResponse();
resp.id = request.id;
resp.statusCode = ErrorCode.TUNNEL_CODE_SEND_FULL;
this.resp = resp;
done(this);
return;
}
synchronized (Tunnel.this) {
if (sendThread == null) {
sendThread = new SendThread();
sendThread.start();
}
}
} else if (timeout > 0 && resp == null
&& runningSessions.get(request.id) == this) {
long elapse = timestamp() - startTime + 1;
if (elapse >= timeout) {
resp = new TunnelResponse();
resp.id = request.id;
resp.statusCode = ErrorCode.TUNNEL_CODE_TIMEOUT;
done(this);
}
}
}
//大家都这样玩了,监控+生产模式
private class SendThread extends Thread {
public SendThread() {
super("tunnel_send");
}
@Override
public void run() {
handlerRequest();
}
private void handlerRequest() {
while (true) {
Session s;
try {
s = sendQueue.take();
} catch (InterruptedException e) {
synchronized (Tunnel.this) {
if (sendThread == this) {
sendThread = null;
}
}
break;
}
。。。。。。。。。
connection.send(s.request);
sent = true;
s.connection = connection;
//启动ack超时定时器
scheduleRun(s.ackTimeoutRunnable(),NVGlobalConfig.instance().getHttpHold());
}
//成功后删除队列任务
sendQueue.remove(s);
unscheduleRun(s);
//取消ack超时定时器
if(s.ackTimeoutRunnable != null) {
unscheduleRun(s.ackTimeoutRunnable);
}
1、借助阻塞队列实现的生产消费模式,如果没有阻塞队列,就要自己实现了。
2、同时通过巧妙的思路实现了超时检测,把当前Runnable会被执行两次,延时发送借助标志位判断是否超时,第一次执行成功后会把第二次删除,不会走到超时处理。如果未删除走超时处理。
3、为提高可用性,引入两类超时。
ACK超时,判断链路状态的,依赖不同的网路3G4GWifi等给定不同的值,比如3G 5s 4G3s,用static静态全局变量记录所有请求总的超时次数,达到6次后就更新IP重建socket(Tunnel、RxAndroidTunnelService)
请求超时 单次请求超时,返回错误码
提高可用性
通道的选择原则(更新ip测速选最好的)、切换socket通道(6次ack超时、网络类型动态超时时长)、failover机制(触发条件)
Netty是异步编程框架,客户端发起请求之后,不会同步等待结果返回,需要自己实现同步等待机制。实现思路如上,或者用 okio的超时机制
有个MySession的概念
用于request 和 response基于唯一id配对
private static final AtomicInteger reqId = new AtomicInteger(new Random(
System.nanoTime()).nextInt());
public static String generateHttpRequestId() {
return Integer.toString(0xfffffff & reqId.getAndIncrement());
}
网友评论