美文网首页
从多线程到分布式(七)多线程的一些使用实践

从多线程到分布式(七)多线程的一些使用实践

作者: 吟游雪人 | 来源:发表于2023-08-14 11:15 被阅读0次

    学了那么多基础理论和代码,这里来看看实际使用过程中会使用的一些常用实践:

    一.异步转同步。
    异步转同步,需要把回调转为等待。因此可以通过前面提到的锁的阻塞的方式来实现。可以通过ReentrantLock的await方法和signalAll来实现。封装以后代码如下:
    可以配合静态属性,更好的在不同方法中都可以找到对应的GuardedObject

    class GuardedObject<T>{
      // 受保护的对象
      T obj;
      final Lock lock =
        new ReentrantLock();
      final Condition done =
        lock.newCondition();
      final int timeout=2;
    
      // 保存所有 GuardedObject
      final static Map<Object, GuardedObject> gos=new ConcurrentHashMap<>();
    
      // 静态方法创建 GuardedObject
      static <K> GuardedObject
          create(K key){
        GuardedObject go=new GuardedObject();
        gos.put(key, go);
        return go;
      }
    
      static <K, T> void fireEvent(K key, T obj){
        GuardedObject go=gos.remove(key);
        if (go != null){
          go.onChanged(obj);
        }
      }
      // 获取受保护对象  
      T get(Predicate<T> p) {
        lock.lock();
        try {
          //MESA 管程推荐写法
          while(!p.test(obj)){
            done.await(timeout,
              TimeUnit.SECONDS);
          }
        }catch(InterruptedException e){
          throw new RuntimeException(e);
        }finally{
          lock.unlock();
        }
        // 返回非空的受保护对象
        return obj;
      }
      // 事件通知方法
      void onChanged(T obj) {
        lock.lock();
        try {
          this.obj = obj;
          done.signalAll();
        } finally {
          lock.unlock();
        }
      }
    }
    
    

    使用方式:

    // 处理浏览器发来的请求
    Respond handleWebReq(){
      int id= 序号生成器.get();
      // 创建一消息
      Message msg1 = new
        Message(id,"{...}");
      // 创建 GuardedObject 实例
      GuardedObject<Message> go=
        GuardedObject.create(id);  
      // 发送消息
      send(msg1);
      // 等待 MQ 消息
      Message r = go.get(
        t->t != null);  
    }
    void onMessage(Message msg){
      // 唤醒等待的线程
      GuardedObject.fireEvent(
        msg.id, msg);
    }
    

    也可以用但ConudownLatch来实现,会更简单明了一些

    public static String getNetworkResult() {
            String result = null;
            final CountDownLatch latch = new CountDownLatch(1);
    
            NetworkUtil.encodeFiled(jsonObject.toString(),  new Callback() {
                @Override
                public void onSuccess(Response data) {
                    latch.countDown();
                    result= data.toString();
                }
            });
    
            try {
                latch.await(10,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            }
    
           return result;
    
    }
    

    二、信号量限流
    信号量可以实现的独特功能就是同时允许多个线程进入临界区,但是信号量不能做的就是同时唤醒多个线程去争抢锁,只能唤醒一个阻塞中的线程,而且信号量模型是没有Condition的概念的,即阻塞线程被醒了直接就运行了而不会去检查此时临界条件是否已经不满足了,基于此考虑信号量模型才会设计出只能让一个线程被唤醒,否则就会出现因为缺少Condition检查而带来的线程安全问题。

    class ObjPool<T, R> {
      final List<T> pool;  //需要用线程安全的vector,因为信号量支持多个线程进入临界区,执行list的add和remove方法时可能是多线程并发执行
      // 用信号量实现限流器
      final Semaphore sem;
      // 构造函数
      ObjPool(int size, T t){
        pool = new Vector<T>(){};
        for(int i=0; i<size; i++){
          pool.add(t);
        }
        sem = new Semaphore(size);
      }
      // 利用对象池的对象,调用 func
      R exec(Function<T,R> func) {
        T t = null;
        sem.acquire();
        try {
          t = pool.remove(0);
          return func.apply(t);
        } finally {
          pool.add(t);
          sem.release();
        }
      }
    }
    // 创建对象池
    ObjPool<Long, String> pool =
      new ObjPool<Long, String>(10, 2);
    // 通过对象池获取 t,之后执行  
    pool.exec(t -> {
        System.out.println(t);
        return t.toString();
    });
    
    

    我们用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
    简言之,使用信号量,我们可以轻松地实现一个限流器,使用起来还是非常简单的。
    总结

    三、二阶段终止模式
    Java 线程在阻塞状态是无法响应任何请求的,因此想转换成终止状态的前提是唤醒线程,进入 RUNNABLE 状态,而实际上线程也可能处在休眠状态,也就是说,我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。如何做到呢?这个要靠 Java Thread 类提供的interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。

    能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING;而像synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是BLOCKED。
    初始线程处于NEW状态,调用start()之后开始执行,进入RUNNING或者READY状态。如果没有调用任何的阻塞函数,线程只会在RUNNING和READY之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,开发者基本没有机会介入,除了可以调用yield()函数,放弃对CPU的占用。
    一旦调用了图中的任何阻塞函数,线程就会进入WAITING或者TIMED_WAITING状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了synchronized关键字或者synchronized块,则会进入BLOCKED状态。

    故而t.interrupted()的精确含义是“唤醒轻量级阻塞”,而不是字面意思“中断一个线程”。
    因为t.interrupted()相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于WAITING或者TIMED_WAITING状态,就会抛出一个InterruptedException,并且线程被唤醒。而如果线程此时并没有被阻塞,则线程什么都不会做。


    image.png

    Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。中断是一种协作机制。java中一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作——前提是如果线程B愿意停止下来。虽然在API或者语言规范中并没有为中断定义任何特定应用级别的语义,但最常使用中断的情况就是取消某个操作。方法对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。

    当在代码中调用了一个将抛出InterruptedException受检异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种基本选择:传递InterruptedException。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。恢复中断。有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断

    我们没有办法保证第三方类库正确处理了线程的中断异常,例如第三方类库在捕获到 Thread.sleep() 方法抛出的中断异常后,没有重新设置线程的中断状态,那么就会导致线程不能够正常终止。所以强烈建议你设置自己的线程终止标志位,例如在下面的代码中,使用 isTerminated 作为线程终止标志位,此时无论是否正确处理了线程的中断异常,都不会影响线程优雅地终止。

    
    
    class Proxy {
      // 线程终止标志位
      volatile boolean terminated = false;  //保证可见性
      boolean started = false;
      // 采集线程
      Thread rptThread;
      // 启动采集功能
      synchronized void start(){
        // 不允许同时启动多个采集线程
        if (started) {
          return;
        }
        started = true;
        terminated = false;
        rptThread = new Thread(()->{
          while (!terminated){
            // 省略采集、回传实现
            report();
            // 每隔两秒钟采集、回传一次数据
            try {
              Thread.sleep(2000);
            } catch (InterruptedException e){
              // 重新设置线程中断状态
              Thread.currentThread().interrupt();
            }
          }
          // 执行到此处说明线程马上终止
          started = false;
        });
        rptThread.start();
      }
      // 终止采集功能
      synchronized void stop(){
        // 设置中断标志位
        terminated = true;
        // 中断线程 rptThread
        rptThread.interrupt();
      }
    }
    

    两阶段终止模式是一种应用很广泛的并发设计模式,在 Java 语言中使用两阶段终止模式来优雅地终止线程,需要注意两个关键点:
    1.是仅检查终止标志位是不够的,因为线程的状态可能处于休眠态;
    2.仅检查线程的中断状态也是不够的,因为我们依赖的第三方类库很可能没有正确处理中断异常。

    相关文章

      网友评论

          本文标题:从多线程到分布式(七)多线程的一些使用实践

          本文链接:https://www.haomeiwen.com/subject/jlbfmdtx.html