美文网首页
吐槽——公司前人自研的分布式任务调度跑不起来,最后我选择了zoo

吐槽——公司前人自研的分布式任务调度跑不起来,最后我选择了zoo

作者: 无醉_1866 | 来源:发表于2019-10-16 01:20 被阅读0次

背景

作为创业公司,基础组件全部自研,并且开发团队集体跑路了,留下的坑没有最大,只有更大,近日使用前人造的轮子,分布式任务调度,折腾了近两天,没跑起来,各种问题,又没文档,最后只能放弃使用,而新部署一套也不现实,然后就选择了自己造轮子

对于目前遇到的场景,有个任务,一天跑一次,一次大概2小时能跑完,想过两种方案实现分布式锁解决当前遇到的问题:

  • 使用数据库,建一张任务表,利用行锁来锁定任务,锁定成功后,将标识位设置成已占用,占用者设置成自身机器ip,锁定成功的机器通过定时任务每5秒更新一次心跳时间,任务执行完成后将标识设置成未占用,并停止心跳,其它机器通过定时任务做检查,如果15s没有更新心跳时间则认为当前任务的执行机器挂了,并立即争用
  • 使用zookeeper,为任务创建一个临时节点,如果创建成功,则表示当前机器获得了锁,定时任务在执行前先判断当前机器是否能获取到锁,获取不到则不执行

理论上两种方式都是可行的,并且各有问题,由于不想建表,所以选择了使用zk。

问题分析

初始化及正常流程

对于前面的问题,我在实现的过程中尽量保证实现的更通用一些,正常情况下,在应用创建锁时,向zk创建一个临时节点,定时任务在执行前先判断当前机器是否能获取到锁,获取不到则不执行,正常流程如下:

image

如果不出现其它异常问题,这样简单的处理将非常有效,但是出现网络异常,宕机等情况时,则没这么容易了,如果机器A挂了,机器B不获取锁,则很有可能任务不跑了

异常情况

异常情况主要表现为两种:

  1. 机器A关闭或宕机,这个时间,机器B会收到dataDelete的回调,此时机器B应该要重新创建path,作为后续任务执行的机器
  2. 机器A出现了短暂的连接中断或者机器A的会话过期,此时机器B如果重新创建path,那么机器A就失去了执行任务的权限,而此时任务可能正在执行

对于异常情况,我们保持一个原则:尽量让机器A成功创建path,获得任务的执行权,那么我们可以在机器B的handleDataDeleted回调里延迟10s后再获取锁,如果10s后机器A还没恢复,则认为机器A短时间内恢复了不了。机器A出现连接/会话过期问题时,执行流程类似于下图:

image

而获取任务与连接断开或者会话过期之间可能存在并发的情况,这个时候有两种策略:

  1. 调用方重试,如果10s内获取不到锁,则不执行,等待其它机器获取锁后执行
  2. 阻塞,直到连通zk

不管使用哪种方式都会存在一个问题,机器B上的定时任务先启动,但是获取锁失败,不执行任务,而后机器A在判断自己能否执行任务前,机器A与zk失联(可能是挂了或者会话过期),那么在机器A与zk恢复连接前,机器B可能已经等待超过10s,成功创建了锁对应的path,这个时候,任务将得不到执行,需要使用回调的方式做补尝,在回调方法里判断是否需要启动任务。

锁的状态

基于上述分析,将锁分为以下4个状态:

  • CREATED:表示刚创建,是初始状态,这个状态下,直接参与竞争,去zk上创建节点
  • OWN:表示zk的节点被自己创建成功了,当有调用过来时,如果是此状态,则再调用一次zkClient.create(path),如果成功,则表示可执行任务,避免链接断了而状态没变
  • RELEASED:表示自己没有争用成功,返回false
  • WAITING:表示已经收到了handleDataDeleted回调,在10s等待时间中,此时如果获取锁,则同样等待10s后再尝试重新获取锁的判断

增加两个回调方法:

  1. handleDataDeleted中在延迟10s后争用成功时调用,需要在此回调方法中判断是否需要启动任务,可以通过记录最近一次获取锁的时间,如果离当前时间很近(比如任务执行耗时等),则启动任务(任务没执行完,机器就出了问题,导致当前机器获得了锁,需要继续将任务跑完);或者check一下任务的输出,判断是否需要继续执行任务
  2. 在重新建立会话后,立即争用,如果失败,则触发回调接口,使用方可在此接口的实现中判断是否需要取消任务

实现方式

基于zk的分布式锁的实现,用于支持任务的执行,整理后的代码如下:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用zookeeper实现分布式锁,可用于执行分布式定时任务等互斥类代码,调用{@link #tryLock()},并在任务结束后,一定要调用{@link #unlock()}释放锁
 *
 * @author gaohang
 */
public final class ZookeeperLock {
  private static final Logger logger = LoggerFactory.getLogger(ZookeeperLock.class);

  private static final ScheduledExecutorService delayExecutor = Executors.newScheduledThreadPool(1);
  private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1);

  /**
   * 所有锁都在此节点下
   */
  private static final String LOCK_BASE_PATH = "/onlineetl/distributeLock/instance";
  private static final Charset UTF8 = Charset.forName("UTF-8");

  private final int delayTime = 10;

  private final String lockName;
  private final String lockPath;
  private final ZkClient zkClient;

  /**
   * 当前机器的ip
   */
  private final String localhostAddr;

  /**
   * 占用锁的机器的ip
   */
  private String activeHostAddr;

  /**
   * 当前持有锁的状态
   */
  private volatile State state = State.CREATED;

  /**
   * 本地锁,防止本地多线程导致本地非串行
   */
  private final Lock localLock = new ReentrantLock();

  /**
   * 创建分布多锁
   *
   * @param lockName 锁的名字,唯一
   * @param zkClient zk客户端
   * @param callback 当有dataDeleted时,重新获取争用后调用
   * @throws UnknownHostException 获取localhost失败
   */
  ZookeeperLock(String lockName, ZkClient zkClient, LockAddrChangedCallback callback) throws UnknownHostException {
    this.zkClient = zkClient;
    if (StringUtils.isBlank(lockName)) {
      throw new IllegalArgumentException("lockName cannot be blank string");
    }
    this.lockName = lockName;
    this.lockPath = LOCK_BASE_PATH + '/' + lockName;
    final InetAddress localhost = InetAddress.getLocalHost();
    this.localhostAddr = localhost.getHostAddress();
    initLock();
    subscriptDataChange(lockName, zkClient, callback);
    zkClient.subscribeStateChanges(new IZkStateListener() {
      @Override
      public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
        logger.info("zookeeper state changed, state: {}", state);
      }

      @Override
      public void handleNewSession() throws Exception {
        if (!initLock()) {
          callbackExecutor.execute(callback::onPathLose);
        }
      }

      @Override
      public void handleSessionEstablishmentError(Throwable error) throws Exception {
        logger.error("failed to connect to zookeeper", error);
      }
    });
  }

  private void subscriptDataChange(String lockName, ZkClient zkClient, LockAddrChangedCallback callback) {
    zkClient.subscribeDataChanges(lockPath, new IZkDataListener() {

      @Override
      public void handleDataChange(String dataPath, Object data) {
        final String newOwner = new String((byte[]) data, UTF8);
        if (!isMyself(newOwner)) {
          //不是本机
          state = State.RELEASED;
          logger.info("lock [{}] released, owner is {}", lockName, newOwner);
        } else {
          //本机持有了锁
          state = State.OWN;
          logger.info("lock [{}] locked", lockName);
        }
        activeHostAddr = newOwner;
      }

      @Override
      public void handleDataDeleted(String dataPath) {
        //数据被删除有两种情况,
        // 1.持有锁的机器挂了
        // 2.网络异常或者zk出现了异常
        //因此在收到数据删除回调时,未获取锁的机器要优先让上一次获取锁的机器重新获取锁
        if (state == State.OWN && activeHostAddr != null && isMyself(activeHostAddr)) {
          //设置成released,在获取锁成功时防止有新的上锁操作
          state = State.WAITING;
          if (localLock.tryLock()) {
            //说明锁没有在使用中,释放即可,将锁的权限交给其它机器
            localLock.unlock();
            activeHostAddr = null;
            //5s后再获取锁
            delayExecutor.schedule(() -> {
              initLock();
              //对方挂了,自己争用成功
              callbackExecutor.execute(callback::onPathLocked);
            }, 5, TimeUnit.SECONDS);
          } else {
            //锁被占用,立即获取锁
            initLock();
          }
        } else {
          state = State.WAITING;
          activeHostAddr = null;
          //对方可能网络异常了,5s后再获取锁,如果对方5s没恢复,则认为对方短时间内无法恢复了
          delayExecutor.schedule(() -> {
            initLock();
            callbackExecutor.execute(callback::onPathLocked);
          }, delayTime, TimeUnit.SECONDS);
        }
      }
    });
  }

  private boolean isMyself(String activeHostAddr) {
    return StringUtils.equals(activeHostAddr, localhostAddr);
  }

  private boolean initLock() {
    try {
      zkClient.createEphemeral(lockPath, localhostAddr.getBytes(UTF8));
      activeHostAddr = localhostAddr;
      state = State.OWN;
      logger.info("lock [{}] locked", lockName);
      return true;
    } catch (ZkNodeExistsException e) {
      byte[] host = zkClient.readData(lockPath, true);
      if (host == null) {
        // 如果不存在节点,立即尝试一次
        return initLock();
      } else {
        activeHostAddr = new String(host, UTF8);
        state = State.RELEASED;
        logger.info("lock [{}] released, owner is {}", lockName, activeHostAddr);
        if (isMyself(activeHostAddr)) {
          state = State.OWN;
          return true;
        }
        return false;
      }
    } catch (ZkNoNodeException e) {
      zkClient.createPersistent(LOCK_BASE_PATH, true); // 尝试创建父节点
      return initLock();
    } catch (Throwable e) {
      logger.error("lock [{}] failed", lockName, e);
      //不让招出异常
      return false;
    }
  }

  public boolean tryLock() {
    //避免状态被修改,这里先上锁,在handleDataDeleted中,如果trylock成功,则表示没有任务在执行
    if (!localLock.tryLock()) {
      return false;
    }
    switch (state) {
      case OWN:
        if (initLock()) {
          return true;
        }
        state = State.RELEASED;
        return false;
      case RELEASED:
        localLock.unlock();
        return false;
      case CREATED:
        initLock();
        return relock();
      case WAITING:
        try {
          TimeUnit.SECONDS.sleep(delayTime);
        } catch (InterruptedException e) {
          localLock.unlock();
          throw new RuntimeException(e);
        }
        //再次尝试获取锁
        final boolean locked = relock();
        if (locked) {
          return true;
        }
        initLock();
        return relock();
    }

    //理论上不会走到这里
    initLock();
    //这里一定要Unlock,否则tryLock会重入,而unlock没有多次释放徜
    return relock();
  }

  private boolean relock() {
    localLock.unlock();
    return tryLock();
  }

  public void unlock() {
    localLock.unlock();
  }

  public String getLockName() {
    return lockName;
  }

  public String getActiveHostAddr() {
    return activeHostAddr;
  }

  @Override
  public String toString() {
    return "ZookeeperLock{" +
        "lockName='" + lockName + '\'' +
        ", lockPath='" + lockPath + '\'' +
        ", localhostAddr='" + localhostAddr + '\'' +
        ", activeHostAddr='" + activeHostAddr + '\'' +
        ", state=" + state +
        '}';
  }

  /**
   * 锁的状态,当状态是created时,则竞争,OWN状态表示持有锁,RELEASED状态表示未持有锁, waiting表示锁待争用
   */
  private enum State {
    OWN, RELEASED, CREATED, WAITING
  }
}

相关文章

网友评论

      本文标题:吐槽——公司前人自研的分布式任务调度跑不起来,最后我选择了zoo

      本文链接:https://www.haomeiwen.com/subject/vmzemctx.html