概念简介
简单介绍,不做过多描述,网上很多详细介绍
- TCP/IP
属于网络协议 - SOCKET 为内核向用户提供的可以实现各种网络通信协议的api
- FD Linux 一切皆文件,进程可以打开成百上千个文件,为了表示和区分已经打开的文件,Linux 会给每个文件分配一个编号(一个 ID),这个编号就是一个整数,被称为文件描述符(File Descriptor)
IO模型(Unix 网络编程P124)
首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,
同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。
阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。
同步:所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。
异步:异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
-
阻塞blocking I/O
可读事件和实际数据读全过程全阻塞
blocking-io.jpg
-非阻塞 nonblocking I/O
可读事件由用户态轮询,数据读取过程阻塞
non-blocking-io.jpg
-
IO复用 I/O multiplexing (select and poll)
multiplexing-io.jpg
-
select的本质是采用32个整数的32位,即32*32= 1024来标识,fd值为1->1024。当fd的值超过1024限制时,就必须修改FD_SETSIZE的大小。这个时候就可以标识32*max值范围的fd。
这种设计遍历的过程非常快,因为用位的逻辑操作就可以
缺点:
a. fd超过1024时,性能无法满足,在linux 早期并发没有那么大,还可以大范围支持。
b. 网络一般分两步操作,一步是获取io事件,另一步是数据读取,从内核态读到用户态.select两步都是阻塞的 -
poll
没有1024限制,但依然是两步IO都是阻塞的 -
epoll
没有1024 限制,但第二步依赖阻塞,这也是java nio 所谓的同步非阻塞IO
-
signal driven I/O (SIGIO)
signal-driven-io.jpg
- 异步 IO
两步都不阻塞
-
asynchronous I/O (the POSIX aio_functions)
asynchronous-io.jpg -
比较
compare-io.gif
Synchronous I/O versus Asynchronous I/O
POSIX defines these two terms as follows:
- A synchronous I/O operation causes the
requesting process
to be blocked until that I/O operation completes. - An asynchronous I/O operation does not cause the
requesting process
to be blocked.
Using these definitions, the first four I/O models blocking, nonblocking, I/O multiplexing, and signal-driven I/O are all synchronous because the actual I/O operation (recvfrom)
blocks the process. Only the asynchronous I/O model matches the asynchronous I/O definition.
requesting process 可以理解为用户态当前进程
长链接VS短链接
TCP协议中有长连接和短连接之分。短连接在数据包发送完成后就会自己断开,长连接在发包完毕后,会在一定的时间内保持连接,即我们通常所说的Keepalive(存活定时器)功能。
默认的Keepalive超时需要7,200,000 milliseconds,即2小时,探测次数为5次。它的功效和用户自己实现的心跳机制是一样的。开启Keepalive功能需要消耗额外的宽带和流量,尽管这微不足道,但在按流量计费的环境下增加了费用,另一方面,Keepalive设置不合理时可能会因为短暂的网络波动而断开健康的TCP连接。
image- 保活
keepalive并不是TCP规范的一部分。在Host Requirements RFC罗列有不使用它的三个理由:
(1)在短暂的故障期间,它们可能引起一个良好连接(good connection)被释放(dropped),(2)它们消费了不必要的宽带,
(3)在以数据包计费的互联网上它们(额外)花费金钱。
然而,需要在应用层实现心跳检测与链接重连
-
网络单通
-
连接被防火墙Hand 住
-
长时间GC或者通信线程发生非预期异常
-
会导致连接不可用,而又无法及时发现。
应用场景
- 长链接
- 链接固定且频繁
- 长连接多用于操作频繁,点对点的通讯,而且连接数不能太多情况。每个TCP连接都需要三步握手,这需要时间,如果每个操作都是先连接,再操作的话那么处理速度会降低很多,所以每个操作完后都不断开,每次处理时直接发送数据包就OK了,不用建立TCP连接。例如:数据库的连接用长连接, 如果用短连接频繁的通信会造成socket错误,而且频繁的socket 创建也是对资源的浪费。
- 推送业务
-
短链接
a. 随机访问
-
而像WEB网站的http服务一般都用短链接,因为长连接对于服务端来说会耗费一定的资源,而像WEB网站这么频繁的成千上万甚至上亿客户端的连接用短连接会更省一些资源,如果用长连接,而且同时有成千上万的用户,如果每个用户都占用一个连接的话,资源占用太大。所以并发量大,但每个用户无需频繁操作情况下需用短连好。
实际场景分析(lettuce 客户端)
redis 的lettuce客户端
-
验证客户端的connection与tcp链接的关系,即与fd的关系,这里tcp物理连接与fd一一对应.
TCP协议
由于tcp是网络层协议,主要关系图中的第三层,主要几个字段source-->ip:port到destination ip:port
- 验证代码
引自 lettuce官方example
/*
* Copyright 2011-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lambdaworks.examples;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.sync.RedisCommands;
/**
* @author Mark Paluch
*/
public class ReadWriteExample {
public static void main(String[] args) {
// Syntax: redis://[password@]host[:port][/databaseNumber]
RedisClient redisClient = RedisClient.create(RedisURI.create("redis://localhost:6379/0"));
StatefulRedisConnection<String, String> connection = redisClient.connect();
//创建10个connection
for (int i=0;i<10;i++) {
redisClient.connect();
}
System.out.println("Connected to Redis");
RedisCommands<String, String> sync = connection.sync();
sync.set("foo", "bar");
String value = sync.get("foo");
System.out.println(value);
connection.close();
redisClient.shutdown();
}
}
redis-client 监控结果
10个tcp链接对于10个fd
localhost:0>client list
"id=220 addr=127.0.0.1:62381 fd=13 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=221 addr=127.0.0.1:62382 fd=12 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=222 addr=127.0.0.1:62383 fd=11 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=223 addr=127.0.0.1:62384 fd=18 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=224 addr=127.0.0.1:62385 fd=14 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=225 addr=127.0.0.1:62386 fd=8 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=226 addr=127.0.0.1:62387 fd=15 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=227 addr=127.0.0.1:62388 fd=9 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=36 addr=127.0.0.1:57381 fd=16 name= age=18865 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client
id=217 addr=127.0.0.1:62378 fd=20 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=218 addr=127.0.0.1:62379 fd=10 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
id=219 addr=127.0.0.1:62380 fd=19 name= age=42 idle=42 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL
"
localhost:0>
AbstractRedisClient相关netty代码
{
Bootstrap redisBootstrap = new Bootstrap();
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
SocketOptions socketOptions = getOptions().getSocketOptions();
redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
(int) socketOptions.getConnectTimeoutUnit().toMillis(socketOptions.getConnectTimeout()));
if (LettuceStrings.isEmpty(redisURI.getSocket())) {
redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
}
connectionBuilder.timeout(redisURI.getTimeout(), redisURI.getUnit());
connectionBuilder.password(redisURI.getPassword());
connectionBuilder.bootstrap(redisBootstrap);
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
connectionBuilder.commandHandler(handler).socketAddressSupplier(socketAddressSupplier).connection(connection);
connectionBuilder.workerPool(genericWorkerPool);
}
netty 客户端的chnnel 创建
/**
* Connect and initialize a channel from {@link ConnectionBuilder}.
*
* @param connectionBuilder must not be {@literal null}.
* @return the {@link ConnectionFuture} to synchronize the connection process.
* @since 4.4
*/
@SuppressWarnings("unchecked")
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
ConnectionBuilder connectionBuilder) {
SocketAddress redisAddress = connectionBuilder.socketAddress();
if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
}
logger.debug("Connecting to Redis at {}", redisAddress);
CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();
Bootstrap redisBootstrap = connectionBuilder.bootstrap();
RedisChannelInitializer initializer = connectionBuilder.build();
redisBootstrap.handler(initializer);
clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
//物理TCP链接创建
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
connectFuture.addListener(future -> {
if (!future.isSuccess()) {
logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
connectionBuilder.commandHandler().initialState();
channelReadyFuture.completeExceptionally(future.cause());
return;
}
initFuture.whenComplete((success, throwable) -> {
if (throwable == null) {
logger.debug("Connecting to Redis at {}: Success", redisAddress);
RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
connection.registerCloseables(closeableResources, connection);
channelReadyFuture.complete(connectFuture.channel());
return;
}
logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
connectionBuilder.commandHandler().initialState();
Throwable failure;
if (throwable instanceof RedisConnectionException) {
failure = throwable;
} else if (throwable instanceof TimeoutException) {
failure = new RedisConnectionException("Could not initialize channel within "
+ connectionBuilder.getTimeout() + " " + connectionBuilder.getTimeUnit(), throwable);
} else {
failure = throwable;
}
channelReadyFuture.completeExceptionally(failure);
CompletableFuture<Boolean> response = new CompletableFuture<>();
response.completeExceptionally(failure);
});
});
return new DefaultConnectionFuture<>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}
由代码和监控可确认,luttuce redis 的connection对netty的channel一一对应,同时服务器会一一创建tcp链接!(这里强调一下是lettuce的redis 客户端,而spring的封装并非如此
)
Spring Boot RedisTemplate
由于本地redis 这里的demo未使用集群模式,不影响测试效果
- 配置文件
#spring.redis.cluster.nodes= 192.168.2.10:9000,192.168.2.14:9001,192.168.2.13:9000
#spring.redis.cluster.max-redirects=3
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.timeout=5000ms
# Lettuce
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.lettuce.pool.max-wait=1ms
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=8
# 关闭超时时间
spring.redis.lettuce.shutdown-timeout=100ms
#驱逐时间 初始化延迟时间 默认-1
#if (delay > 0L) 必须>时才初始化
(这句很重要,这个参数默认为-1,不>0L则驱逐任务不会生成,池中的idel max 等参数等于没配置,不会生效。第二,如果是默认配置,即使生效也不会读池中链接,更浪费资源!!!代码见下边引用)
#spring.redis.lettuce.pool.time-between-eviction-runs=1s
引用驱逐器代码
BaseGenericObjectPool
final void startEvictor(long delay) {
Object var3 = this.evictionLock;
synchronized(this.evictionLock) {
EvictionTimer.cancel(this.evictor, this.evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
this.evictor = null;
this.evictionIterator = null;
if (delay > 0L) {
this.evictor = new BaseGenericObjectPool.Evictor();
EvictionTimer.schedule(this.evictor, delay, delay);
}
}
}
- 测试代码
package com.sparrow.spring.boot;
import com.sparrow.spring.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Application.class})
public class RedisConnectionTest {
@Autowired
private RedisTemplate redisTemplate;
@Test
public void test() throws IOException {
List<RedisClientInfo> redisClientInfos = redisTemplate.getClientList();
List<RedisConnection> redisConnections = new ArrayList<>();
//模拟多线程访问场景,如果按lettuce 的逻辑,这里应该每个链接都会对应一个socket 链接?
for (int i = 0; i < 200; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
redisTemplate.opsForValue().get("a" + System.currentTimeMillis());
}
}
}).start();
}
System.in.read();
}
}
- 执行后的redis client list结果
只有两条链接(fd),其中一条为当前client
localhost:0>client list
"id=36 addr=127.0.0.1:57381 fd=16 name= age=20394 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client
id=250 addr=127.0.0.1:63325 fd=14 name= age=18 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=get
"
localhost:0>
- 可见结果,spring boot redistemplate 不管开多少个线程,都只有一个链接,为什么?
摘自LettuceConnectionFactory官方代码注释
This factory creates a new @link LettuceConnection on each call to @link #getConnection(). Multiple @link LettuceConnection's share a single thread-safe native connection by default.
当我们调用getConnection()方法获取链接时,多个LettuceConnection会共享一个线程安全的 native connection(默认情况,意思就是说可以重写,可以改)
/**
* Connection factory creating <a href="http://github.com/mp911de/lettuce">Lettuce</a>-based connections.
* <p>
* This factory creates a new {@link LettuceConnection} on each call to {@link #getConnection()}. Multiple
* {@link LettuceConnection}s share a single thread-safe native connection by default.
当我们调用getConnection()方法获取链接时,多个LettuceConnection会共享一个线程安全的`native connection`
* <p>
* The shared native connection is never closed by {@link LettuceConnection}, therefore it is not validated by default
* on {@link #getConnection()}. Use {@link #setValidateConnection(boolean)} to change this behavior if necessary. Inject
* a {@link Pool} to pool dedicated connections. If shareNativeConnection is true, the pool will be used to select a
* connection for blocking and tx operations only, which should not share a connection. If native connection sharing is
* disabled, the selected connection will be used for all operations.
* <p>
* ....
*/
public class LettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
总结
- redis 长链接情况下,物理链接非常少,甚至可以共享一个,而并不影响并发效果,线上压测结果 qps 40000左右,CPU不到2%,当然测试效果与实际key有关,但5W左右是没有问题的
- redis 的client list 命令和info clients 命令可以查看当前服务器的tcp链接数
- tcp 物理链接与netty的channel一一对应,可以简单理解netty的channel 是对tcp链接的封装,里边实现了很多事。具体可以参考,李林峰老师的《netty权威指南》
- channel可以被多线程共享
- tcp链接与fd一一对应,受内核的最大fd数限制,但非1024限制,该值可以修改,与内存有关,理论上不受限。
网友评论