美文网首页
Okhttp的线程池和高并发

Okhttp的线程池和高并发

作者: 小水neo | 来源:发表于2020-11-13 11:14 被阅读0次

Okhttp的源码分析

Okhttp的线程池和高并发

Okhttp链接池的使用

Okhttp的缓存机制

Okhttp的责任链模式

Okhttp的框架使用

建议安装目录插件食用
在实际项目中使用okhttp的时候(异步),通常遵循以下步骤:

  1. 创建okhttp实例用于代理(OkhttpClient 的对象拥有自己的线程池和链接池,所以可以将其封装成单例模式)

    OkhttpClient client = new OkhttpClient();
    
  2. 创建用于存放提交参数的RequestBody对象

    MediaType mediaType = MediaType.parse("application/json");
    RequestBody body = RequestBody.create(mediaType, RequestBodyHelper.getUserStarredRepoListInActivity(login));
    //mediaType 是传递的数据的MIME(媒体类型:json\xml\png\jpg\gif等)
    // 第二个参数为所需获得的数据
    
  3. 创建用于发送请求的Request对象

    Request request = builder.method("POST", body).build();
    
  4. 发送请求并获取服务器返回的数据,执行call的enqueue()(异步方式,实际使用)或者execute()(同步,第一行代码中的例子)

    client .newCall(request).enqueue(callback)
    //okttp 的操作元是Call 对象。
    

线程池的使用

源码位于Dispatcher.java(依赖用的是3.8.1版本,Kotlin工作量有点大。。。)

/** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

调度器 dispatcher 管理了三个队列:同步运行队列,异步运行队列、异步等待队列。同步暂时不做分析(加载耗时太长,用不到)异步如图所示,注Call本质是一个Runnable。

Runnable与Thread的区别

Thread类是在java.lang包中定义的。一个类只要继承了Thread类同时覆写了本类中的run()方法就可以实现多线程操作了,但是一个类只能继承一个父类

在实际开发中一个多线程的操作很少使用Thread类,而是通过Runnable接口完成。但是在使用Runnable定义的子类中没有start()方法,只有Thread类中才有。此时观察Thread类,有一个构造方法:public Thread(Runnable targer)此构造方法接受Runnable的子类实例,也就是说可以通过Thread类来启动Runnable实现的多线程。

异步请求最大同时请求数量
private int maxRequests = 64;
异步请求同一域名同时存在的最大请求数量
private int maxRequestsPerHost = 5;

Host和Call

执行过程

  1. 用户调用最后一步为enqueue,

    • 执行队列里面不足最大线程数maxRequests 并且Call 对应的host 数目不超过maxRequestsPerHost 的时候直接把call 对象直接推入到执行队列
    • 否则,当前线程数过多,就把 他推入到等待队列中。
    synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    
    
  2. 将等待队列的线程调入到运行队列

    可以看到在线程的执行的时候最后都调用了, client.dispatcher().finished(this);

    @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
    
    

    进入finish方法的底层,可以看到call 在runningAsyncCalls 队列中被移除了(calls.remove(call),并重新计算了目前正在执行 线程数量,如果为零执行idle。此外最重要的是通过promoteCalls进行任务队列的调度。

    • 这里注意到remove和promotcalls的调用都加入了synchronized 同步锁(其他试图访问该代码块的线程会被阻塞),原因在步骤3中解释。
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    
    
  3. promoteCalls任务队列调度:

    经过简单的判断后,首先remove方法,将线程移出等待队列,再通过add加入运行队列。

    • 最开始定义过runningAsyncCalls和readyAsyncCalls为双端队列,其在插入和删除的时候是非线程安全的,因此在调用他的时候(步骤2)加入了同步锁
    • 通过循环也可以知道,每次任务调度的时候迭代器试图要遍历整个队列,直到运行队列满时才return。这样做要比来回传递计数变量(运行队列还能添加几个进程)来的简单安全。
    private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    
    

执行过程的总结

  • 用户在调用client .newCall(request).enqueue(callback)的时候,调度器dispatcher首先检查运行队列是否满,若满则将其调度到等待队列,每一个执行的线程(call)在execute的结束时都会调用promotcalls,从而使得等待进程进入运行队列。

  • 整体流程如下图所示

相关文章

网友评论

      本文标题:Okhttp的线程池和高并发

      本文链接:https://www.haomeiwen.com/subject/sgsvbktx.html