美文网首页
尝试自己手写网络请求框架,抗并发、重连、可扩展

尝试自己手写网络请求框架,抗并发、重连、可扩展

作者: 程序员阿兵 | 来源:发表于2019-05-23 11:40 被阅读0次

背景:

客户端网络请求是比不可少的一部分,比较成熟的有okhttp 已经retrofit,用起来也是无比丝滑,一行代码搞定网络请求。通过剖析okhttp的源码,里面的封装真的很完美,线程池和消息队列的调度,OKHttp中阶之拦截器及调用链以及其缓存机制。OKHttp中阶之缓存机制

自己撸:

首先我们网络请求肯定得考虑并发情况,每次请求调度一个任务开启线程,线程的开销很大,这样会使代码显得臃肿,资源浪费。所以得采用线程池管理线程,每次执行的请求任务都加入线程池,依次排队,执行请求。当这些线程都加入线程池后,得采用一个管理线程承担信使的作用,去线程池获取当前的排队任务依次执行。

public class ThreadPoolManager {

    // 1 创建队列,用来保存异步请求任务
    private LinkedBlockingDeque<Runnable> mQueue = new LinkedBlockingDeque<>();

    // 2 添加异步任务到队列中
    public void addTask(Runnable runnable){
        if(runnable != null){
            try {
                mQueue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 3 创建线程池
    private ThreadPoolExecutor mThreadPoolExecutor;

    private ThreadPoolManager(){
        mThreadPoolExecutor = new ThreadPoolExecutor(3, 10, 15, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 处理抛出来的任务
                addTask(r);
            }
        });
        mThreadPoolExecutor.execute(communicateThread);
        mThreadPoolExecutor.execute(delayThread);
    }

    // 创建 队列与线程池的"交互"线程
                public Runnable communicateThread = new Runnable() {
                    @Override
                    public void run() {
                        Runnable ruun = null;
                        while (true){
                            try {
                                ruun = mQueue.take();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                mThreadPoolExecutor.execute(ruun);
            }
        }
    };

    private static ThreadPoolManager threadPoolManager = new ThreadPoolManager();

    public static ThreadPoolManager getInstance(){
        return threadPoolManager;
    }

}

封装请求类:

请求需要请求的地址,请求的响应回调,请求的定义格式

public interface IHttpRequest {

    // 封装请求接口
    void setUrl(String url);
    void setData(byte[] data);
    void setListener(CallbackListener callbackListener);
    void execute();

}

请求服务端,把传的请求地址参数传人服务端,返回请求回调:

public class JsonHttpRequest implements IHttpRequest{

    private String url;
    private byte[] data;
    private CallbackListener mCallbackListener;
    private HttpURLConnection urlConnection;

    @Override
    public void setUrl(String url) {
        this.url = url;
    }

    @Override
    public void setData(byte[] data) {
        this.data = data;
    }

    @Override
    public void setListener(CallbackListener callbackListener) {
        this.mCallbackListener = callbackListener;
    }

    @Override
    public void execute() {
        URL url = null;
        try {
            url = new URL(this.url);
            urlConnection = (HttpURLConnection) url.openConnection();//打开http连接
            urlConnection.setConnectTimeout(6000);//连接的超时时间
            urlConnection.setUseCaches(false);//不使用缓存
            urlConnection.setInstanceFollowRedirects(true);//是成员函数,仅作用于当前函数,设置这个连接是否可以被重定向
            urlConnection.setReadTimeout(3000);//响应的超时时间
            urlConnection.setDoInput(true);//设置这个连接是否可以写入数据
            urlConnection.setDoOutput(true);//设置这个连接是否可以输出数据
            urlConnection.setRequestMethod("POST");//设置请求的方式
            urlConnection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");//设置消息的类型
            urlConnection.connect();// 连接,从上述至此的配置必须要在connect之前完成,实际上它只是建立了一个与服务器的TCP连接
            //-------------使用字节流发送数据--------------
            OutputStream out = urlConnection.getOutputStream();
            BufferedOutputStream bos = new BufferedOutputStream(out);//缓冲字节流包装字节流
            bos.write(data);//把这个字节数组的数据写入缓冲区中
            bos.flush();//刷新缓冲区,发送数据
            out.close();
            bos.close();
            //------------字符流写入数据------------
            if (urlConnection.getResponseCode() == HttpURLConnection.HTTP_OK) {//得到服务端的返回码是否连接成功
                InputStream in = urlConnection.getInputStream();
                mCallbackListener.onSuccess(in);
            }else{
                // 访问失败,重试
                throw new RuntimeException("请求失败");
            }
    }catch (Exception e){
            e.printStackTrace();
            throw new RuntimeException("请求失败");
        }finally{
            urlConnection.disconnect();
        }
    }
}

封装当前请求的任务:

public class HttpTask<T> implements Runnable, Delayed {

    private IHttpRequest mIHttpRequest;

    public HttpTask(T requestData,String url,IHttpRequest httpRequest,CallbackListener callbackListener){
        this.mIHttpRequest = httpRequest;
        httpRequest.setUrl(url);
        httpRequest.setListener(callbackListener);
        String content = JSON.toJSONString(requestData);
        try {
            httpRequest.setData(content.getBytes("utf-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try{
            mIHttpRequest.execute();
        }catch (Exception e){
            // 将失败的任务添加到重试队列中
            ThreadPoolManager.getInstance().addDelayTask(this);
        }

    }

    private long delayTime;
    private int retryCount;

    public int getRetryCount(){
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getDelayTime() {
        return delayTime;
    }

    public void setDelayTime(long delayTime) {
        // 设置延迟时间  3000
        this.delayTime = System.currentTimeMillis()+delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.delayTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return 0;
    }
}

将请求的任务加入队列:封装成通一入口

public class OkHttp {

    public static<T,M> void sendJsonRequest(T requestData, String url,
                                            Class<M> response, IJsonDataListener listener){
        IHttpRequest httpRequest = new JsonHttpRequest();
        CallbackListener callbackListener = new JsonCallbackListener<>(response,listener);
        HttpTask httpTask = new HttpTask(requestData,url,httpRequest,callbackListener);
        ThreadPoolManager.getInstance().addTask(httpTask);
    }
    
}

最终就可调用:

public class MainActivity extends AppCompatActivity {

//    private String url = "http://v.juhe.cn/historyWeather/citys?province_id=2&key=bb52107206585ab074f5e59a8c73875b";
    private String url = "xxxx";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        NeOkHttp.sendJsonRequest(null, url, ResponseBean.class, new IJsonDataListener<ResponseBean>() {
            @Override
            public void onSuccess(ResponseBean rb) {
                Log.e("===> ",rb.toString());
            }

            @Override
            public void onFailure() {

            }
        });

    }

}

当请求失败我们得实现重试机制,当前请求的任务失败时,我们将其保存在一个新的队列中,采用延时对列去逐一请求:DelayQueue

在上面的任务管理中加入延时队列:设置延时时差

    public void addDelayTask(HttpTask ht){
        if(ht != null){
            ht.setDelayTime(3000);
            mDelayQueue.offer(ht);
        }
    }

public class ThreadPoolManager {

    // 1 创建队列,用来保存异步请求任务
    private LinkedBlockingDeque<Runnable> mQueue = new LinkedBlockingDeque<>();

    // 2 添加异步任务到队列中
    public void addTask(Runnable runnable){
        if(runnable != null){
            try {
                mQueue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 创建延迟队列
    private DelayQueue<HttpTask> mDelayQueue = new DelayQueue<>();

    public void addDelayTask(HttpTask ht){
        if(ht != null){
            ht.setDelayTime(3000);
            mDelayQueue.offer(ht);
        }
    }

    public Runnable delayThread = new Runnable() {
        @Override
        public void run() {
            HttpTask ht = null;
            while (true){
                try {
                    ht = mDelayQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(ht.getRetryCount() < 3){
                    mThreadPoolExecutor.execute(ht);
                    ht.setRetryCount(ht.getRetryCount()+1);
                    Log.e("=== 重试机制 ===",ht.getRetryCount() + "");
                }else{
                    Log.e("=== 重试机制 ===","执行次数超限,放弃");
                }
            }
        }
    };

    // 3 创建线程池
    private ThreadPoolExecutor mThreadPoolExecutor;

    private ThreadPoolManager(){
        mThreadPoolExecutor = new ThreadPoolExecutor(3, 10, 15, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 处理抛出来的任务
                addTask(r);
            }
        });
        mThreadPoolExecutor.execute(communicateThread);
        mThreadPoolExecutor.execute(delayThread);
    }

    // 创建 队列与线程池的"交互"线程
                public Runnable communicateThread = new Runnable() {
                    @Override
                    public void run() {
                        Runnable ruun = null;
                        while (true){
                            try {
                                ruun = mQueue.take();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                mThreadPoolExecutor.execute(ruun);
            }
        }
    };

    private static ThreadPoolManager threadPoolManager = new ThreadPoolManager();

    public static ThreadPoolManager getInstance(){
        return threadPoolManager;
    }

}

在🔝请求失败中加入延时队列:

public class HttpTask<T> implements Runnable, Delayed {

    private IHttpRequest mIHttpRequest;

    public HttpTask(T requestData,String url,IHttpRequest httpRequest,CallbackListener callbackListener){
        this.mIHttpRequest = httpRequest;
        httpRequest.setUrl(url);
        httpRequest.setListener(callbackListener);
        String content = JSON.toJSONString(requestData);
        try {
            httpRequest.setData(content.getBytes("utf-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try{
            mIHttpRequest.execute();
        }catch (Exception e){
            // 将失败的任务添加到重试队列中
            ThreadPoolManager.getInstance().addDelayTask(this);
        }

    }

    private long delayTime;
    private int retryCount;

    public int getRetryCount(){
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getDelayTime() {
        return delayTime;
    }

    public void setDelayTime(long delayTime) {
        // 设置延迟时间  3000
        this.delayTime = System.currentTimeMillis()+delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.delayTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return 0;
    }
}

相关文章

网友评论

      本文标题:尝试自己手写网络请求框架,抗并发、重连、可扩展

      本文链接:https://www.haomeiwen.com/subject/zjzmzqtx.html