美文网首页
利用协程提高客户端并发性能的尝试

利用协程提高客户端并发性能的尝试

作者: 阿呆少爷 | 来源:发表于2018-12-24 11:22 被阅读84次

    假设服务器端提供无限的服务能力,客户端如何达到最高的QPS?如果使用多线程来提高并发,线程数量会成为瓶颈。是否可以通过协程来提高客户端的并发能力?下面做了一组测试,分别使用同步、异步、协程+同步等方式请求百度的首页,运行10s,对比一下QPS。

    测试环境

    2017年MBP,2核4线程,16GB内存。

    Maven依赖

    <properties>
        <kotlin.version>1.3.11</kotlin.version>
    </properties>
    
    <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-stdlib-jdk8</artifactId>
        <version>${kotlin.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-test</artifactId>
        <version>${kotlin.version}</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-coroutines-core</artifactId>
        <version>1.1.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpasyncclient</artifactId>
        <version>4.1</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>khttp</groupId>
        <artifactId>khttp</artifactId>
        <version>0.1.0</version>
    </dependency>
    
    <dependency>
        <groupId>io.github.rybalkinsd</groupId>
        <artifactId>kohttp</artifactId>
        <version>0.5.0</version>
    </dependency>
    
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>3.12.1</version>
    </dependency>
    

    Kotlin多线程同步

    使用Kotlin的khttp发起同步请求。

    import org.junit.Test
    import java.util.concurrent.atomic.AtomicInteger
    
    public class TestHttpSyncClient {
    
        val api = "https://www.baidu.com"
    
        val completedCount = AtomicInteger(0)
        var failedCount = AtomicInteger(0)
    
        @Test
        fun test() {
    
            for (i in 0..80) {
    
                Thread() {
    
                    while (true) {
    
                        try {
    
                            var response = khttp.get(api)
    
                            if (response.statusCode == 200) {
                                completedCount.incrementAndGet()
                            } else {
                                failedCount.incrementAndGet()
                            }
                        }catch (e : Exception) {
    
                            failedCount.incrementAndGet()
                        }
                    }
                }.start()
            }
    
            Thread.sleep(10 * 1000)
            println("completedCount: ${completedCount}, failedCount: ${failedCount}");
            System.exit(0);
        }
    }
    

    线程开到80个,QPS可达2600左右。

    completedCount: 26089, failedCount: 111
    

    HttpAsyncClients异步

    使用HttpAsyncClients发起异步请求。

    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;
    import org.junit.Test;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TestHttpAsyncClient implements Runnable {
    
       public final static AtomicInteger completedCount = new AtomicInteger(0);
       public final static AtomicInteger failedCount = new AtomicInteger(0);
       public final static AtomicInteger canceledCount = new AtomicInteger(0);
    
       @Test
       public void test() {
    
           //启动线程
           TestHttpAsyncClient testHttpAsyncClient = new TestHttpAsyncClient();
           new Thread(testHttpAsyncClient).start();
    
           //测试10s
           try {
    
               Thread.sleep(10 * 1000);
               System.out.println(String.format("completedCount: %d, failedCount: %d, canceledCount: %d",
                       completedCount.get(), failedCount.get(), canceledCount.get()));
    
               System.exit(0);
           } catch (Exception e) {
    
               System.err.println(String.format("System.exit exception: %s", e));
           }
       }
    
       public void run() {
    
           CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
                   .setMaxConnTotal(128)
                   .setMaxConnPerRoute(128)
                   .build();
           httpclient.start();
    
           final HttpGet request = new HttpGet("https://www.baidu.com");
    
           while (true) {
    
               httpclient.execute(request, new FutureCallback<HttpResponse>() {
    
                   @Override
                   public void completed(HttpResponse httpResponse) {
    
                       if (httpResponse.getStatusLine().getStatusCode() == 200) {
                           completedCount.incrementAndGet();
                       }else {
                           failedCount.incrementAndGet();
                     }
                   }
    
                   @Override
                   public void failed(Exception e) {
                       failedCount.incrementAndGet();
                   }
    
                   @Override
                   public void cancelled() {
                       canceledCount.incrementAndGet();
                   }
               });
           }
       }
    }
    

    并发设置为128,QPS可以达到2000左右。

    completedCount: 19319, failedCount: 125, canceledCount: 0
    

    通过调试可以发现,只需要4个dispatcher线程,就可以满足需要。

    image.png

    Kotlin协程+khttp

    使用Kotlin Coroutine+khttp同步请求测试。

    import khttp.responses.Response
    import kotlinx.coroutines.Deferred
    import kotlinx.coroutines.GlobalScope
    import kotlinx.coroutines.async
    import kotlinx.coroutines.launch
    import java.util.concurrent.atomic.AtomicInteger
    import org.junit.Test
    
    public class TestKotlinCoroutine {
    
        val api = "https://www.baidu.com"
        val completedCount = AtomicInteger(0)
        val failedCount = AtomicInteger(0)
    
        @Test
        fun test() {
    
            Thread(){
    
                while (true) {
    
                    val resp =
                            GlobalScope.launch {
    
                                val result: Deferred<Response> = async {
    
                                    get()
                                }
    
                                result.await()
                            }
                }
            }.start()
    
            Thread.sleep(10 * 1000)
            println("completedCount: ${completedCount}, failedCount: ${failedCount}");
            System.exit(0);
        }
    
        suspend fun get(): khttp.responses.Response {
    
            var response = khttp.get(api)
    
            if (response.statusCode == 200) {
                completedCount.addAndGet(1)
            }else {
                failedCount.addAndGet(1)
            }
    
            return response;
        }
    }
    

    协程版本的性能甚至不如同步版本,QPS只有50。换一种方式,起多个协程,每个线程里面循环请求,QPS也一样只有50左右。

    completedCount: 498, failedCount: 0
    

    这个版本性能很差,通过调试可以发现,最多也就启动了4个worker线程,并且khttp又不支持协程阻塞。

    image.png

    下面是一个优化的版本,将工作放到Dispatchers.IO里面去做,尽量多起一些worker线程。

    public class TestKotlinCoroutineClient {
    
        val api = "https://www.baidu.com"
    
        val completedCount = AtomicInteger(0)
        var failedCount = AtomicInteger(0)
        val time = 10000
    
        @Test
        fun test() = runBlocking {
    
            val start = System.currentTimeMillis()
            val channel = Channel<Int>()
    
            repeat(time) {
                launch { channel.send(get()) }
            }
    
            repeat(time) {
                val code = channel.receive()
                if (code == 200) {
                    completedCount.incrementAndGet()
                } else {
                    failedCount.incrementAndGet()
                }
            }
    
            val end = System.currentTimeMillis()
            println("completedCount: ${completedCount}, failedCount: ${failedCount}, cost${(end - start) / 1000}");
            System.exit(0);
        }
    
        suspend fun get() = withContext(Dispatchers.IO) { khttp.get(api) }.statusCode
    }
    

    性能可以提升到1200左右。调试可以发现线程数量最多可达68个。

    image.png

    Kotlin协程+kohttp

    kohttp看起来支持协程。使用kohttp请求一万次,花费时间100s左右,QPS为100。

    import io.github.rybalkinsd.kohttp.ext.asyncHttpGet
    import junit.framework.Assert.assertEquals
    import kotlinx.coroutines.runBlocking
    import org.junit.Test
    import kotlin.system.measureTimeMillis
    
    public class TestKotlinCoroutine {
    
        val api = "https://www.baidu.com"
    
        @Test
        fun `many async invokes of httpGet`() {
            measureTimeMillis {
                GlobalScope.launch {
                    val tasks = List(10000) {
                        api.asyncHttpGet()
                    }
                    tasks.map { r ->
                        r.await().also { it.close() }
                    }.forEach {
                        assertEquals(200, it.code())
                    }
                }
            }.also { println("$it ms") }
        }
    }
    

    分析一下kohttp的实现,可以发现它只是封装了okhttp3,使用了异步模式。okhttp3本身不支持协程,所以性能也不会太好。

    fun String.asyncHttpGet(client: Call.Factory = defaultHttpClient): Deferred<Response> =
        GlobalScope.async(context = Unconfined) {
            client.suspendCall(Request.Builder().url(this@asyncHttpGet).build())
        }
    
    internal suspend fun Call.Factory.suspendCall(request: Request): Response =
        suspendCoroutine { cont ->
            newCall(request).enqueue(object : Callback {
                override fun onResponse(call: Call, response: Response) {
                    cont.resume(response)
                }
    
                override fun onFailure(call: Call, e: IOException) {
                    cont.resumeWithException(e)
                }
            })
        }
    

    Go协程

    顺便试一下Go协程的性能。Go协程版本在使用keepalive的情况下,256个协程的QPS可以达到1700左右。注意需要读一下resp.Body,这样keepalive才会生效。参考文章:A brief intro of TCP keep-alive in Go’s HTTP implementation

    package main
    
    import (
        "fmt"
        "io"
        "io/ioutil"
        "net/http"
        "os"
        "runtime"
        "sync"
        "sync/atomic"
        "time"
    )
    
    var completedCount uint64
    var failedCount uint64
    var lock = &sync.Mutex{}
    
    func main() {
    
        runtime.GOMAXPROCS(4)
        for i := 0; i < 256; i++ {
            clt := newQPSClient()
            go clt.test()
        }
    
        time.Sleep(10 * time.Second)
        fmt.Printf("completedCount: %d, failedCount: %d\n", completedCount, failedCount)
        os.Exit(0)
    }
    
    type QPSClient struct {
        clt *http.Client
    }
    
    func newQPSClient() *QPSClient {
        return &QPSClient{
            clt: &http.Client{},
        }
    }
    
    func (qc *QPSClient) test() {
    
        for {
            resp, err := qc.clt.Get("https://www.baidu.com")
            if err == nil && (resp.StatusCode == 200) {
    
                _, err = io.Copy(ioutil.Discard, resp.Body)
                resp.Body.Close()
                atomic.AddUint64(&completedCount, 1)
            } else {
    
                _, err = io.Copy(ioutil.Discard, resp.Body)
                atomic.AddUint64(&failedCount, 1)
            }
        }
    }
    

    结论

    请求方式 QPS
    Kotlin多线程同步 80个线程,QPS 2600
    HttpAsyncClients异步 最大128个连接,QPS 2000
    Kotlin协程+khttp 循环起协程+khttp同步,QPS 50
    Kotlin协程+kohttp 循环起协程+okhttp3异步,QPS 100
    Go协程 256个协程,QPS 1700

    上面这些测试,缺乏更精细的设置,比如HttpAsyncClients 128个连接使用了多少个线程?Kotlin和Go的协程版本使用了多少线程?还有没有提升空间?

    Coroutine运行有很多受限条件,不能堵塞在操作系统会堵塞线程的地方,需要自己实现对这些堵塞API的处理,在用户态做上下文的保存和恢复。Java生态圈中缺乏对协程支持良好的基础库,导致不能发挥协程真正的威力,使用异步IO是更好的解决办法。

    相关文章

      网友评论

          本文标题:利用协程提高客户端并发性能的尝试

          本文链接:https://www.haomeiwen.com/subject/bjsakqtx.html