实验环境
- JDK:
adopt-openjdk-1.8.0_292
- SpringBoot:
2.5.3
- SpringCloud:
2020.0.3
准备条件
(1)Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
(2)接收请求的 DemoController
/**
 * Minimal endpoint used by the experiments. Every request logs the peer
 * address and port so that reuse of a client socket can be observed on the
 * server side (same remote port = same socket).
 */
@RestController
@Slf4j
public class DemoController {

    /**
     * Handles {@code GET /test}.
     *
     * @param request the incoming request, used only to log the peer address/port
     * @return the fixed body {@code "testGet"}
     */
    @GetMapping("/test")
    public String testGet(HttpServletRequest request) {
        // getRemoteAddr() always yields the IP address, which is what the log label promises;
        // getRemoteHost() may return a resolved host name instead.
        log.info("remote ip:port = {}:{}", request.getRemoteAddr(), request.getRemotePort());
        return "testGet";
    }

    /**
     * Handles {@code POST /test}: logs the peer address/port and echoes the
     * request body to standard output, one line at a time.
     *
     * @param request the incoming request whose body is read
     * @return the fixed body {@code "testPost"}
     * @throws IOException if the request body cannot be read
     */
    @PostMapping("/test")
    public String testPost(HttpServletRequest request) throws IOException {
        log.info("remote ip:port = {}:{}", request.getRemoteAddr(), request.getRemotePort());
        // org.apache.commons.io.IOUtils
        // The client encodes the body as UTF-8, so decode it explicitly as UTF-8 instead of
        // relying on the platform default charset (the charset-less overload is deprecated).
        List<String> lines = IOUtils.readLines(
                request.getInputStream(), java.nio.charset.StandardCharsets.UTF_8);
        lines.forEach(System.out::println);
        return "testPost";
    }
}
实验
(一)调用 Get 请求
@Slf4j
public class DemoTest {
public static void main(String[] args) throws Exception {
String requestUrl = "http://127.0.0.1:8080/test";
testGet(requestUrl);
}
@SneakyThrows
public static void testGet(String requestUrl) {
final URL url = new URL(requestUrl);
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
log.info("sleep begin");
Thread.sleep(3 * 1000);
log.info("sleep end");
// 建立 socket 连接,并发送 http 请求
List<String> lines = IOUtils.readLines(connection.getInputStream());
lines.forEach(System.out::println);
}
客户端控制台输出:
11:19:39.047 [main] INFO com.example.demo.DemoTest - sleep begin
11:19:42.047 [main] INFO com.example.demo.DemoTest - sleep end
testGet
服务端控制台输出:
2021-07-26 11:19:42.047 INFO 11832 --- [nio-8080-exec-3] com.example.demo.DemoController : remote ip:port = 127.0.0.1:7091
Wireshark 网络抓包显示:
Wireshark 抓包
可以看到,在调用 connection.getInputStream()
后才建立 socket 连接,并发送 http 请求。
(二)调用 Post 请求
/**
 * Experiment 2: a single POST request. Sleeps are placed before
 * {@code getOutputStream()}, before {@code write()} and before
 * {@code getInputStream()} so that the logs / packet capture show which call
 * opens the socket and which call actually sends the request.
 */
@Slf4j
public class DemoTest {

    public static void main(String[] args) throws Exception {
        String requestUrl = "http://127.0.0.1:8080/test";
        testPost(requestUrl);
    }

    /**
     * Issues one POST request with a small UTF-8 text body and prints the
     * response body line by line.
     *
     * @param requestUrl absolute URL of the endpoint to call
     */
    @SneakyThrows
    public static void testPost(String requestUrl) {
        final URL url = new URL(requestUrl);
        final HttpURLConnection connection = (HttpURLConnection) url.openConnection();

        log.info("getOutputStream() sleep begin");
        Thread.sleep(3 * 1000);
        log.info("getOutputStream() sleep end");

        String text = "你好啊";
        connection.setDoOutput(true);
        // Opens the socket connection
        OutputStream outputStream = connection.getOutputStream();

        log.info("write() sleep begin");
        Thread.sleep(3 * 1000);
        log.info("write() sleep end");

        outputStream.write(text.getBytes(StandardCharsets.UTF_8));

        log.info("getInputStream sleep begin");
        Thread.sleep(3 * 1000);
        log.info("getInputStream sleep end");

        // Sends the HTTP request.
        // The response is decoded with the same explicit charset used for the request body
        // rather than the platform default (the charset-less readLines overload is deprecated).
        List<String> lines = IOUtils.readLines(connection.getInputStream(), StandardCharsets.UTF_8);
        lines.forEach(System.out::println);
    }
}
客户端控制台输出:
11:26:19.815 [main] INFO com.example.demo.DemoTest - getOutputStream() sleep begin
11:26:22.830 [main] INFO com.example.demo.DemoTest - getOutputStream() sleep end
11:26:22.832 [main] INFO com.example.demo.DemoTest - write() sleep begin
11:26:25.840 [main] INFO com.example.demo.DemoTest - write() sleep end
11:26:25.840 [main] INFO com.example.demo.DemoTest - getInputStream sleep begin
11:26:28.847 [main] INFO com.example.demo.DemoTest - getInputStream sleep end
testPost
服务端控制台输出:
2021-07-26 11:26:28.848 INFO 11832 --- [nio-8080-exec-7] com.example.demo.DemoController : remote ip:port = 127.0.0.1:11500
你好啊
Wireshark 网络抓包显示:
可以看到:
- 在调用 connection.getOutputStream() 后会建立 socket 连接。
- write() 只是把数据写入到 OutputStream 中的缓存,没有调用 flush() 刷新到远程。
- 在调用 connection.getInputStream() 后才发送 http 请求。
(三)多次调用 Get 请求
/**
 * Experiment 3: five sequential GET requests from a single thread, used to
 * observe whether the underlying socket connection is reused between requests.
 */
@Slf4j
public class DemoTest {

    public static void main(String[] args) throws Exception {
        String requestUrl = "http://127.0.0.1:8080/test";
        IntStream.range(0, 5).forEach(v -> testGet(requestUrl));
    }

    /**
     * Issues one GET request and prints the response body line by line.
     *
     * @param requestUrl absolute URL of the endpoint to call
     */
    @SneakyThrows
    public static void testGet(String requestUrl) {
        final URL url = new URL(requestUrl);
        final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        // If no socket connection has been established yet, one is opened;
        // if one was established before, an available one is taken from the cache.
        // Then the HTTP request is sent.
        // Decode with an explicit charset: the charset-less readLines overload is deprecated
        // and falls back to the platform default.
        List<String> lines = IOUtils.readLines(
                connection.getInputStream(), java.nio.charset.StandardCharsets.UTF_8);
        lines.forEach(System.out::println);
    }
}
客户端控制台输出:
testGet
testGet
testGet
testGet
testGet
服务端控制台输出:
2021-07-26 11:31:56.864 INFO 11832 --- [nio-8080-exec-5] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9611
2021-07-26 11:31:56.873 INFO 11832 --- [nio-8080-exec-6] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9611
2021-07-26 11:31:56.875 INFO 11832 --- [nio-8080-exec-6] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9611
2021-07-26 11:31:56.876 INFO 11832 --- [nio-8080-exec-7] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9611
2021-07-26 11:31:56.878 INFO 11832 --- [nio-8080-exec-8] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9611
Wireshark 网络抓包显示:
可以看到,只建立了一个 socket 连接,后续的请求都是走的同一个 socket 连接。
(四)多线程调用 Get 请求
/**
 * Experiment 4: two rounds of six concurrent GET requests, one thread per
 * request, used to observe how many socket connections from the first round
 * are reused by the second round.
 */
@Slf4j
public class DemoTest {

    public static void main(String[] args) throws Exception {
        String requestUrl = "http://127.0.0.1:8080/test";
        IntStream.range(0, 6).forEach(v -> testGetWithThread(requestUrl));
        // Sleep 1s so that the socket connections used by the first round
        // have been put into the cache before the second round starts.
        Thread.sleep(1 * 1000);
        IntStream.range(0, 6).forEach(v -> testGetWithThread(requestUrl));
    }

    /**
     * Runs {@link #testGet(String)} on a freshly started thread.
     *
     * @param requestUrl absolute URL of the endpoint to call
     */
    public static void testGetWithThread(String requestUrl) {
        new Thread(() -> {
            testGet(requestUrl);
        }).start();
    }

    /**
     * Issues one GET request and prints the response body line by line.
     *
     * @param requestUrl absolute URL of the endpoint to call
     */
    @SneakyThrows
    public static void testGet(String requestUrl) {
        final URL url = new URL(requestUrl);
        final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        // If no socket connection has been established yet, one is opened;
        // if one was established before, an available one is taken from the cache.
        // Then the HTTP request is sent.
        // Decode with an explicit charset: the charset-less readLines overload is deprecated
        // and falls back to the platform default.
        List<String> lines = IOUtils.readLines(
                connection.getInputStream(), java.nio.charset.StandardCharsets.UTF_8);
        lines.forEach(System.out::println);
    }
}
客户端控制台输出:
testGet
testGet
testGet
testGet
testGet
testGet
testGet
testGet
testGet
testGet
testGet
testGet
服务端控制台输出:
2021-07-26 11:41:45.812 INFO 11832 --- [io-8080-exec-10] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9099
2021-07-26 11:41:45.812 INFO 11832 --- [nio-8080-exec-6] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9095
2021-07-26 11:41:45.812 INFO 11832 --- [nio-8080-exec-2] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9096
2021-07-26 11:41:45.812 INFO 11832 --- [nio-8080-exec-9] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9097
2021-07-26 11:41:45.812 INFO 11832 --- [nio-8080-exec-7] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9094
2021-07-26 11:41:45.812 INFO 11832 --- [nio-8080-exec-3] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9098
2021-07-26 11:41:46.808 INFO 11832 --- [nio-8080-exec-2] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9097
2021-07-26 11:41:46.808 INFO 11832 --- [nio-8080-exec-5] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9094
2021-07-26 11:41:46.808 INFO 11832 --- [nio-8080-exec-6] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9099
2021-07-26 11:41:46.808 INFO 11832 --- [nio-8080-exec-1] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9095
2021-07-26 11:41:46.808 INFO 11832 --- [nio-8080-exec-4] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9098
2021-07-26 11:41:46.808 INFO 11832 --- [nio-8080-exec-7] com.example.demo.DemoController : remote ip:port = 127.0.0.1:9103
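可以看到,除了 9096 和 9103 端口,其他端口的 socket 连接都被使用了两次。这是因为 keep-alive 缓存对同一个 protocol:host:port 最多只缓存 5 个连接(见下文源码解析):第一轮的 6 个连接中有 5 个被放入缓存,9096 对应的连接被直接关闭;第二轮的 6 个请求复用了缓存中的 5 个连接,剩下的 1 个请求只能新建连接(9103)。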
源码解析
HttpURLConnection 的 connect()
方法
/**
 * Opens the connection: flags this instance as connecting and then delegates
 * to plainConnect().
 *
 * @throws IOException propagated from plainConnect()
 */
public void connect() throws IOException {
// The flag is written while holding this object's monitor.
synchronized (this) {
connecting = true;
}
plainConnect();
}
/**
 * Connects at most once: returns immediately when the connected flag is
 * already set, otherwise delegates to plainConnect0().
 *
 * @throws IOException propagated from plainConnect0()
 */
protected void plainConnect() throws IOException {
synchronized (this) {
// Already connected: return right away
if (connected) {
return;
}
}
// run without additional permission
plainConnect0();
}
/**
 * Obtains an HttpClient for this connection's URL, applies the read timeout
 * to it and marks the connection as connected.
 *
 * @throws IOException rethrown unchanged when obtaining or configuring the client fails
 */
protected void plainConnect0() throws IOException {
try {
// 1. Obtain an HttpClient
http = getNewHttpClient(url, null, connectTimeout);
http.setReadTimeout(readTimeout);
} catch (IOException e) {
throw e;
}
// Switch to the "connected" state
connected = true;
}
/**
 * Obtains an HttpClient for the given URL by delegating to
 * HttpClient.New(url, p, connectTimeout, this).
 *
 * @param url            target URL
 * @param p              proxy forwarded to HttpClient.New
 * @param connectTimeout timeout value forwarded to HttpClient.New
 * @return the HttpClient returned by HttpClient.New
 * @throws IOException propagated from HttpClient.New
 */
protected HttpClient getNewHttpClient(URL url, Proxy p, int connectTimeout)
throws IOException {
return HttpClient.New(url, p, connectTimeout, this);
}
// HttpClient class
/**
 * Convenience overload: forwards to the five-argument New(...) with
 * useCache set to true.
 */
public static HttpClient New(URL url, Proxy p, int to,
HttpURLConnection httpuc) throws IOException
{
return New(url, p, to, true, httpuc);
}
/**
 * Returns an HttpClient for the URL: a cached one when useCache is set and
 * the cache lookup succeeds, otherwise a newly constructed one.
 *
 * @param url      target URL, also used for the cache lookup
 * @param p        proxy passed to the HttpClient constructor
 * @param to       timeout value passed to the HttpClient constructor
 * @param useCache whether to consult the cache (kac) first
 * @param httpuc   the calling HttpURLConnection (not used in this excerpt)
 * @return a cached or newly created HttpClient
 */
public static HttpClient New(URL url, Proxy p, int to, boolean useCache,
HttpURLConnection httpuc) throws IOException
{
HttpClient ret = null;
// 1. Try to get an HttpClient from the cache
if (useCache) {
ret = kac.get(url, null);
}
// 2. Nothing in the cache: create a new HttpClient, which also opens the socket connection
if (ret == null) {
ret = new HttpClient(url, p, to);
}
return ret;
}
HttpClient 的 finished()
方法
/**
 * Called when the caller is done with this HttpClient: unless the client is
 * flagged for reuse, it is either put into the keep-alive cache or its
 * server connection is closed.
 */
public void finished() {
if (reuse) /* will be reused */
return;
// Decrement the number of remaining keep-alive connections
keepAliveConnections--;
// Drop the OutputStream buffer
poster = null;
if (keepAliveConnections > 0 && isKeepingAlive() &&
!(serverOutput.checkError())) {
// Still eligible: put this client into the cache
putInKeepAliveCache();
} else {
// Otherwise close the socket connection
closeServer();
}
}
/**
 * Puts this HttpClient into the cache (kac) keyed by its URL, guarding
 * against a second insertion via the inCache flag.
 */
protected synchronized void putInKeepAliveCache() {
// Already cached: flag it under -ea and do nothing else.
if (inCache) {
assert false : "Duplicate put to keep alive cache";
return;
}
inCache = true;
kac.put(url, null, this);
}
// KeepAliveCache
/**
 * Stores the given HttpClient under the key derived from (url, obj).
 *
 * NOTE(review): this excerpt shows no null check on v before v.put(http);
 * presumably the handling of a missing entry was left out of the abridged
 * listing - confirm against the full JDK source.
 */
public synchronized void put(final URL url, Object obj, HttpClient http) {
// 1. Build the cache key
KeepAliveKey key = new KeepAliveKey(url, obj);
// 2. Put the current HttpClient into the cache
ClientVector v = super.get(key);
v.put(http);
}
/** Cache key built from a URL's protocol, host and port plus an extra object. */
class KeepAliveKey {
public KeepAliveKey(URL url, Object obj) {
// obj is usually null
// The key is protocol:host:port
// host:port identifies one remote service
// protocol is the protocol used to talk to that service
this.protocol = url.getProtocol();
this.host = url.getHost();
this.port = url.getPort();
this.obj = obj;
}
}
/**
 * Adds the HttpClient to this per-key collection unless it already holds
 * KeepAliveCache.getMaxConnections() entries, in which case the client's
 * server connection is closed instead.
 */
synchronized void put(HttpClient h) {
// KeepAliveCache.getMaxConnections() defaults to 5,
// i.e. at most 5 HttpClients are cached
if (size() >= KeepAliveCache.getMaxConnections()) {
// 1. Already 5 cached HttpClients: close the socket connection of the current one
h.closeServer(); // otherwise the connection remains in limbo
} else {
// 2. Fewer than 5 cached HttpClients: add the current one to the cache
push(new KeepAliveEntry(h, System.currentTimeMillis()));
}
}
HttpURLConnection 的 getOutputStream()
方法
/*
* Allowable input/output sequences:
* [interpreted as request entity]
* - get output, [write output,] get input, [read input]
* - get output, [write output]
* [interpreted as GET]
* - get input, [read input]
* Disallowed:
* - get input, [read input,] get output, [write output]
*/
// Flags the connection as connecting, then delegates to getOutputStream0().
@Override
public synchronized OutputStream getOutputStream() throws IOException {
connecting = true;
return getOutputStream0();
}
/**
 * Validates that output is enabled, switches a GET to POST, connects when no
 * connection is being reused, and returns the (lazily created) poster stream.
 *
 * NOTE(review): the try block has no catch/finally in this excerpt; the
 * listing appears to be abridged.
 *
 * @throws ProtocolException when doOutput is false
 */
private synchronized OutputStream getOutputStream0() throws IOException {
try {
// Throws if connection.setDoOutput(true) has not been called
if (!doOutput) {
throw new ProtocolException("cannot write to a URLConnection"
+ " if doOutput=false - call setDoOutput(true)");
}
// If the method is GET, change it to POST
if (method.equals("GET")) {
method = "POST"; // Backward compatibility
}
if (!checkReuseConnection())
connect(); // open the socket connection
if (poster == null) {
poster = new PosterOutputStream();
}
return poster;
}
}
HttpURLConnection 的 getInputStream()
方法
// Flags the connection as connecting, then delegates to getInputStream0().
@Override
public synchronized InputStream getInputStream() throws IOException {
connecting = true;
return getInputStream0();
}
/**
 * Returns the response stream: connects when no connection is being reused,
 * writes the request unless in streaming mode, and obtains the input stream
 * from the HttpClient. For responses without a body the HttpClient is
 * released immediately and an empty stream is returned.
 *
 * NOTE(review): the try block has no catch/finally in this excerpt; the
 * listing appears to be abridged.
 *
 * @throws ProtocolException when doInput is false
 */
private synchronized InputStream getInputStream0() throws IOException {
// doInput defaults to true
if (!doInput) {
throw new ProtocolException("Cannot read from URLConnection"
+ " if doInput=false (call setDoInput(true))");
}
// The InputStream was already obtained: return it directly
if (inputStream != null) {
return inputStream;
}
try {
do {
// Open the socket connection if it has not been established yet
if (!checkReuseConnection())
connect();
// Send the HTTP request
if (!streaming()) {
writeRequests();
}
// Obtain the inputStream
inputStream = http.getInputStream();
try {
// Read the response's content-length
cl = Long.parseLong(responses.findValue("content-length"));
} catch (Exception exc) { };
if (method.equals("HEAD") || cl == 0 ||
respCode == HTTP_NOT_MODIFIED ||
respCode == HTTP_NO_CONTENT) {
// Length is 0, or one of the other cases above:
// no data needs to be read through the InputStream, so call http.finished() right away to release the HttpClient
http.finished();
http = null;
inputStream = new EmptyInputStream();
connected = false;
}
return inputStream;
} while (redirects < maxRedirects);
}
}
网友评论