A look at unlock in redisson's RLock

This article looks at how unlock works in redisson's RLock.

RLock

org/redisson/api/RLock.java

/**
 * Redis based implementation of {@link java.util.concurrent.locks.Lock}
 * Implements re-entrant lock.
 *
 * @author Nikita Koksharov
 *
 */
public interface RLock extends Lock, RLockAsync {

    /**
     * Returns name of object
     *
     * @return name - name of object
     */
    String getName();
    
    /**
     * Acquires the lock with defined <code>leaseTime</code>.
     * Waits if necessary until lock became available.
     *
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param leaseTime the maximum time to hold the lock after it's acquisition,
     *        if it hasn't already been released by invoking <code>unlock</code>.
     *        If leaseTime is -1, hold the lock until explicitly unlocked.
     * @param unit the time unit
     * @throws InterruptedException - if the thread is interrupted
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;

    /**
     * Tries to acquire the lock with defined <code>leaseTime</code>.
     * Waits up to defined <code>waitTime</code> if necessary until the lock became available.
     *
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param waitTime the maximum time to acquire the lock
     * @param leaseTime lease time
     * @param unit time unit
     * @return <code>true</code> if lock is successfully acquired,
     *          otherwise <code>false</code> if lock is already set.
     * @throws InterruptedException - if the thread is interrupted
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

    /**
     * Acquires the lock with defined <code>leaseTime</code>.
     * Waits if necessary until lock became available.
     *
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param leaseTime the maximum time to hold the lock after it's acquisition,
     *        if it hasn't already been released by invoking <code>unlock</code>.
     *        If leaseTime is -1, hold the lock until explicitly unlocked.
     * @param unit the time unit
     *
     */
    void lock(long leaseTime, TimeUnit unit);

    /**
     * Unlocks the lock independently of its state
     *
     * @return <code>true</code> if lock existed and now unlocked
     *          otherwise <code>false</code>
     */
    boolean forceUnlock();

    /**
     * Checks if the lock locked by any thread
     *
     * @return <code>true</code> if locked otherwise <code>false</code>
     */
    boolean isLocked();

    /**
     * Checks if the lock is held by thread with defined <code>threadId</code>
     *
     * @param threadId Thread ID of locking thread
     * @return <code>true</code> if held by thread with given id
     *          otherwise <code>false</code>
     */
    boolean isHeldByThread(long threadId);

    /**
     * Checks if this lock is held by the current thread
     *
     * @return <code>true</code> if held by current thread
     * otherwise <code>false</code>
     */
    boolean isHeldByCurrentThread();

    /**
     * Number of holds on this lock by the current thread
     *
     * @return holds or <code>0</code> if this lock is not held by current thread
     */
    int getHoldCount();

    /**
     * Remaining time to live of the lock
     *
     * @return time in milliseconds
     *          -2 if the lock does not exist.
     *          -1 if the lock exists but has no associated expire.
     */
    long remainTimeToLive();
    
}
The RLock interface extends the JDK's java.util.concurrent.locks.Lock interface and adds methods such as isLocked, isHeldByThread, and isHeldByCurrentThread.
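
Because RLock follows the java.util.concurrent.locks.Lock contract, the usual lock / try / finally / unlock idiom applies. The sketch below uses the JDK's ReentrantLock purely as a local stand-in (it is not redisson code), since it happens to expose isLocked, isHeldByCurrentThread, and getHoldCount under the same names. The class and method names are made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class LocalLockDemo {
    // ReentrantLock is only a local stand-in here; it is NOT redisson's RLock.
    static int holdCountInsideNestedLock(ReentrantLock lock) {
        lock.lock();
        try {
            lock.lock(); // re-entrant acquisition by the same thread
            try {
                return lock.getHoldCount(); // two holds at this point
            } finally {
                lock.unlock(); // releases the inner hold
            }
        } finally {
            lock.unlock(); // releases the outer hold
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println("holds inside: " + holdCountInsideNestedLock(lock));
        System.out.println("locked afterwards: " + lock.isLocked());
        System.out.println("held by current thread: " + lock.isHeldByCurrentThread());
    }
}
```

Every lock() is paired with exactly one unlock() in a finally block, which is the pattern the re-entrant hold count relies on.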

RedissonLock

org/redisson/RedissonLock.java

public class RedissonLock extends RedissonBaseLock {

    //......

    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

    //......
}    
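RedissonLock extends RedissonBaseLock. Its unlock method calls the parent class's unlockAsync, passing Thread.currentThread().getId(). If the resulting RedisException was caused by an IllegalMonitorStateException, unlock rethrows that cause directly.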
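
The catch block in unlock can be tried out in isolation. In this minimal sketch, WrapperException is a hypothetical stand-in for redisson's RedisException, and runAndUnwrap is an illustrative helper, not a redisson method.

```java
class UnwrapDemo {
    // Hypothetical stand-in for redisson's RedisException (an unchecked wrapper).
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) {
            super(cause);
        }
    }

    // Mirrors the catch block in unlock(): surface an IllegalMonitorStateException cause directly.
    static void runAndUnwrap(Runnable action) {
        try {
            action.run();
        } catch (WrapperException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            }
            throw e; // any other failure is propagated unchanged
        }
    }

    public static void main(String[] args) {
        try {
            runAndUnwrap(() -> {
                throw new WrapperException(new IllegalMonitorStateException("not locked by current thread"));
            });
        } catch (IllegalMonitorStateException e) {
            System.out.println("caller sees: " + e.getMessage());
        }
    }
}
```

The effect is that callers can catch IllegalMonitorStateException as they would with a JDK Lock, without knowing about the wrapper type.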

unlockAsync

org/redisson/RedissonBaseLock.java

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {

    //......

    @Override
    public RFuture<Void> unlockAsync() {
        long threadId = Thread.currentThread().getId();
        return unlockAsync(threadId);
    }

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);

            if (e != null) {
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            result.trySuccess(null);
        });

        return result;
    }

    //......
}
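unlockAsync calls unlockInnerAsync with the given thread id (the current thread's id when called through unlock). Once that future completes, it first calls cancelExpirationRenewal. It then fails the result with the underlying exception if there was one, fails it with an IllegalMonitorStateException if opStatus is null, and otherwise completes it successfully.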
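
The three outcomes of that callback can be sketched with the JDK's CompletableFuture standing in for redisson's RFuture and RPromise. This is an illustration under that assumption, not the library's implementation; completeUnlock and the cancelRenewal parameter are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class UnlockFlowDemo {
    // CompletableFuture stands in for RFuture/RPromise; cancelRenewal stands in for cancelExpirationRenewal.
    static CompletableFuture<Void> completeUnlock(CompletableFuture<Boolean> inner, Runnable cancelRenewal) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        inner.whenComplete((opStatus, e) -> {
            cancelRenewal.run(); // runs in every case, before the outcome is decided
            if (e != null) {
                result.completeExceptionally(e); // the command itself failed
            } else if (opStatus == null) {
                // nil from the script: the given thread does not hold the lock
                result.completeExceptionally(new IllegalMonitorStateException("not locked by current thread"));
            } else {
                result.complete(null); // FALSE (still held re-entrantly) and TRUE (released) both succeed
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> r = completeUnlock(
                CompletableFuture.completedFuture(Boolean.TRUE),
                () -> System.out.println("renewal cancelled"));
        System.out.println("failed: " + r.isCompletedExceptionally());
    }
}
```

Note that only a null status is treated as an error; whether the script returned 0 or 1 does not matter to the caller.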

unlockInnerAsync

org/redisson/RedissonLock.java

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
RedissonLock provides the unlockInnerAsync method, which executes the following Lua script:
if redis.call('hexists', KEYS[1], ARGV[3]) == 0 then
    return nil
end

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1)

if counter > 0 then
    redis.call('pexpire', KEYS[1], ARGV[2])
    return 0
else
    redis.call('del', KEYS[1])
    redis.call('publish', KEYS[2], ARGV[1])
    return 1
end

return nil

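The script first checks whether the hash at KEYS[1] contains the field getLockName(threadId); if not, it returns nil. Otherwise it decrements that thread's hold count by 1. If the count is still greater than 0, it resets the key's expiration with pexpire (using ARGV[2], which is internalLockLeaseTime) and returns 0. Otherwise it deletes the key with del, publishes LockPubSub.UNLOCK_MESSAGE on the channel KEYS[2], and returns 1. The trailing return nil is never reached, because both branches of the if return.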
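
To make the three return values concrete, here is a small in-memory model of the script's branching. It is only a sketch: a HashMap plays the role of the Redis hash at KEYS[1], the acquire helper and the field strings are hypothetical, and the pexpire and publish side effects are left as comments.

```java
import java.util.HashMap;
import java.util.Map;

class UnlockScriptDemo {
    // Stand-in for the Redis hash at KEYS[1]: field = per-thread lock name, value = hold count.
    private final Map<String, Long> holds = new HashMap<>();

    // Hypothetical helper that models one (re-entrant) acquisition.
    void acquire(String field) {
        holds.merge(field, 1L, Long::sum);
    }

    // Returns null (not the holder), FALSE (count still > 0), or TRUE (fully released).
    Boolean unlock(String field) {
        if (!holds.containsKey(field)) {
            return null; // hexists == 0, the script returns nil
        }
        long counter = holds.merge(field, -1L, Long::sum); // hincrby ... -1
        if (counter > 0) {
            return Boolean.FALSE; // the real script also refreshes the TTL with pexpire
        }
        holds.clear(); // del KEYS[1]; the real script then publishes the unlock message
        return Boolean.TRUE;
    }

    public static void main(String[] args) {
        UnlockScriptDemo demo = new UnlockScriptDemo();
        demo.acquire("node-1:42");
        demo.acquire("node-1:42");
        System.out.println(demo.unlock("node-1:42")); // first release of two holds
        System.out.println(demo.unlock("node-1:42")); // last release
        System.out.println(demo.unlock("node-1:42")); // no longer the holder
    }
}
```

The mapping of 0, 1, and nil to false, true, and null matches how the listing's EVAL_BOOLEAN result is consumed by unlockAsync, where only null is treated as an error.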

isLocked

org/redisson/RedissonBaseLock.java

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {

    //......

    @Override
    public boolean isLocked() {
        return isExists();
    }

    //......
}
isLocked calls the inherited isExists method:
    @Override
    public boolean isExists() {
        return get(isExistsAsync());
    }

    @Override
    public RFuture<Boolean> isExistsAsync() {
        return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EXISTS, getRawName());
    }
The check is done with the exists command, with getRawName() as its argument.

isHeldByCurrentThread

org/redisson/RedissonBaseLock.java

    @Override
    public boolean isHeldByCurrentThread() {
        return isHeldByThread(Thread.currentThread().getId());
    }

    @Override
    public boolean isHeldByThread(long threadId) {
        RFuture<Boolean> future = commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getRawName(), getLockName(threadId));
        return get(future);
    }
isHeldByCurrentThread calls isHeldByThread with Thread.currentThread().getId(). isHeldByThread executes the hexists command with getRawName() and getLockName(threadId) as arguments.
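
The difference between the two checks is key-level versus field-level existence. The following sketch models that with nested maps; the class, its methods, and the key and field strings are all hypothetical and only illustrate the distinction between exists and hexists.

```java
import java.util.HashMap;
import java.util.Map;

class LockQueryDemo {
    // Hypothetical in-memory stand-in for Redis: key -> hash (field -> hold count).
    private final Map<String, Map<String, Long>> store = new HashMap<>();

    void acquire(String key, String field) {
        store.computeIfAbsent(key, k -> new HashMap<>()).merge(field, 1L, Long::sum);
    }

    // Models EXISTS key: true if any thread holds the lock.
    boolean isLocked(String key) {
        return store.containsKey(key);
    }

    // Models HEXISTS key field: true only if this particular holder is recorded.
    boolean isHeldBy(String key, String field) {
        Map<String, Long> hash = store.get(key);
        return hash != null && hash.containsKey(field);
    }

    public static void main(String[] args) {
        LockQueryDemo demo = new LockQueryDemo();
        demo.acquire("myLock", "node-1:42");
        System.out.println("locked: " + demo.isLocked("myLock"));
        System.out.println("held by node-1:42: " + demo.isHeldBy("myLock", "node-1:42"));
        System.out.println("held by node-1:43: " + demo.isHeldBy("myLock", "node-1:43"));
    }
}
```

So isLocked can be true while isHeldByCurrentThread is false, which is exactly the situation in which unlock fails with IllegalMonitorStateException.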

Summary

redisson's RLock interface extends the JDK's java.util.concurrent.locks.Lock interface and adds methods such as isLocked, isHeldByThread, and isHeldByCurrentThread.

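  • unlock first checks whether the hash contains the field getLockName(threadId) and returns nil if it does not; otherwise it decrements that thread's hold count by 1. If the count is still greater than 0, it resets the expiration with pexpire and returns 0; otherwise it runs del, publishes LockPubSub.UNLOCK_MESSAGE, and returns 1.
  • isLocked uses the exists command with getRawName() as its argument.
  • isHeldByThread uses the hexists command with getRawName() and getLockName(threadId) as arguments.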

doc

  • locks-and-synchronizers