美文网首页
JDK11 WebSocket API (HTTP)

JDK11 WebSocket API (HTTP)

作者: lF_IF | 来源:发表于2018-12-09 13:38 被阅读0次

jdk11 发布也已经两个多月,作为jdk8以后的首个LTS版本,抽空简单看了一下。其中java.net.http包还是比较好用的(jdk9 还处于incubation状态,jdk11正式发布),jdk的网络连接部分算是迎来了一次重大的更新。

基本的API都被包括到了HttpClient,HttpRequset之中了,提供了诸多静态方法,build实例的方法,链式调用,原生异步请求支持,与老的http请求调用方法相比,使用上非常方便了,同时增加了websocket的原生支持,再也不用使用第三方包来构建websocket client了。

HTTP 接口使用起来非常简单,基本上看下接口名称和相关描述,即可上手。本文就直接以使用jdk11原生的http包来构建一个ws client 去连接网站的websocket服务。

1 构建HttpClient实例的方法

1.1 调用静态方法newHttpClient()

var httpClient = HttpClient.newHttpClient();

1.2 如果需要添加相关参数(header, proxy, auth等),可使用builder来构建

var httpClient = HttpClient.newBuilder()
                .authenticator(Authenticator.getDefault()) //auth setting
                .proxy(ProxySelector.getDefault()) //proxy setting
                .connectTimeout(Duration.ofMinutes(1)) // connect timeout setting
                .followRedirects(HttpClient.Redirect.NORMAL) //redirect policy setting 
                .cookieHandler(CookieHandler.getDefault()) // cookie handler setting
                .build();

HttpClient构建好后,如果是http请求,则构建HttpRequest,如果是websocket请求则构建Websocket客户端。

2 构建WebSocket

同样的套路:

var wsCompletableFuture = httpClient.newWebSocketBuilder()
                        .buildAsync(URI.create(url), new WebSocketListener());
var wsClient = wsCompletableFuture.join();

且看一下WebSocket接口提供的方法,api看起来一目了然。

CompletableFuture<WebSocket> sendText(CharSequence data, boolean last);
CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean last);
CompletableFuture<WebSocket> sendPing(ByteBuffer message);
CompletableFuture<WebSocket> sendPong(ByteBuffer message);
CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
void request(long n);
boolean isOutputClosed();
boolean isInputClosed();
void abort();

实现websocket接收消息的接口 WebSocket.Listener
接口定义如下:

interface Listener {
        default void onOpen(WebSocket webSocket) { webSocket.request(1); }
        default CompletionStage<?> onText(WebSocket webSocket,CharSequence data,boolean last) {
            webSocket.request(1); //Increments the counter of invocations of receive methods
            return null;
        }
        default CompletionStage<?> onBinary(WebSocket webSocket,ByteBuffer data,boolean last) {
            webSocket.request(1);
            return null;
        }
        default CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            return null;
        }
        default CompletionStage<?> onPong(WebSocket webSocket,ByteBuffer message) {
            webSocket.request(1);
            return null;
        }
        default CompletionStage<?> onClose(WebSocket webSocket,int statusCode,
                                           String reason) {
            return null;
        }
        default void onError(WebSocket webSocket, Throwable error) { }
    }

接口方法均提供了默认实现(jdk8),因此只需要去实现需要自定义处理的接口方法即可。
以下是本文中用于连接Huobi网站的websocket的客户端,测试使用,因此所有接口方法均实现了:

private class WebSocketListener implements WebSocket.Listener {

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("websocket opened.");
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            webSocket.request(1);
            textParts.add(data);
            if (last) {
                var content = String.join("", textParts);
                System.out.println(content);
                textParts.clear();
            }
            return null;
        }
        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
            webSocket.request(1);
            binaryParts.add(data);
            if (last) {
                int size = 0;
                for (ByteBuffer binaryPart : binaryParts) {
                    size+=binaryPart.array().length;
                }
                var allocate = ByteBuffer.allocate(size);
                binaryParts.forEach(allocate::put);
                binaryParts.clear();
                var content = uncompress(allocate.array());
                System.out.println(new String(content));
            }
            return null;
        }
        @Override
        public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            System.out.println("ping");
            System.out.println(message.asCharBuffer().toString());
            return null;
        }
        @Override
        public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            System.out.println("pong");
            return null;
        }
        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            System.out.println("closed, status(" + statusCode + "), cause:"+reason);
            webSocket.sendClose(statusCode, reason);
            return null;
        }
        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            System.out.println("error: " + error.getLocalizedMessage());
            webSocket.abort();
        }

    }

使用中需要注意的是:

  • 接收函数中的参数boolean last用于判断当次数据是否接收完毕,如果数据还没接收完毕,则不能进行处理,因此需要做保存,最终接收完毕的时候再做处理。(详细代码见下)
  • void request(long n);用于统计接收了多少次数据,不需要的话,可忽略该方法。

以下贴出基于jdk11的Huobi网站的websocket客户端实现代码:

package me.study;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;

public class HuobiWebSocket {

    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); // executor for scheduled task
    private List<CharSequence> textParts = new ArrayList<>(16); //cache text data
    private List<ByteBuffer> binaryParts = new ArrayList<>(16); //cache binary data
    private String url;
    private WebSocket webSocket;

    public HuobiWebSocket(String url) {
        this.url = url;
        var wsCompletableFuture =
                HttpClient.newHttpClient().newWebSocketBuilder()
                .buildAsync(URI.create(url), new WebSocketListener());
        webSocket = wsCompletableFuture.join();
        executor.scheduleAtFixedRate(()->{
            // send ping msg  to keep alive every 5 second
            if (webSocket!=null && !webSocket.isOutputClosed()) {
                System.out.println("send ping");
                var objectMapper = new ObjectMapper(); //jackson
                var map = new HashMap<String,Object>(1);
                map.put("ping",System.currentTimeMillis());
                try {
                    webSocket.sendText(objectMapper.writeValueAsString(map),true);
                } catch (JsonProcessingException ignore) {
                }
            }
        },5,5,TimeUnit.SECONDS);
    }

    private class WebSocketListener implements WebSocket.Listener {

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("websocket opened.");
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            webSocket.request(1);
            textParts.add(data);
            if (last) {
                String content = String.join("", textParts);
                System.out.println(content);
                textParts.clear();
            }
            return null;
        }
        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
            webSocket.request(1);
            binaryParts.add(data);
            if (last) {
                int size = 0; 
                for (var binaryPart : binaryParts) {
                    size+=binaryPart.array().length;
                }
                var allocate = ByteBuffer.allocate(size);
                binaryParts.forEach(allocate::put);
                binaryParts.clear();
                var content = uncompress(allocate.array());
                System.out.println(new String(content));
            }
            return null;
        }
        @Override
        public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            System.out.println("ping");
            System.out.println(message.asCharBuffer().toString());
            return null;
        }
        @Override
        public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            System.out.println("pong");
            return null;
        }
        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            System.out.println("ws closed with status(" + statusCode + "). cause:"+reason);
            webSocket.sendClose(statusCode, reason);
            return null;
        }
        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            System.out.println("error: " + error.getLocalizedMessage());
            webSocket.abort();
        }

    }

    private byte[] uncompress(byte[] bytes) {
      // ...
    }

    public String getUrl() {
        return url;
    }

    public WebSocket getWebSocket() {
        return webSocket;
    }

     public static void main(String[] args) throws JsonProcessingException {
        var huobiWebSocket = new HuobiWebSocket("wss://api.huobi.br.com/ws");
        var map = new HashMap<String, Object>();
        map.put("sub","market.btcusdt.kline.1min");
        var objectMapper = new ObjectMapper();
        huobiWebSocket.getWebSocket().sendText(objectMapper.writeValueAsString(map),true);
    }
}

其中byte[] uncompress(byte[] bytes)函数用来解压二进制数据,这个要根据不同的ws server数据格式而具体对待。
使用起来有一点不方便的地方就是,如果接收到的是大报文,则需要进行缓存,根据boolean last参数判断是否接收完毕。因此,可以进一步进行封装,隐藏last参数,达到上层只关心最终的接收完毕的数据结果。

相关文章

网友评论

      本文标题:JDK11 WebSocket API (HTTP)

      本文链接:https://www.haomeiwen.com/subject/twtzcqtx.html