美文网首页
基于redis模拟实现选主

基于redis模拟实现选主

作者: TinyThing | 来源:发表于2019-05-16 19:26 被阅读0次

没时间写细节,直接上代码



/**
 * <p>集群模式下的切面</p>
 * @since 
 */
@Component
@Aspect
@Order(1)
@ConditionalOnExpression("${resource.sync.cluster:false}==true")    //只有当集群模式下该切面才生效
@Slf4j
public class ClusterAspect {

    private RLock redisLock;
    //由于改变量被多个线程读写,所以上读写锁,保障任务执行完成后,才能改变变量值
    private volatile boolean leader;

    private static final ReentrantReadWriteLock LEADER_READ_WRITE_LOCK = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock LEADER_READ_LOCK = LEADER_READ_WRITE_LOCK.readLock();
    private static final ReentrantReadWriteLock.WriteLock LEADER_WRITE_LOCK = LEADER_READ_WRITE_LOCK.writeLock();

    private static volatile boolean LOOP_FLAG = true;

    private static final String LOCK_KEY = "CLUSTER_LEADER";

    //选举间隔,默认10000ms
    @Value("${resource.sync.cluster.elect.interval:10000}")
    private Integer electInterval;

    /**
     * 门栓,当首次获取redis锁执行后才能打开
     */
    public static final CountDownLatch LATCH = new CountDownLatch(1);

    @Autowired
    private TaskExecutor taskExecutor;

    /**
     * 利用redis模拟选主
     * 弊端: 主节点挂掉后,可能无法立即选出新主节点,20s后才有
     */
    @EventListener(ApplicationReadyEvent.class)
    @Async
    public void electLeader() {
        redisLock = SyncDistributedLockUtil.getLock(LOCK_KEY);
        //每隔10s拿一次锁,相当于10s选举一次,
        while (LOOP_FLAG) {
            //LEADER写锁
            LEADER_WRITE_LOCK.lock();
            try {
                //如果当前是主节点,放锁
                if (leader && redisLock.isHeldByCurrentThread()) {
                    redisLock.unlock();
                }
                //设置锁过期时间, 20s
                //重新获取锁,相当于选举过程
                leader = redisLock.tryLock(0L, 20L, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("- get redis lock error!", e);
            } finally {
                LEADER_WRITE_LOCK.unlock();
            }

            //开门,放行启动同步
            if (LATCH.getCount() == 1L) {
                LATCH.countDown();
            }

            //每隔十秒放一次锁
            LockSupport.parkNanos(electInterval * 1000000L);
        }

        //项目关闭记得放锁
        if (leader && redisLock.isHeldByCurrentThread()) {
            log.debug("- project is shutting down, release the redis lock...");
            redisLock.unlock();
        }
    }

    @PreDestroy
    public void destroy() {
        endLoop();
    }


    @Around("@annotation(syncPreProcessor)")
    public Object clusterModel(ProceedingJoinPoint joinPoint, SyncPreProcessor syncPreProcessor) {

        RedisExtendTask redisExtendTask = new RedisExtendTask(redisLock);

        try {
            //等待首次获取选主执行完成
            LATCH.await();
            //判断是否为主节点
            log.info("- +++++++++++ entering cluster mode! Check whether this node is Leader: {}!...", leader);
            //LEADER变量读锁
            LEADER_READ_LOCK.lock();
            //如果不是主节点,直接退出
            if (!leader) {
                log.info("- +++++++++++ not LEADER, return...");
                return null;
            }
            //是主节点,执行任务
            log.info("- +++++++++++ this node is LEADER, begin to sync resource ...");
            //一旦任务开始执行,redis锁不能放,启动redis锁延期线程
            taskExecutor.execute(redisExtendTask);
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            log.error("- +++++++++++ error when doing sync job, message: {}", throwable.getMessage());
            throw new BaseRuntimeException(throwable.getMessage());
        } finally {
            //放锁
            LEADER_READ_LOCK.unlock();
            //停止redis锁延长线程
            redisExtendTask.endTask();
            log.debug("- +++++++++++ release LEADER_READ_LOCK: {} success ...");
        }
    }


    private static void endLoop() {
        LOOP_FLAG = false;
    }


    /**
     * redis lock 延时线程
     */
    private static class RedisExtendTask implements Runnable {

        private volatile boolean flag = true;

        private RLock redisLock;

        RedisExtendTask(RLock redisLock) {
            this.redisLock = redisLock;
        }

        @Override
        public void run() {
            log.debug("- begin to extend the expire time for redis lock: {}", redisLock.getName());
            //每秒延长一下过期时间,直到任务结束
            while (flag) {
                redisLock.expire(20L, TimeUnit.SECONDS);
                if (log.isDebugEnabled()) {
                    log.debug("- redis lock remain time to live = {}ms", redisLock.remainTimeToLive());
                }
                LockSupport.parkNanos(1000000000L);
            }
            log.debug("- finish extending the expire time for redis lock: {}", redisLock.getName());
        }

        void endTask() {
            this.flag = false;
        }
    }
}

相关文章

网友评论

      本文标题:基于redis模拟实现选主

      本文链接:https://www.haomeiwen.com/subject/ypswaqtx.html