1. 准备工作
MAC安装Redis集群的博客【三主三从】
https://www.imooc.com/article/details/id/283036 【找不到src目录,无法建立关联关系】
https://blog.csdn.net/weixin_38003389/article/details/86295944【最终使用这种方式建立关联关系】
引入依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
测试代码:
public class JedisClusterPipeline {
public static void main(String[] args) {
test();
}
public static void test() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
Set<HostAndPort> nodes = new HashSet<HostAndPort>();
nodes.add(new HostAndPort("127.0.0.1", 7000));
nodes.add(new HostAndPort("127.0.0.1", 7001));
nodes.add(new HostAndPort("127.0.0.1", 7002));
nodes.add(new HostAndPort("127.0.0.1", 7003));
nodes.add(new HostAndPort("127.0.0.1", 7004));
nodes.add(new HostAndPort("127.0.0.1", 7005));
//JedisCluster中默认分装好了连接池.
JedisCluster jedisCluster = new JedisCluster(nodes, poolConfig);
String string = jedisCluster.get("HH:acd");
System.out.println(string);
}
}
2. Redis集群简介
2.1 为什么需要Redis Cluster
主从不能解决故障自动恢复问题,哨兵已经可以解决故障自动恢复了,那到底为啥还要集群模式呢?
主从和哨兵都还有另外一些问题没有解决,单个节点的存储能力是有上限,访问能力是有上限的。Redis Cluster 集群模式具有 高可用、可扩展性、分布式、容错 等特性。
Redis 集群并没有使用一致性hash,而是引入了哈希槽的概念。
Redis 集群有16384(2^14)个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽,集群的每个节点负责一部分hash槽。
这种结构很容易添加或者删除节点,并且无论是添加删除或者修改某一个节点,都不会造成集群不可用的状态。
2.2 Cluster 集群模式的原理
通过数据分片的方式来进行数据共享问题,同时提供数据复制和故障转移功能。
之前的两种模式数据都是在一个节点上的,单个节点存储是存在上限的。集群模式就是把数据进行分片存储,当一个分片数据达到上限的时候,就分成多个分片。
2.3 数据分片怎么分?
集群的键空间被分割为16384个slots[slɑːts]
(即hash槽),通过hash的方式将数据分到不同的分片上的。
2.4 为什么分成16384个slots
CRC16算法产生的hash值有16bit,可以产生的值在0~65535之间,但是作者却是HASH_SLOT = CRC16(客户端key) mod 16384。即使用了16384个槽;
- 在redis节点发送心跳包时需要把所有的槽放到这个心跳包里,以便让节点知道当前集群信息,16384=16k,在发送心跳包时使用char进行bitmap压缩后是2k(2 * 8 (8 bit) * 1024(1k) = 2K),也就是说使用2k的空间创建了16k的槽数。
65535=65k,压缩后就是8k(8 * 8 (8 bit) * 1024(1k) = 8K),也就是说需要需要8k的心跳包。 - redis的集群主节点数量基本不可能超过1000个。如上所述,集群节点越多,心跳包的消息体内携带的数据越多。如果节点过1000个,也会导致网络拥堵。因此redis作者,不建议redis cluster节点数量超过1000个。那么,对于节点数在1000以内的redis cluster集群,16384个槽位够用了。没有必要拓展到65536个。
- 槽位越小,节点少的情况下,压缩比高。
2.5 Redis Cluster为什么不支持批量操作
我们通过redis-cli -h 127.0.0.1 -p 7000
登录上服务器端,操作命令:
127.0.0.1:7000> get HH:124
(error) MOVED 6112 127.0.0.1:7001
127.0.0.1:7000>
这个意思说明,因为HH:124
在7001端口的节点上,所以不允许插入。
Redis Cluster只允许节点操作自己分片的数据。
目前只支持对具有相同 slot 值的 key 执行 批量操作。对于映射为不同 slot 值的 key 由于执行 mget、mget 等操作可能存在于多个节点上,因此不被支持。
操作批量命令出现异常:
127.0.0.1:7000> mget HH:ABC HH:123
(error) CROSSSLOT Keys in request don't hash to the same slot
127.0.0.1:7000>
2. JedisCluster源码分析
原理:在创建JedisCluster对象时,维护了
slots
和JedisPool
的缓存。当进行Redis的操作时,通过CRC16算法计算出key的slot值,然后创建出对应节点的connection,然后与对应节点进行操作。
2.1 创建JedisCluster对象
2.1.1 构造方法
在创建JedisCluster的构造方法中,会创建JedisClusterConnectionHandler
对象。
先创建一个Jedis对象,Jedis会想Redis服务端发送cluster slots
命令获取集群中节点和slots的信息。然后开始创建slots和JedisPool的缓存。
public abstract class JedisClusterConnectionHandler implements Closeable {
protected final JedisClusterInfoCache cache;
...
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {
//初始化cache对象
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);
//将slots节点缓存到cache对象中。
initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName);
}
...
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig,
int connectionTimeout, int soTimeout, String password, String clientName) {
//此处是配置的所有节点信息(只有出现异常,才开启下一次循环)
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = null;
try {
//创建了Jedis对象
jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);
if (password != null) {
jedis.auth(password);
}
if (clientName != null) {
jedis.clientSetname(clientName);
}
//维护slots和jedis的缓存
cache.discoverClusterNodesAndSlots(jedis);
//注意,若是不出现异常,for循环只允许一次就结束循环。
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
//关闭jedis。
jedis.close();
}
}
}
}
...
}
2.1.2 维护缓存的方法
发送cluster slots
命令得到的结果:
127.0.0.1:7000> CLUSTER slots
1) 1) (integer) 0
2) (integer) 5461
3) 1) "127.0.0.1"
2) (integer) 7000
3) "da11851495f913e8bfdb68caa2ce6ea663ac64fc"
4) 1) "127.0.0.1"
2) (integer) 7003
3) "26214321c00e6cf8c9c4ae08e9d695b96cad2b06"
2) 1) (integer) 5462
2) (integer) 10922
3) 1) "127.0.0.1"
2) (integer) 7001
3) "0e9b80dfaee9024c9515d5909f7d164dcebf5f4e"
4) 1) "127.0.0.1"
2) (integer) 7004
3) "edea4c8a9bf3930f46a26867fc181b3eddea35dc"
3) 1) (integer) 10923
2) (integer) 16383
3) 1) "127.0.0.1"
2) (integer) 7005
3) "ce64849154df9576867112f1bd7ede7c0cea39cf"
4) 1) "127.0.0.1"
2) (integer) 7002
3) "d1774278f03df8abb341986a034eca97172f5333"
127.0.0.1:7000>
slotInfo集合结构是:

public class JedisClusterInfoCache {
//注意:最终维护的两个缓存的key和value
private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
...
private static final int MASTER_NODE_INDEX = 2;
...
public void discoverClusterNodesAndSlots(Jedis jedis) {
w.lock();
try {
//先清空旧数据【上一次出现异常时,才会继续去调用discoverClusterNodesAndSlots,需要将上次的缓存清空】
reset();
//此处通过socket通信,发送CLUSTER slots命令到对应的节点上
//获取到集群的slots+节点信息。
List<Object> slots = jedis.clusterSlots();
//得到的是主节点信息
for (Object slotInfoObj : slots) {
//【结构如上图所示】
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}
//得到对应的slot的List数组
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
int size = slotInfo.size();
//开始解析节点信息
for (int i = MASTER_NODE_INDEX; i < size; i++) {
//获取节点信息
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if (hostInfos.size() <= 0) {
continue;
}
// 获取到ip+端口
HostAndPort targetNode = generateHostAndPort(hostInfos);
//开始维护缓存,key:ip:port;value:JedisPool
setupNodeIfNotExist(targetNode);
//i==2,即是主节点,才会维护solt和jedisPool的关系
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
}
}
} finally {
w.unlock();
}
}
2.2 调用方法
public class JedisCluster extends BinaryJedisCluster implements JedisCommands,
MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
@Override
public String get(final String key) {
//只是创建了一个JedisClusterCommand子类对象,对象调用run方法
return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
@Override
public String execute(Jedis connection) {
return connection.get(key);
}
}.run(key);
}
}
注意:调用runWithRetries
的方法参数
public T run(String key) {
if (key == null) {
throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
}
//这个是带有重试的方法执行
//JedisClusterCRC16.getSlot(key)计算key所在的hash槽数
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, false);
}
JedisCluster和Redis交互前,需要计算key对应的slot,然后获取对应的Connection对象。之后就是Jedis
客户端和Redis进行通信。
相关文章可以参考:Jedis源码分析
public abstract class JedisClusterCommand<T> {
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, boolean asking) {
if (attempts <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Jedis connection = null;
try {
if (asking) {
// TODO: Pipeline asking with the original command to make it
// faster....
connection = askConnection.get();
connection.asking();
// if asking success, reset asking flag
asking = false;
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
//因为tryRandomNode和asking都是false,方法会到达此处
//计算key对应的JedisPool,更加JedisPool生成Connection对象
connection = connectionHandler.getConnectionFromSlot(slot);
}
}
//调用public String get(final String key) 中的声明的子类实现的方法
return execute(connection);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
//在递归之前释放当前连接
releaseConnection(connection);
connection = null;
if (attempts <= 1) {
//重新缓存slots与JedisPool的关系
this.connectionHandler.renewSlotCache();
}
return runWithRetries(slot, attempts - 1, tryRandomNode, asking);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
//如果集群的节点发送移动,重新请求集群,维护缓存
this.connectionHandler.renewSlotCache(connection);
}
// release current connection before recursion or renewing
releaseConnection(connection);
connection = null;
if (jre instanceof JedisAskDataException) {
asking = true;
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if (jre instanceof JedisMovedDataException) {
} else {
throw new JedisClusterException(jre);
}
//重新调用(重试机制)
return runWithRetries(slot, attempts - 1, false, asking);
} finally {
releaseConnection(connection);
}
}
}
有同学就会问:你看runWithRetries
方法抛出的异常都是JAVA异常,Redis返回结果怎么转换成Exception类呢?
JedisCluster计算完key所在的slot并获取到Connection对象后,便会调用该方法。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands {
@Override
public String get(final String key) {
checkIsInMultiOrPipeline();
//拼装命令到buf数组缓存中
client.sendCommand(Protocol.Command.GET, key);
//使用socket和Redis节点进行通信
return client.getBulkReply();
}
}
public String getBulkReply() {
//socket通信
final byte[] result = getBinaryBulkReply();
//编码转换
if (null != result) {
return SafeEncoder.encode(result);
} else {
return null;
}
}
flush方法是RedisOutputStream
的方法,其作用就是将buf数组的命令写入到socket.outputStream流中,即将命令发送给了Redis。
readProtocolWithCheckingBroken()方法是接受socket.inputStream流的信息。
public byte[] getBinaryBulkReply() {
flush();
pipelinedCommands--;
return (byte[]) readProtocolWithCheckingBroken();
}
最终到达此处:
因为流是单向的,所以RedisInputStream
装饰了socket.inputStream流,将其读取到缓存中,完成多次读取。
根据Redis返回结果的前缀的不同,使用不同的方法解析流的信息。
private static Object process(final RedisInputStream is) {
//将流读取到缓存中。
final byte b = is.readByte();
switch(b) {
case PLUS_BYTE:
return processStatusCodeReply(is);
case DOLLAR_BYTE:
return processBulkReply(is);
case ASTERISK_BYTE:
return processMultiBulkReply(is);
case COLON_BYTE:
return processInteger(is);
case MINUS_BYTE:
//将Redis返回的信息转换为对应的异常
processError(is);
return null;
default:
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
}
根据前缀的不同,转换为不同的JAVA异常类型。
private static void processError(final RedisInputStream is) {
String message = is.readLine();
// TODO: I'm not sure if this is the best way to do this.
// Maybe Read only first 5 bytes instead?
if (message.startsWith(MOVED_RESPONSE)) {
String[] movedInfo = parseTargetHostAndSlot(message);
throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1],
Integer.parseInt(movedInfo[2])), Integer.parseInt(movedInfo[0]));
} else if (message.startsWith(ASK_RESPONSE)) {
String[] askInfo = parseTargetHostAndSlot(message);
throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0]));
} else if (message.startsWith(CLUSTERDOWN_RESPONSE)) {
throw new JedisClusterException(message);
} else if (message.startsWith(BUSY_RESPONSE)) {
throw new JedisBusyException(message);
} else if (message.startsWith(NOSCRIPT_RESPONSE) ) {
throw new JedisNoScriptException(message);
}
throw new JedisDataException(message);
}
网友评论