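JDK 11 was released a little over two months ago. Since it is the first LTS release after JDK 8, I took some time to look through it. The java.net.http package is quite pleasant to use (it was still in incubation in JDK 9 and became a standard API in JDK 11), and it is a major update to the JDK's networking support.
The basic API is covered by HttpClient and HttpRequest, which offer static factory methods, builders with chained calls, and native support for asynchronous requests. Compared with the older way of making HTTP requests, it is much more convenient to use. WebSocket is now supported natively as well, so a third-party library is no longer needed to build a WebSocket client.
The HTTP API is simple to use: reading the interface names and their descriptions is basically enough to get started. This article uses the native JDK 11 http package to build a WebSocket client and connect it to a website's WebSocket service.
1 Building an HttpClient instance
1.1 Call the static method newHttpClient()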
var httpClient = HttpClient.newHttpClient();
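1.2 To set additional options (proxy, authenticator, timeout, redirect policy, cookies and so on), use the builder. Headers are set per request on HttpRequest rather than on the client.
var httpClient = HttpClient.newBuilder()
        // auth setting; requires a default installed via Authenticator.setDefault(...), the builder rejects null
        .authenticator(Authenticator.getDefault())
        .proxy(ProxySelector.getDefault())              // proxy setting
        .connectTimeout(Duration.ofMinutes(1))          // connect timeout setting
        .followRedirects(HttpClient.Redirect.NORMAL)    // redirect policy setting
        // cookie handler setting; requires a default installed via CookieHandler.setDefault(...)
        .cookieHandler(CookieHandler.getDefault())
        .build();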
Once the HttpClient is built, create an HttpRequest for an HTTP request, or a WebSocket client for a WebSocket connection.
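For the plain HTTP case, a request is built with the same builder style as the client. The following is a minimal sketch only: the class name RequestSketch, the helper buildGet, the URL and the header value are all made up for illustration and do not come from the original code.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class RequestSketch {

    // Hypothetical helper: builds a GET request with a timeout and one header.
    public static HttpRequest buildGet(String url) {
        return HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(Duration.ofSeconds(10))        // per-request timeout
                .header("Accept", "application/json")   // headers live on the request
                .GET()
                .build();
    }

    public static void main(String[] args) {
        var request = buildGet("https://example.com/api"); // placeholder URL
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Such a request would then be passed to httpClient.send(request, HttpResponse.BodyHandlers.ofString()) for a blocking call, or to httpClient.sendAsync(...) for a CompletableFuture.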
2 Building a WebSocket
The same pattern applies:
var wsCompletableFuture = httpClient.newWebSocketBuilder()
        .buildAsync(URI.create(url), new WebSocketListener());
var wsClient = wsCompletableFuture.join();
Here are the methods that the WebSocket interface provides; the API is largely self-explanatory.
CompletableFuture<WebSocket> sendText(CharSequence data, boolean last);
CompletableFuture<WebSocket> sendBinary(ByteBuffer data, boolean last);
CompletableFuture<WebSocket> sendPing(ByteBuffer message);
CompletableFuture<WebSocket> sendPong(ByteBuffer message);
CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
void request(long n);
boolean isOutputClosed();
boolean isInputClosed();
void abort();
To receive messages, implement the WebSocket.Listener interface, which is defined as follows:
interface Listener {
    default void onOpen(WebSocket webSocket) { webSocket.request(1); }

    default CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        webSocket.request(1); // Increments the counter of invocations of receive methods
        return null;
    }

    default CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
        webSocket.request(1);
        return null;
    }

    default CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
        webSocket.request(1);
        return null;
    }

    default CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
        webSocket.request(1);
        return null;
    }

    default CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
        return null;
    }

    default void onError(WebSocket webSocket, Throwable error) { }
}
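All interface methods have default implementations (a language feature available since JDK 8), so you only need to override the ones that require custom handling.
Below is the listener used in this article to connect to the Huobi WebSocket. Since it is for testing, every interface method is implemented: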
private class WebSocketListener implements WebSocket.Listener {

    @Override
    public void onOpen(WebSocket webSocket) {
        System.out.println("websocket opened.");
        webSocket.request(1);
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        webSocket.request(1);
        // Copy the fragment: the CharSequence may be reclaimed once this method returns null.
        textParts.add(data.toString());
        if (last) {
            var content = String.join("", textParts);
            System.out.println(content);
            textParts.clear();
        }
        return null;
    }

    @Override
    public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
        webSocket.request(1);
        // Copy the fragment: the buffer may have no accessible backing array,
        // and it may be reclaimed once this method returns null.
        var part = ByteBuffer.allocate(data.remaining());
        part.put(data).flip();
        binaryParts.add(part);
        if (last) {
            int size = 0;
            for (ByteBuffer binaryPart : binaryParts) {
                size += binaryPart.remaining();
            }
            var allocate = ByteBuffer.allocate(size);
            binaryParts.forEach(allocate::put);
            binaryParts.clear();
            var content = uncompress(allocate.array());
            System.out.println(new String(content));
        }
        return null;
    }

    @Override
    public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
        webSocket.request(1);
        System.out.println("ping");
        // The ping payload is opaque bytes, so print its size instead of decoding it as UTF-16.
        System.out.println(message.remaining() + " payload byte(s)");
        return null;
    }

    @Override
    public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
        webSocket.request(1);
        System.out.println("pong");
        return null;
    }

    @Override
    public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
        System.out.println("closed, status(" + statusCode + "), cause:" + reason);
        webSocket.sendClose(statusCode, reason);
        return null;
    }

    @Override
    public void onError(WebSocket webSocket, Throwable error) {
        System.out.println("error: " + error.getLocalizedMessage());
        webSocket.abort();
    }
}
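Points to note when using it:
- The boolean last parameter of the receive methods indicates whether the current message has been completely received. Until it has, the data cannot be processed, so the partial data must be stored and handled only once the last part arrives. (See the full code below.)
- void request(long n) tells the WebSocket how many more times it may invoke the receive methods (onText, onBinary, onPing, onPong, onClose); it increments an internal demand counter. It cannot be skipped: if the listener never calls it, no further messages are delivered.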
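The role of request(long n) can be illustrated with a listener that asks for exactly one more invocation after each one it handles. This is a sketch under assumptions: the class name OneAtATimeListener and its invocations() accessor are invented for illustration, and only onOpen and onText are overridden.

```java
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

/** Sketch: demand-driven receiving, one listener invocation at a time. */
public class OneAtATimeListener implements WebSocket.Listener {

    private int invocations; // number of onText calls seen so far

    @Override
    public void onOpen(WebSocket webSocket) {
        // Without this initial request, no receive method would ever be called.
        webSocket.request(1);
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        invocations++;
        // ... handle or buffer the fragment here ...
        // Ask for the next invocation only after this one has been handled.
        webSocket.request(1);
        return null;
    }

    public int invocations() {
        return invocations;
    }
}
```

Calling request(1) at the end of each handler, rather than requesting a large number up front, keeps a slow consumer from being flooded: the WebSocket delivers the next fragment only when the listener has signalled it is ready.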
The complete JDK 11 based WebSocket client for the Huobi site is as follows:
package me.study;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
public class HuobiWebSocket {
    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); // executor for the scheduled ping task
    private List<CharSequence> textParts = new ArrayList<>(16); // cache for text fragments
    private List<ByteBuffer> binaryParts = new ArrayList<>(16); // cache for binary fragments
    private String url;
    private WebSocket webSocket;

    public HuobiWebSocket(String url) {
        this.url = url;
        var wsCompletableFuture =
                HttpClient.newHttpClient().newWebSocketBuilder()
                        .buildAsync(URI.create(url), new WebSocketListener());
        webSocket = wsCompletableFuture.join();
        executor.scheduleAtFixedRate(() -> {
            // send a ping message every 5 seconds to keep the connection alive
            if (webSocket != null && !webSocket.isOutputClosed()) {
                System.out.println("send ping");
                var objectMapper = new ObjectMapper(); // jackson
                var map = new HashMap<String, Object>(1);
                map.put("ping", System.currentTimeMillis());
                try {
                    webSocket.sendText(objectMapper.writeValueAsString(map), true);
                } catch (JsonProcessingException ignore) {
                }
            }
        }, 5, 5, TimeUnit.SECONDS);
    }
    private class WebSocketListener implements WebSocket.Listener {

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("websocket opened.");
            webSocket.request(1);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            webSocket.request(1);
            // Copy the fragment: the CharSequence may be reclaimed once this method returns null.
            textParts.add(data.toString());
            if (last) {
                String content = String.join("", textParts);
                System.out.println(content);
                textParts.clear();
            }
            return null;
        }

        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
            webSocket.request(1);
            // Copy the fragment: the buffer may have no accessible backing array,
            // and it may be reclaimed once this method returns null.
            var part = ByteBuffer.allocate(data.remaining());
            part.put(data).flip();
            binaryParts.add(part);
            if (last) {
                int size = 0;
                for (var binaryPart : binaryParts) {
                    size += binaryPart.remaining();
                }
                var allocate = ByteBuffer.allocate(size);
                binaryParts.forEach(allocate::put);
                binaryParts.clear();
                var content = uncompress(allocate.array());
                System.out.println(new String(content));
            }
            return null;
        }

        @Override
        public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            System.out.println("ping");
            // The ping payload is opaque bytes, so print its size instead of decoding it as UTF-16.
            System.out.println(message.remaining() + " payload byte(s)");
            return null;
        }

        @Override
        public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
            webSocket.request(1);
            System.out.println("pong");
            return null;
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            System.out.println("ws closed with status(" + statusCode + "). cause:" + reason);
            webSocket.sendClose(statusCode, reason);
            return null;
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            System.out.println("error: " + error.getLocalizedMessage());
            webSocket.abort();
        }
    }
    private byte[] uncompress(byte[] bytes) {
        // ...
    }

    public String getUrl() {
        return url;
    }

    public WebSocket getWebSocket() {
        return webSocket;
    }

    public static void main(String[] args) throws JsonProcessingException {
        var huobiWebSocket = new HuobiWebSocket("wss://api.huobi.br.com/ws");
        var map = new HashMap<String, Object>();
        map.put("sub", "market.btcusdt.kline.1min");
        var objectMapper = new ObjectMapper();
        huobiWebSocket.getWebSocket().sendText(objectMapper.writeValueAsString(map), true);
    }
}
The byte[] uncompress(byte[] bytes) function decompresses the binary data; its implementation depends on the data format used by the particular WebSocket server.
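The body of uncompress is elided in the listing, so the following is only a guess at one possible shape. It assumes the server sends GZIP-compressed payloads, which the GZIPInputStream import in the listing hints at but does not confirm. The class name GzipSketch and the compress helper are hypothetical and exist only to make the example self-checking.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipSketch {

    // Assumption: the payload is a complete GZIP stream.
    public static byte[] uncompress(byte[] bytes) {
        try (var in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            return in.readAllBytes(); // available since Java 9
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical counterpart, used here only for the round trip in main.
    public static byte[] compress(byte[] bytes) {
        var out = new ByteArrayOutputStream();
        try (var gzip = new GZIPOutputStream(out)) {
            gzip.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // read after the GZIP stream is closed and flushed
    }

    public static void main(String[] args) {
        byte[] original = "{\"ping\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] restored = uncompress(compress(original));
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

Decoding the result with an explicit charset, as in main, avoids depending on the platform default encoding.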
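One slightly inconvenient point is that a large message arrives in several parts, which have to be cached until the boolean last parameter indicates that reception is complete. A further layer of wrapping could hide the last parameter, so that upper layers only deal with the completely received data.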
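One way such a wrapper might look is sketched below. None of these names (WholeMessages, TextAssembler, WholeTextListener) come from the original code; they are assumptions made for illustration, and only text messages are covered.

```java
import java.net.http.WebSocket;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

public class WholeMessages {

    /** Collects text fragments and yields the full message once the last one arrives. */
    public static final class TextAssembler {
        private final StringBuilder buffer = new StringBuilder();

        public Optional<String> accept(CharSequence part, boolean last) {
            buffer.append(part); // copies the characters, the fragment itself is not retained
            if (!last) {
                return Optional.empty();
            }
            String message = buffer.toString();
            buffer.setLength(0); // reset for the next message
            return Optional.of(message);
        }
    }

    /** Listener that hides `last`: the handler only ever sees complete text messages. */
    public static final class WholeTextListener implements WebSocket.Listener {
        private final TextAssembler assembler = new TextAssembler();
        private final Consumer<String> handler;

        public WholeTextListener(Consumer<String> handler) {
            this.handler = handler;
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            assembler.accept(data, last).ifPresent(handler);
            webSocket.request(1); // ask for the next fragment or message
            return null;
        }
    }
}
```

With this in place, a caller could pass new WholeMessages.WholeTextListener(System.out::println) to buildAsync and never look at the last flag. Keeping the assembler separate from the listener also makes the buffering logic easy to check without a live connection.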