美文网首页
RxJava部分操作符

RxJava部分操作符

作者: SMSM | 来源:发表于2018-09-02 16:05 被阅读31次

toBlocking().firstOrDefault()同步,结果直接返回,而不再经过回调

@Override
public  Response execSync(Request request) {
    request.setSync(true);
    Observable<Response> observable = networkService.exec(request);
    return observable.onErrorReturn(new Func1<Throwable, Response>() {
        @Override
        public Response call(Throwable throwable) {
            return new Response.Builder().statusCode(ErrorCode.CODE_ERROR_INNER).error(throwable).build();
        }
    }).subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).toBlocking().firstOrDefault(defaultErrorResp);
}

实现原理是,借助一个final CountDownLatch latch = new CountDownLatch(1);
一直阻塞直到为0后,当前线程才能返回结果。

流的传递

CallAdapterFactory 使用异步rxjava,call内部使用同步rxjava,关于切换网络框架也就是failover机制,没有采用flatmap,而是在暂存上个流的subscriber ,新开一个流在error 或者 onNext中调用上个流的subscriber相关方法完成上个流。

有failover的概念

对Runable对象可以被同时run多次

   @Override
    public void run() {
        if (startTime == 0) {
            startTime = timestamp();
            if (timeout > 0) {
                scheduleRun(this, timeout);
            }
            try {
                sendQueue.add(this);
            } catch (Exception e) {
                log("encrypt > sendqueue beyond limit");
                TunnelResponse resp = new TunnelResponse();
                resp.id = request.id;
                resp.statusCode = ErrorCode.TUNNEL_CODE_SEND_FULL;
                this.resp = resp;
                done(this);
                return;
            }
            synchronized (Tunnel.this) {
                if (sendThread == null) {
                    sendThread = new SendThread();
                    sendThread.start();
                }
            }
        } else if (timeout > 0 && resp == null
                && runningSessions.get(request.id) == this) {
            long elapse = timestamp() - startTime + 1;
            if (elapse >= timeout) {
                resp = new TunnelResponse();
                resp.id = request.id;
                resp.statusCode = ErrorCode.TUNNEL_CODE_TIMEOUT;
                done(this);
            }
        }
    }

   //大家都这样玩了,监控+生产模式
    private class SendThread extends Thread {
    public SendThread() {
        super("tunnel_send");
    }

    @Override
    public void run() {
        handlerRequest();
    }

    private void handlerRequest() {
        while (true) {
            Session s;
            try {
                s = sendQueue.take();
            } catch (InterruptedException e) {
                synchronized (Tunnel.this) {
                    if (sendThread == this) {
                        sendThread = null;
                    }
                }
                break;
            }
      。。。。。。。。。
                      connection.send(s.request);
                        sent = true;
                        s.connection = connection;
                        //启动ack超时定时器
                        scheduleRun(s.ackTimeoutRunnable(),NVGlobalConfig.instance().getHttpHold());
    }

    //成功后删除队列任务
    sendQueue.remove(s);
    unscheduleRun(s);
    //取消ack超时定时器
    if(s.ackTimeoutRunnable != null) {
        unscheduleRun(s.ackTimeoutRunnable);
    }

1、借助阻塞队列实现的生产消费模式,如果没有阻塞队列,就要自己实现了。
2、同时通过巧妙的思路实现了超时检测,把当前Runnable会被执行两次,延时发送借助标志位判断是否超时,第一次执行成功后会把第二次删除,不会走到超时处理。如果未删除走超时处理。
3、为提高可用性,引入两类超时。
ACK超时,判断链路状态的,依赖不同的网路3G4GWifi等给定不同的值,比如3G 5s 4G3s,用static静态全局变量记录所有请求总的超时次数,达到6次后就更新IP重建socket(Tunnel、RxAndroidTunnelService)
请求超时 单次请求超时,返回错误码

提高可用性
通道的选择原则(更新ip测速选最好的)、切换socket通道(6次ack超时、网络类型动态超时时长)、failover机制(触发条件)

Netty是异步编程框架,客户端发起请求之后,不会同步等待结果返回,需要自己实现同步等待机制。实现思路如上,或者用 okio的超时机制

有个MySession的概念

用于request 和 response基于唯一id配对

private static final AtomicInteger reqId = new AtomicInteger(new Random(
        System.nanoTime()).nextInt());

public static String generateHttpRequestId() {
    return Integer.toString(0xfffffff & reqId.getAndIncrement());
}

相关文章

网友评论

      本文标题:RxJava部分操作符

      本文链接:https://www.haomeiwen.com/subject/rpymwftx.html