美文网首页JMCR
JMCR 线程调度

JMCR 线程调度

作者: 毒死预言家的女巫 | 来源:发表于2019-11-24 21:04 被阅读0次

简述

在上一篇文章JMCR 约束求解原理中,我们通过约束求解的方式得到了一个新的前缀。本文将探讨JMCR如何使程序按照规定的序列进行调度

提纲

  • 插桩前后比较
  • before 系列函数
  • 自定义调度的实现

系列文章:
1. JMCR 简介
2. JMCR 字节码插桩(一)
3. JMCR 字节码插桩(二)
4. JMCR 约束求解原理
5. JMCR 线程调度

一、插装前后对比

结合之前的插装和约束求解中定义的事件,JMCR 在每一个定义的事件发生之前都会通过字节码插装来调用一系列的函数... 就像这样:
插装前:

package edu.tamu.aser.tests;

import edu.tamu.aser.reex.JUnit4MCRRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnit4MCRRunner.class)
public class MyTest {
    static int a = 0;
    static int b = 1;

    @Test
    public void test() throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                a = 1;
                System.out.println("b : " + b);
            }
        }).start();
        Thread.sleep(10);
        if(a == 0){ //Read  a 0
            b = 2;   //Write b 2
        } else {
            System.err.println("Error! Read a != 0");
            System.exit(-1);
        }
        System.out.println(b); // Read b 2
    }
}

后:

package edu.tamu.aser.tests;

import edu.tamu.aser.reex.JUnit4MCRRunner;
import edu.tamu.aser.reex.Scheduler;
import edu.tamu.aser.runtime.RVRunTime;
import java.io.PrintStream;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnit4MCRRunner.class)
public class MyTest {
    static int a;
    static int b;

    public MyTest() {
    }

    @Test
    public void test() throws InterruptedException {
        Thread var6;
        Thread var10000 = var6 = new Thread(new Runnable() {
            public void run() {
                if (this instanceof Runnable) {
                    RVRunTime.logThreadBegin();
                }

                try {
                    RVRunTime.logSleep();
                    Thread.sleep(10L);
                } catch (InterruptedException var9) {
                    var9.printStackTrace();
                }

                byte var6 = 1;
                Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "a", "I");
                MyTest.a = 1;
                RVRunTime.logFieldAcc(30, (Object)null, 1, Integer.valueOf(var6), true);
                Scheduler.beforeFieldAccess(true, "java/lang/System", "out", "Ljava/io/PrintStream;");
                PrintStream var7;
                PrintStream var10000 = var7 = System.out;
                RVRunTime.logFieldAcc(31, (Object)null, 4, var7, false);
                StringBuilder var10001 = (new StringBuilder()).append("b : ");
                Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "b", "I");
                int var8;
                int var10002 = var8 = MyTest.b;
                RVRunTime.logFieldAcc(34, (Object)null, 2, var8, false);
                var10000.println(var10001.append(var10002).toString());
                if (this instanceof Runnable) {
                    RVRunTime.logThreadEnd();
                }

            }
        });
        RVRunTime.logBeforeStart(5, var6);
        var10000.start();
        RVRunTime.logSleep();
        Thread.sleep(10L);
        Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "a", "I");
        int var7;
        int var12 = var7 = a;
        RVRunTime.logFieldAcc(7, (Object)null, 1, var7, false);
        PrintStream var13;
        if (var12 == 0) {
            byte var8 = 2;
            Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "b", "I");
            b = 2;
            RVRunTime.logFieldAcc(10, (Object)null, 2, Integer.valueOf(var8), true);
        } else {
            Scheduler.beforeFieldAccess(true, "java/lang/System", "err", "Ljava/io/PrintStream;");
            PrintStream var9;
            var13 = var9 = System.err;
            RVRunTime.logFieldAcc(11, (Object)null, 3, var9, false);
            var13.println("Error! Read a != 0");
            System.exit(-1);
        }

        Scheduler.beforeFieldAccess(true, "java/lang/System", "out", "Ljava/io/PrintStream;");
        PrintStream var10;
        var13 = var10 = System.out;
        RVRunTime.logFieldAcc(15, (Object)null, 4, var10, false);
        Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "b", "I");
        int var11;
        int var10001 = var11 = b;
        RVRunTime.logFieldAcc(16, (Object)null, 2, var11, false);
        var13.println(var10001);
    }

    static {
        byte var6 = 0;
        Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "a", "I");
        a = 0;
        RVRunTime.logInitialWrite(20, (Object)null, 1, Integer.valueOf(var6));
        byte var7 = 1;
        Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "b", "I");
        b = 1;
        RVRunTime.logInitialWrite(22, (Object)null, 2, Integer.valueOf(var7));
    }
}

二、before 系列函数

在所有 beforeXX 系列的函数中,他们无一例外的调用了 beforeEvent 这个函数。

TIM截图20191124194204.png

事已至此,自然要去这个 beforeEvent 里一探究竟:

/**
     * Helper method called before a schedule relevant event.
     * 
     * @param eventDesc
     *            {@link EventDesc} describing the schedule relevant event.
     * @param pause
     *            whether to pause the current thread before the event.
     */
    private static void beforeEvent(EventDesc eventDesc, boolean pause) {
        ThreadInfo currentThreadInfo;
        schedulerStateLock.lock();       //保证beforeEvent操作的原子性
        try {
            //...
            currentThreadInfo = liveThreadInfos.get(Thread.currentThread());
            if(currentThreadInfo!=null)
            {
                //...
                if (pause) {
                    pausedThreadInfos.add(currentThreadInfo); // 1
                }
            }
        } finally {
            if (pause) {
                schedulerWakeupCondition.signal();
            }
            schedulerStateLock.unlock();
        }
        
        try {
            if (pause) {
                if(currentThreadInfo!=null){
                    currentThreadInfo.getPausingSemaphore().acquire(); // 2
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  1. 代码里面那些schedulerStateXXX.XXX() 函数是为了保证测试代码中的原子性。
  2. 代码标注为1的那行代码将当前线程ThreadInfo加入了pausedThreadInfo中。ThreadInfo 是对于 Thread 类的拓展,包含了线程本身、线程当前执行行数、和一个初始值为 0 的信号量,其中信号量可以通过 getPausingSemaphore() 获得,且每一个 Thread 对应一个 ThreadInfo。
  3. 代码标注为 2 的地方,调用了 ThreadInfo.getPausingSemaphore().acquire() 使调用 beforeEvent() 的线程阻塞。
  4. pause 参数是用来判断是否需要在插装点运行之前阻塞线程,即我们并不需要调度所有的节点都暂停。

点到声明 PausedThreadInfos 的地方附近

    private static Map<Thread, ThreadInfo> liveThreadInfos;
    private static SortedSet<ThreadInfo> pausedThreadInfos;
    private static Set<ThreadInfo> blockedThreadInfos;

通过查看这些 Map 在代码中的引用情况,可以得知:

  • liveThreadInfo 是程序中所有的线程
  • pausedThreadInfo 是由于下一步即将运行到某种插装点之前而人为暂停的线程
  • blockedThreadInfo 是由于程序运行而 block 掉的线程。

随后可以开启后台进程,配合之前的操作,进行调度。

三、调度实现

观察在 Scheduler 中的这段代码:

    static {
        Thread schedulerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean timeout = false;
                while (true) {  
                    schedulerStateLock.lock();
                    try {
                        if (!liveThreadInfos.isEmpty()) {
                            if (liveThreadInfos.size() == blockedThreadInfos.size()) {
                                deadlockOrFinishNotifier.release();//当前存活线程数等于阻塞线程数,表示死锁或结束(0 == 0)
                            }
                            if (!pausedThreadInfos.isEmpty() && 
                                    pausedThreadInfos.size() == liveThreadInfos.size() - blockedThreadInfos.size()) {
                                //选择一个要执行的线程
                                if (pausedThreadInfos.size() > 1){
                                    System.out.println(1);
                                }
                                ThreadInfo chosenPausedThreadInfo = (ThreadInfo) choose(pausedThreadInfos, ChoiceType.THREAD_TO_SCHEDULE);
                                if(chosenPausedThreadInfo!=null)
                                {
                                    pausedThreadInfos.remove(chosenPausedThreadInfo);//在pausingthread中去除选择的线程
                                    chosenPausedThreadInfo.getPausingSemaphore().release();//并令其通行
                                }
                                //something wrong
                                //just release the lock, wait for the thread to be added to the paused thread
                            }
                            else if(timeout && !pausedThreadInfos.isEmpty())//JEFF
                            {
                                //..处理超时  
                            }
                       timeout = !schedulerWakeupCondition.await(500, Reex_TimeUnit.MILLISECONDS);
                        
                    } catch (Throwable exp) {
                        System.out.flush();
                        System.err.println("Uncaught exception in scheduler thread:");
                        exp.printStackTrace(System.err);
                        System.exit(2);
                    } finally {
                        schedulerStateLock.unlock();
                    }
                } //end while
            }
        }, "Scheduler");
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

这个后台进程会一直循环,循环中做了这么几件事。

  1. 首先判断 liveThreadInfos的大小和 blockedThreadInfos 大小是否相等,如果相等,则有可能在程序中出现了死锁,或者程序运行结束,此时释放 deadlockOrFinishNotifier,运行死锁检测程序。
  2. 如果 pausedThreadInfos 不是一个空的Map,且 liveThreadInfos 的大小等于pausedThreadInfosblockedThreadInfos的大小之和,就调用 strategy .choose() 方法从 pausedThreadInfos 中选择出一个ThreadInfo,调用ThreadInfo.getSemaphore().release() 方法。换成人话来说,就是当前存在一些我们在调度点之前调用 acquire 而阻塞的线程,且现在程序里面所有进程不是被手动阻塞在了调度点之前,就是原来代码执行导致的阻塞,这时候我们就应该去从那些被阻塞到了调度点的线程中选择一个线程,使其放行。然后这个线程运行到下一个调度点之前,继续被阻塞,这里就会执行新的一轮放行。如此往复,便实现了对于线程的可控调度。 choose 使用了策略模式,具体实现方法不展开描述。默认使用的是MCRStrategy,这里也是使用了 IoC 控制反转。

相关文章

  • JMCR 线程调度

    简述 在上一篇文章JMCR 约束求解原理中,我们通过约束求解的方式得到了一个新的前缀。本文将探讨JMCR如何使程序...

  • JMCR 简介

    本系列记录了我研究JMCR的学习笔记和一些自己的想法,JMCR 是一个多线程测试工具的实现,其源代码地址为:链接。...

  • java虚拟机读书笔记之线程调度

    java线程调度 线程调度主要有两种方式,协同式线程调度和抢占式线程调度。1、协同式: 线程的执行时间由线程本身...

  • [Java]线程和锁

    0x00 线程调度 线程调度指的是系统为线程分配CPU使用权。分为两种: 协同式线程调度线程想用CPU多久就用多久...

  • CPU调度

    CPU调度 基本概念 CPU调度在讨论普通调度概念时使用进程调度,特别指定为线程概念时使用线程调度 CPU-I/O...

  • 2018-04-03 线程基础

    线程调度 是指系统分配CPU使用权限的方式,分为协同式线程调度和抢占式线程调度 进程、线程概念 进程是应用程序的一...

  • 线程优先级和守护线程

    线程优先级: Java提供一个线程调度器来监控程序中启动后进入就绪状态的所有线程,线程调度器按照优先级决定调度哪个...

  • 并发--线程和锁

    线程调度 协同式调度 1.一个线程执行完毕之后再通知其他线程执行 抢占式调度(JAVA使用的是这种方式) 1.os...

  • 2.1 Java线程调度

    线程调度是指系统为线程分配处理器使用权的过程,主要调度方式有两种,分别是协同式线程调度(CooperativeTh...

  • 网络编程(三)

    Volley用法完全解析 从上图可以看到Volley分为三个线程,分别是主线程、缓存调度线程、和网络调度线程,首先...

网友评论

    本文标题:JMCR 线程调度

    本文链接:https://www.haomeiwen.com/subject/phtrwctx.html