美文网首页
jedis实现分布式锁

jedis实现分布式锁

作者: 书唐瑞 | 来源:发表于2020-09-15 04:17 被阅读0次

    通过此篇文章可以了解Redis的底层通信,Redis的协议,以及自己手写与服务器通信.

    在分布式锁的实现上, 基于Redis的实现是其中一种.

    而具体的实现依赖包又有两个

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.0.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.10.1</version>
    </dependency>
    

    本篇文章我们就讲解第一种Jedis.

    基于Jedis又有两种实现

    // 单机
    Jedis(String host, int port, int connectionTimeout, int soTimeout)
    
    // 集群
    JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig)
    

    不管是单机还是集群,它们的底层和服务器之间的通信都是基于java.net.Socket

    比如说我们通过Jedis(String host, int port, int connectionTimeout, int soTimeout)构造了一个客户端,连接单机的服务器.

    public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout) {
        // 调用父类BinaryJedis
        super(host, port, connectionTimeout, soTimeout);
    }
    
    public BinaryJedis(final String host, final int port, final int connectionTimeout,
          final int soTimeout) {
        // 创建一个Client,它并没有连接服务器,只是先保存了host,port. 在Client类内部有个Socket属性.
        client = new Client(host, port);
        client.setConnectionTimeout(connectionTimeout);
        client.setSoTimeout(soTimeout);
    }
    

    调用首次调用setnx向服务器发送命令时,会连接服务器

    public Long setnx(final String key, final String value) {
        checkIsInMultiOrPipeline();
        // 调用上面构造好的client的setnx方法
        client.setnx(key, value);
        return client.getIntegerReply();
      }
    

    最后会跟进到如下代码

    protected Connection sendCommand(final Command cmd, final byte[]... args) {
        try {
            // 连接服务器
            connect();
            // 发送指令
            Protocol.sendCommand(outputStream, cmd, args);
            pipelinedCommands++;
            return this;
        } catch (JedisConnectionException ex) {
            // ...
        }
    }
    

    跟进到connect()方法

    public void connect() {
        // 判断socket是否已连接,如果没有连接服务器则连接服务器
        if (!isConnected()) {
            try {
                socket = new Socket();
                socket.setReuseAddress(true);
                socket.setKeepAlive(true);
                socket.setTcpNoDelay(true);
                socket.setSoLinger(true, 0);
                socket.connect(new InetSocketAddress(host, port), connectionTimeout);
                socket.setSoTimeout(soTimeout);
                // 输出流
                outputStream = new RedisOutputStream(socket.getOutputStream());
                // 输入流
                inputStream = new RedisInputStream(socket.getInputStream());
            } catch (IOException ex) {
                broken = true;
                throw new JedisConnectionException(ex);
            }
        }
    }
    

    这就是最熟悉的通过Socket和服务器进行通信,还有输入输出流.

    连接好服务器之后,接下来就是发送命令给服务器了.

    跟进到发送命令的代码

    private static void sendCommand(final RedisOutputStream os, final byte[] command,
          final byte[]... args) {
        // 通过输出流向服务器发送数据
        try {
            os.write(ASTERISK_BYTE);
            os.writeIntCrLf(args.length + 1);
            os.write(DOLLAR_BYTE);
            os.writeIntCrLf(command.length);
            os.write(command);
            os.writeCrLf();
    
            for (final byte[] arg : args) {
                os.write(DOLLAR_BYTE);
                os.writeIntCrLf(arg.length);
                os.write(arg);
                os.writeCrLf();
            }
        } catch (IOException e) {
            throw new JedisConnectionException(e);
        }
    }
    

    通过输出流,将命令发送给服务器.

    下面我们将这个方法里面的常量替换一下,把一些不必要的代码删除,再看下这个方法

    // #1
    os.write('*');
    os.write(args.length + 1);
    os.write("\r\n");
    
    // #2
    os.write('$');
    os.write(command.length);
    os.write("\r\n");
    os.write(command);
    os.write("\r\n");
    
    // #3
    for (final byte[] arg : args) {
        os.write('$');
        os.write(arg.length);
        os.write("\r\n");
        os.write(arg);
        os.write("\r\n");
    }
    

    在说这段代码之前,我们要说下Redis的协议.

    互联网的通信是基于协议的,我们熟悉的TCP/IP协议,Dubbo通信的dubbo协议,Zookeeper的zookeeper协议,RocketMQ通信的自身应用层协议.没有协议,那么客户端和服务器就不能通信,彼此'听不懂'对方在说什么.

    那么Redis客户端和服务器之间要想彼此知道对方说的什么,那么它们之间也有通过协议通信,这就是Redis协议.

    具体协议如下

    *<number of arguments> CR LF
    $<number of bytes of argument 1> CR LF
    <argument data> CR LF
    ...
    $<number of bytes of argument N> CR LF
    <argument data> CR LF
    

    比如我们要向服务器发送SET mykey myvalue这个命令,那么转换成协议之后,具体的内容如下

    *3
    $3
    SET
    $5
    mykey
    $7
    myvalue
    

    并不是我不把它们写成一行,而是在它们彼此之间有'\r\n',也就是回车换行. 我们简单介绍下它

    *3
    

    *这个符号是固定的,那么后面这个3是什么意思呢,3表示后面有3个内容(或者说命令由3部分组成),细心的读者也看到了,整个命令中有3个符号,每个符号代表一个内容.

    $3
    SET
    

    $符号是固定的,后面的3表示后面有3个字符,因为SET就是3个字符组成的

    $5
    mykey
    

    $符号是固定的,后面的5表示后面有5个字符,因为mykey就是5个字符组成的

    $7
    myvalue
    

    $符号是固定的,后面的7表示后面有7个字符,因为myvalue就是7个字符组成的

    协议的编解码也是一个话题. Redis的协议是基于长度的,通过长度就可以准确的知道,命令的开始在哪里,结束又在哪里. 基于长度的协议有很多,比如Dubbo或者RocketMQ的协议,它们将数据发送给对方之后,对方就是通过基于长度的解码器,将数据解码出来.

    相信读者朋友应该明白了Redis的协议.那么我们只要通过Socket的输出流将这些协议内容发送给服务器就可以了,服务器基于协议,就能'读懂'我们发送给它的命令是什么了.

    我们再回到上面的那段代码.

    // #1
    os.write('*');
    os.write(args.length + 1);
    os.write("\r\n");
    
    // #2
    os.write('$');
    os.write(command.length);
    os.write("\r\n");
    os.write(command);
    os.write("\r\n");
    
    // #3
    for (final byte[] arg : args) {
        os.write('$');
        os.write(arg.length);
        os.write("\r\n");
        os.write(arg);
        os.write("\r\n");
    }
    

    相信这个时候,你再来看这段代码应该就明白了. 就是平铺直叙的将协议'翻译'成代码.

    所以说,当我们需要和服务器通信的时候,也未必是必须依赖Redis的依赖包,我们完成可以自己通过Socket与服务器直接通信. 比如下面这段简单的代码,就可以直接和Redis通信了,当然它很简单.这里只是提供给你一个思路.

    import java.io.IOException;
    import java.net.Socket;
    
    public static void connectRedis(String key, String value) throws IOException {
    
        Socket client = new Socket(host, port);
    
        // 执行 set key value命令
        StringBuilder command = new StringBuilder();
    
        String number = "*3" + CRLF;
        command.append(number);
    
        String cmd = "$3" + CRLF + "SET" + CRLF;
        command.append(cmd);
    
        cmd = "$" + key.getBytes().length + CRLF + key + CRLF;
        command.append(cmd);
    
        cmd = "$" + value.getBytes().length + CRLF + value + CRLF;
        command.append(cmd);
    
        // 向服务器发送命令
        client.getOutputStream().write(command.toString().getBytes());
    
        // 接收服务器响应
        byte[] response = new byte[1024];
        client.getInputStream().read(response);
        System.out.println(new String(response, 0, response.length));
    
    }
    

    在开篇我们也讲到,Redis分布式锁的实现有jedis和redisson两种. jedis的底层通信是直接基于Socket, 而redisson的底层与服务器通信是基于Netty.

    相关文章

      网友评论

          本文标题:jedis实现分布式锁

          本文链接:https://www.haomeiwen.com/subject/cvlxyktx.html