美文网首页
ReentrantLock非公平锁源码浅析

ReentrantLock非公平锁源码浅析

作者: JBryan | 来源:发表于2019-12-24 09:54 被阅读0次

源码入口

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
reentrantLock.unlock();

一.构造方法

/** Synchronizer providing all implementation mechanics */
 private final Sync sync;
/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

/**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

1.无参构造方法默认初始化非公平的Sync实现,Sync继承AbstractQueuedSynchronizer。AbstractQueuedSynchronizer -- 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供了一个框架。
2.传入一个boolean值的有参构造方法,根据传入的boolean来初始化公平或非公平的Sync实现。

二.reentrantLock.lock()

lock()方法流程图


lock()流程图.jpg

1.由reentrantLock.lock()点进ReentrantLock类的lock()方法

public void lock() {
        sync.lock();
}

2.sync.lock()进入Sync内部类的lock抽象方法,找到非公平实现。


lock1.jpg
/**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

lock方法首先用CAS操作将state从0改为1:
如果成功,设置独占线程为当前线程,lock()结束。(对应流程图‘state==0’)
3.如果CAS操作失败,进入acquire(1)方法。

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

4.首先if()条件里面,进入tryAcquire(1)方法,找到非公平实现。进入nonfairTryAcquire(1)方法


lock2.jpg
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

获取当前线程,判断state == 0,如果相等,和上一步操作一样。如果不等,判断当前线程是否是独占线程。如果是,state+1,lock()结束。
如果不是,返回false。(对应流程图‘当前线程是否是独占线程’)
5.再次回到acquire(1)方法里面

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

if()判断里面,第一步tryAcquire(1)返回false以后,进入第二个条件判断里面,首先将当前线程添加到等待队列里面addWaiter(Node.EXCLUSIVE)。
6.进入addWaiter(Node.EXCLUSIVE)方法

 /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

从第五步传来的mode为Node.EXCLUSIVE,即为独占模式。首先new一个新的节点,然后获取当前队列的尾节点。这里的Node是一个双向链表,就不展开看了。
如果尾节点不为空,则将new出来的节点设为尾节点,返回新的节点。
如果为空,进行入队操作,然后再将新的节点返回。
7.addWaiter(Node.EXCLUSIVE)返回Node之后,进入第5步if()判断里面的 acquireQueued(Node, arg)方法

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
 /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

首先,将failed = true,然后try代码块里面,进入一个死循环。
通过 node.predecessor(),获取第六步addWaiter(Node.EXCLUSIVE)返回Node的前节点p。
如果p是头节点,进行第4步的tryAcquire(1)操作,操作成功,将当前节点设为头节点(头节点是空的)。lock()方法结束。
上面的操作失败以后,首先判断是否应该将此节点挂起,传入参数为当前节点的前节点p和当前节点node,第一次操作即为head和node。
8.进入if()判断里面的shouldParkAfterFailedAcquire(p, node) 方法,

 /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

第一次进来此方法,前节点p的waitStatus没有被初始化过,所以默认是0。
进入else代码块里面,然后将前节点p的waitStatus设为Node.SIGNAL,即-1。此方法返回false。
9.shouldParkAfterFailedAcquire(p, node) 返回false之后,for()代码块执行结束,开始进行第二次循环。

           for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }

重复进行第7步操作,然后再次进入shouldParkAfterFailedAcquire(p, node)方法,在第8步中,已经将p的waitStatus设为Node.SIGNAL了。所以直接返回true,说明此时线程应该被挂起。

        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;

10.进入parkAndCheckInterrupt()方法,执行挂起当前线程操作。

 /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

reentrantLock.lock()方法结束。

三.reentrantLock.unLock()

unLock()方法流程图


unLock()流程图.jpg

1.进入ReentrantLock的unlock()方法

 /**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }

2.进入sync.release(1)方法

 /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3.在if()判断里面,进入tryRelease(1)方法,找到非公平实现


lock3.jpg
      protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

获取state,然后将state-1,如果当前线程不是独占线程,抛出异常。
如果state != 0,返回false,unlock()方法结束。
如果state == 0,将独占线程设为null,然后返回true。
4.第2步中,if()判断里面的tryRelease(1)返回true以后,进入if()代码块

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

获取等待队列的头节点head,如果head不为null,且有线程在等待(ps:将线程添加到等待队列时,会将head的waitStatus设为Node.EXCLUSIVE),执行唤醒操作。
5.进入unparkSuccessor(1)方法

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

首先把head的waitStatus设为0,然后获取head的下一节点s。
如果s不为null,则唤醒s。
如果s为null,然后从链表尾部开始遍历链表,找到第一个waitStatus<=0的节点,然后唤醒它。

相关文章

网友评论

      本文标题:ReentrantLock非公平锁源码浅析

      本文链接:https://www.haomeiwen.com/subject/svytoctx.html