引入对应的依赖,Jedis的版本2.9.1
,SpringBoot版本2.1.3.RELEASE
。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
1. JAVA操作下Jedis源码分析
Redis客户端和服务端之间的通信是借助Socket来实现的。
socket通信—实现网络间的IO通信
public class JedisPipeline {
//密码xxx,使用1号库
private static JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1",6379, 2000, "xxx", 1);
public static void main(String args[]) {
test1();
}
public static void test1(){
//在连接池中拿出连接
Jedis jedis = pool.getResource();
//执行查询String结构的命令。
jedis.get("CCC");
jedis.close();
}
}
1.1 获取到Jedis连接
1.1.1 创建socket连接
获取Jedis对象.png创建Connection对象,实际上创建socket连接,并获取到输出流和输入流。
public class Connection implements Closeable {
//装饰的socket的输出和输入流
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
...
public void connect() {
if (!isConnected()) {
try {
socket = new Socket();
socket.setReuseAddress(true);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.setSoLinger(true, 0);
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
socket.setSoTimeout(soTimeout);
...
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException("Failed connecting to host "
+ host + ":" + port, ex);
}
}
}
}
上面的RedisOutputStream和RedisInputStream是装饰流,装饰socket.getInputStream()
1.1.2 装饰socket的输出流
FilterOutputStream
是装饰流类,目的就是装饰socket的outputStream流。
当使用pipeline传递多个命令时,可以确保os.write()
方法只是将命令写入到buf数组中,直到执行flush()
方法时,才会将命令通过socket发送给redis服务端。
public final class RedisOutputStream extends FilterOutputStream {
protected final byte[] buf;
...
//注意:实际上是写入到buf中,并不会写入到socket。
public void write(final byte b) throws IOException {
if (count == buf.length) {
flushBuffer();
}
buf[count++] = b;
}
...
@Override
public void flush() throws IOException {
//此时才将信息刷新到socket中。
flushBuffer();
out.flush();
}
//将buf数组刷新到socket流中
private void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}
}
1.2 jedis的get命令
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands {
@Override
public String get(final String key) {
//检查是否是批量或者管道操作
checkIsInMultiOrPipeline();
//组装os流。
client.sendCommand(Protocol.Command.GET, key);
//这个方法是真正进行socket通信,内部含有flush方法。
return client.getBulkReply();
}
}
1.2.1 将命令读取到outputStream的缓存
将命令读取到RedisOutputStream
中的buf
数组进行缓存。
public class Connection implements Closeable {
protected Connection sendCommand(final Command cmd, final String... args) {
//转换参数编码
final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) {
bargs[i] = SafeEncoder.encode(args[i]);
}
return sendCommand(cmd, bargs);
}
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
//因为在`pool.getResource();`中建立了socket连接
connect();
//发送命令
Protocol.sendCommand(outputStream, cmd, args);
//int +1 记录使用
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length() > 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
/*
* Catch any IOException or JedisConnectionException occurred from InputStream#read and just
* ignore. This approach is safe because reading error message is optional and connection
* will eventually be closed.
*/
}
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
}
Protocol
将命令拼装成AOF
文件的格式,该方法进行拼接。
public final class Protocol {
public static void sendCommand(final RedisOutputStream os, final Command command,
final byte[]... args) {
sendCommand(os, command.raw, args);
}
private static void sendCommand(final RedisOutputStream os, final byte[] command,
final byte[]... args) {
//组装的是AOF格式的命令,读取到byte[]数组中
try {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (final byte[] arg : args) {
os.write(DOLLAR_BYTE);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
}
拼接后的格式:
*2
$3
GET
$3
CCC
1.2.2 将命令发送给Redis以及接收Redis返回值
-
flush()
方法中,将RedisOutputStream
的buf数组的信息写入socket(即发送给redis中) -
readProtocolWithCheckingBroken()
,读取socket信息(Redis返回的值)到内存中。
当Redis没有返回值,那么readProtocolWithCheckingBroken()
方法将会阻塞。
public class Connection implements Closeable {
public String getBulkReply() {
//发送命令并接收redis返回的结果
final byte[] result = getBinaryBulkReply();
if (null != result) {
//进行编码
return SafeEncoder.encode(result);
} else {
return null;
}
}
//执行
public byte[] getBinaryBulkReply() {
//发送命令【上面进行分析】
flush();
//管道的命令-1
pipelinedCommands--;
//通过socket获取结果
return (byte[]) readProtocolWithCheckingBroken();
}
}
2. pipeline源码分析
pipeline和上述代码原理一样,通过socket来完成通信,先接受多个命令缓存到outputStream
的buf
数组中,当执行sync()
方法或syncAndReturnAll()
方法时,将buf
数组的内容输出到socket中。
public class JedisPipeline {
//密码xxx,使用1号库
private static JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1",6379, 2000, "xxx", 1);
public static void main(String args[]) {
test1();
}
public static void test3(){
Jedis jedis = pool.getResource();
Pipeline pipelined = jedis.pipelined();
//多个命令
pipelined.get("AAA");
pipelined.get("CCC");
pipelined.set("CC","CCTV1");
//此时发送给Redis(无结果)
//pipelined.sync();
//此时发送给Redis(有结果)
List<Object> objects = pipelined.syncAndReturnAll();
System.out.println(JSON.toJSONString(objects));
jedis.close();
}
}
执行结果
[null,"CCTV","OK"]
Redis服务端是单线程,所以返回的结果值顺序就是命令的顺序。
Redis客户端为了保证命令的结果类型(String、Hash结果类型)和结果值一一对应,会按照顺序将结果类型放入到Queue中。
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
@Override
public Response<String> get(final String key) {
//将命令维护到OutputStream的buf缓存中
getClient(key).get(key);
//将命令的结果类型维护到缓存中
return getResponse(BuilderFactory.STRING);
}
}
存储命令的响应对象。
public class Queable {
private Queue<Response<?>> pipelinedResponses = new LinkedList<Response<?>>();
protected void clean() {
pipelinedResponses.clear();
}
protected Response<?> generateResponse(Object data) {
Response<?> response = pipelinedResponses.poll();
if (response != null) {
response.set(data);
}
return response;
}
protected <T> Response<T> getResponse(Builder<T> builder) {
Response<T> lr = new Response<T>(builder);
pipelinedResponses.add(lr);
return lr;
}
protected boolean hasPipelinedResponse() {
return !pipelinedResponses.isEmpty();
}
protected int getPipelinedResponseLength() {
return pipelinedResponses.size();
}
}
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
public List<Object> syncAndReturnAll() {
if (getPipelinedResponseLength() > 0) {
//将多个命令发送给redis,并接收到Redis的响应结果
List<Object> unformatted = client.getAll();
List<Object> formatted = new ArrayList<Object>();
for (Object o : unformatted) {
try {
//`generateResponse`是Queable方法。响应结果按照结果类型进行组装,得到真正的返回结果。
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} else {
return java.util.Collections.<Object> emptyList();
}
}
}
网友评论