美文网首页
4-Java并发编程基础

4-Java并发编程基础

作者: 加夕 | 来源:发表于2018-11-17 14:30 被阅读0次

    1.线程简介

    ①什么是线程

    现代操作系统调度的最小单元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等熟悉,并且能够访问共享的内存变量。

    一个Java程序从main方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上Java天生就是多线程程序,因为执行main方法的是一个名称为main的线程。

    public class MultiThread {
        public static void main(String[] args) {
            //获取Java线程管理MXBean
            ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
            //不需要获取同步的monitor和synchronized信息,仅获取想象成和线程堆栈信息
            ThreadInfo[] threadInfos = threadMxBean.dumpAllThreads(false, false);
            //遍历线程信息,仅打印线程ID和线程名称信息
            for (ThreadInfo threadInfo :threadInfos) {
                System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());
            }
        }
    }
    

    输出:

    [6] Monitor Ctrl-Break
    [5] Attach Listener
    [4] Signal Dispatcher // 分发处理发送给JVM信号的线程
    [3] Finalizer     // 调用对象finalize方法的线程
    [2] Reference Handler // 清除Reference的线程
    [1] main          // main线程,用户程序入口
    

    ②为什么要使用多线程

    使用多线程的原因主要有一下几点:

    • 更多的处理器核心。一个线程在一个时刻只能运行在一个处理器核心上。
    • 更快的响应时间。在复杂业务逻辑的代码中,可以使用多线程技术,将数据一致性不强的操作派发给其他线程处理(也可以使用消息队列),响应用户请求的线程能够尽可能快地处理完成,缩短响应时间。
    • 更好的编程模型。Java为多线程编程使开发人员能够更加专注于问题的解决,即为所遇到的问题建立合适的模型,而不是绞尽脑汁考虑如何将其多线程化。

    ③线程优先级

    在Java线程中,通过一个整型成员变量priority来控制优先级,优先级范围从1-10,默认优先级是5,优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级时,针对频繁阻塞(休眠或I/O操作)的线程需要设置较高的优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。

    注意:线程优先级不能作为程序正确性的依赖,因为操作系统可以完全不用理会Java线程对于优先级的设定。

    public class Priority {
        private static volatile boolean notStart = true;
        private static volatile boolean notEnd = true;
        public static void main(String[] args) throws InterruptedException {
            List<Job> jobs = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
                Job job = new Job(priority);
                jobs.add(job);
                Thread thread = new Thread(job, "Thread:" + i);
                thread.setPriority(priority);
                thread.start();
            }
            notStart = false;
            TimeUnit.SECONDS.sleep(10);
            notEnd = false;
            for (Job job : jobs) {
                System.out.println("Job Priority : " + job.priority + ", Count : " + job.jobCount);
            }
        }
        static class Job implements Runnable {
            private int priority;
            private long jobCount;
            public Job(int priority) {
                this.priority = priority;
            }
            @Override
            public void run() {
                while (notStart) {
                    Thread.yield();
                }
                while (notEnd) {
                    Thread.yield();//让出cpu的占有权,但让出的时间不确定 不一定礼让的线程后执行完
                    jobCount++;
                }
            }
        }
    }
    

    输出:

    Job Priority : 1, Count : 10651786
    Job Priority : 1, Count : 10489132
    Job Priority : 1, Count : 10342027
    Job Priority : 1, Count : 10226463
    Job Priority : 1, Count : 10522124
    Job Priority : 10, Count : 10612194
    Job Priority : 10, Count : 10478780
    Job Priority : 10, Count : 10652992
    Job Priority : 10, Count : 10396247
    Job Priority : 10, Count : 10548389
    

    从输出可以看到线程优先级没有生效,计数的结果非常相近,没有明显差距。

    ④线程的状态

    public class ThreadState {
        public static void main(String[] args) {
            new Thread(new TimeWaiting(), "TimeWaitingThread").start();
            new Thread(new Waiting(), "WaitingThread").start();
            new Thread(new Blocked(), "BlockedThread-1").start();
            new Thread(new Blocked(), "BlockedThread-2").start();
        }
        //该线程不断地进行睡眠
        static class TimeWaiting implements Runnable {
            @Override
            public void run() {
                while (true) {
                    SleepUtils.second(10);
                }
            }
        }
        //该线程在Waiting.class实例上等待
        static class Waiting implements Runnable {
            @Override
            public void run() {
                while (true) {
                    synchronized (Waiting.class) {
                        try {
                            Waiting.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
        //该线程在Blocked.class实例上加锁后,不会释放该锁
        static class Blocked implements Runnable {
            @Override
            public void run() {
                synchronized (Blocked.class) {
                    while (true) {
                        SleepUtils.second(10);
                    }
                }
            }
        }
    }
    
    public class SleepUtils {
        public static void second(long seconds) {
            try {
                TimeUnit.SECONDS.sleep(seconds);
            } catch (InterruptedException e) {
            }
        }
        public static void mill(long seconds) {
            try {
                TimeUnit.MILLISECONDS.sleep(seconds);
            } catch (InterruptedException e) {
            }
        }
    }
    

    命令行输入 jps,输出:

    1648 Launcher
    7480 Launcher
    19788 Jps
    19948
    20412 RemoteMavenServer
    22252 ThreadState
    

    命令行输入jstack 22252,输出:

    //BlockedThread-2线程阻塞在获取Blocked.class实例的锁上
    "BlockedThread-2" #15 prio=5 os_prio=0 tid=0x000000001f028800 nid=0xab8 waiting for monitor entry [0x00000000200ef000]
        java.lang.Thread.State: BLOCKED (on object monitor)
    //BlockedThread-1线程获取到了Blocked.class的锁
    "BlockedThread-1" #14 prio=5 os_prio=0 tid=0x000000001f028000 nid=0x688 waiting on condition [0x000000001ffee000]
        java.lang.Thread.State: TIMED_WAITING (sleeping)
    //WaitingThread线程在Waiting实例上等待
    "WaitingThread" #13 prio=5 os_prio=0 tid=0x000000001f018800 nid=0x3178 in Object.wait() [0x000000001feee000]
        java.lang.Thread.State: WAITING (on object monitor)
    //TimeWaitingThread线程处于超时等待
    "TimeWaitingThread" #12 prio=5 os_prio=0 tid=0x000000001f00a800 nid=0x56a0 waiting on condition [0x000000001fdef000]
        java.lang.Thread.State: TIMED_WAITING (sleeping)
    

    ⑤Daemon线程

    Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。当一个Java虚拟机中不存在非Daemon线程的时候,将会退出。通过Thread.setDaemon(true)将线程设置为Daemon线程(要做启动线程之前设置)。

    Java虚拟机退出时Daemon线程中的finally块并不一定会执行。构建Daemon线程时,不能依靠finally中的内容来确保执行关闭或清理资源的逻辑。

    public class Daemon {
        public static void main(String[] args) {
            Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
            thread.setDaemon(true);
            thread.start();
        }
        static class DaemonRunner implements Runnable {
            @Override
            public void run() {
                try {
                    SleepUtils.second(10);
                } finally {
                    //这里并不一定会被打印出来
                    System.out.println("DaemonThread finally run");
                }
            }
        }
    }
    

    2.启动和终止线程

    ①构造线程

    Thread.java中线程初始化部分:

    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }
        this.name = name.toCharArray();
        //当前线程就是该线程的父线程
        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            if (security != null) {
                g = security.getThreadGroup();
            }
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }
        g.checkAccess();
        if (security != null) {
            if (isCCLOverridden(getClass())) {
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }
        g.addUnstarted();
        this.group = g;
        //将daemon、priority属性设置为父线程的对应属性
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        //将父线程的inheritableThreadLocals复制过来
        if (parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                    ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        this.stackSize = stackSize;
        //分配一个线程Id
        tid = nextThreadID();
    }
    

    一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent是否为Daemon、优先级和加载资源的contextClassLoader以及可继承的ThreadLocal,同时还会分配一个唯一的ID来标识这个child线程。

    ②启动线程

    线程对象在初始化完成之后,调用start方法就可以启动这个线程。

    线程start方法的含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,应立即启动调用start方法的线程。

    ③理解中断

    中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。中断操作好比其他线程对该线程打了个招呼,其他线程通过调用该线程的interrupt方法对其进行中断操作。

    Thread.interrupted对当前线程的中断标识进行复位。

    isInterrupted方法判断是否被中断,如果该线程已处于终结状态,即使该线程被中断过,该方法也返回false。许多抛出InterruptedException的方法(例如Thread.sleep(long millis)方法),在抛出之前,Java虚拟机会先将线程的中断标识位清除,然后抛出InterruptedException。

    public class Interrupted {
        public static void main(String[] args) {
            //sleepThread不停的尝试睡眠
            Thread sleepThread = new Thread(new SleepRunner(), "SleepRunner");
            sleepThread.setDaemon(true);
            //busyThread不停的运行
            Thread busyThread = new Thread(new BusyRunner(), "BusyRunner");
            busyThread.setDaemon(true);
    
            sleepThread.start();
            busyThread.start();
            //休眠5秒,让sleepThread和busyThread充分运行
            SleepUtils.second(5);
            sleepThread.interrupt();
            busyThread.interrupt();
            System.out.println("sleepThread interrupted is " + sleepThread.isInterrupted());
            System.out.println("busyThread interrupted is " + busyThread.isInterrupted());
            //防止sleepThread和busyThread立刻退出
            SleepUtils.second(2);
        }
        static class SleepRunner implements Runnable {
            @Override
            public void run() {
                while (true) {
                    SleepUtils.second(10);//sleep会清除掉中断标识
                }
            }
        }
        static class BusyRunner implements Runnable {
            @Override
            public void run() {
                while (true) {
                }
            }
        }
    }
    

    输出:

    sleepThread interrupted is false
    busyThread interrupted is true
    

    ④过期的suspend()、resume()、stop()

    这些API是过期的,不建议使用。不建议使用的原因主要有:

    suspend方法(暂停),在调用后,线程不会释放已经占有的资源(比如锁),容易引发死锁问题。

    stop方法(停止),在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不正确状态下。

    public class Deprecated {
        public static void main(String[] args) {
            DateFormat format = new SimpleDateFormat("HH:mm:ss");
            Thread printThread = new Thread(new Runner(), "PrintThread");
            printThread.setDaemon(true);
            printThread.start();
            SleepUtils.second(3);
            //将PrintThread进行暂停,输出内容工作停止
            printThread.suspend();
            System.out.println("main suspend PrintThread at " + format.format(new Date()));
            SleepUtils.second(3);
            //将PrintThread进行恢复,输出内容继续
            printThread.resume();
            System.out.println("main resume PrintThread at " + format.format(new Date()));
            SleepUtils.second(3);
            //将PrintThread进行停止,输出内容停止
            printThread.stop();
            System.out.println("main stop PrintThread at " + format.format(new Date()));
            SleepUtils.second(3);
        }
        static class Runner implements Runnable {
            @Override
            public void run() {
                DateFormat format = new SimpleDateFormat("HH:mm:ss");
                while (true) {
                    System.out.println(Thread.currentThread().getName() + " Run at " + format.format(new Date()));
                    SleepUtils.second(1);
                }
            }
        }
    }
    

    输出:

    PrintThread Run at 18:59:27
    PrintThread Run at 18:59:28
    PrintThread Run at 18:59:29
    main suspend PrintThread at 18:59:30
    main resume PrintThread at 18:59:33
    PrintThread Run at 18:59:33
    PrintThread Run at 18:59:34
    PrintThread Run at 18:59:35
    main stop PrintThread at 18:59:36
    

    ⑤安全地终止线程

    中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,这种交互方式最适合用来取消或停止任务。

    除了中断,还可以用一个Boolean变量来控制是否需要停止任务并终止该线程。

    public class Shutdown {
        public static void main(String[] args) {
            Runner one = new Runner();
            Thread countThread = new Thread(one, "CountThread");
            countThread.start();
            //睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
            SleepUtils.second(1);
            countThread.interrupt();
    
            Runner two = new Runner();
            countThread = new Thread(two, "CountThread");
            countThread.start();
            //睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
            SleepUtils.second(1);
            two.cancel();
        }
        private static class Runner implements Runnable {
            private long i;
            private volatile boolean on = true;
            @Override
            public void run() {
                while (on && !Thread.currentThread().isInterrupted()) {
                    i++;
                }
                System.out.println("Count i = " + i);
            }
            public void cancel() {
                on = false;
            }
        }
    }
    

    输出:

    Count i = 537076551
    Count i = 480465739
    

    3.线程间通信

    ①volatile和synchronized关键字

    volatile:保证所有线程对变量访问的可见性。

    synchronized:只能有一个线程处于同步方法或同步块中。

    public class Synchronized {
        public static void main(String[] args) {
            //对Synchronized.class对象进行加锁
            synchronized (Synchronized.class) {
            }
            //静态同步方法,对Synchronized.class对象进行加锁
            m();
        }
        public static synchronized void m() {
        }
    }
    

    在Synchronized.class同级目录执行 javap -v Synchronized.class

    public static void main(java.lang.String[]);
        descriptor: ([Ljava/lang/String;)V
        flags: ACC_PUBLIC, ACC_STATIC
        Code:
          stack=2, locals=3, args_size=1
          0: ldc           #2                  // class net/chelaile/thread/Synchronized
          4: monitorenter       //监视器进入,获取锁
          6: monitorexit        //监视器退出,释放锁
          15: invokestatic  #3                  // Method m:()V
          18: return
    public static synchronized void m();
        descriptor: ()V
        flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
        Code:
          stack=0, locals=0, args_size=0
          0: return
    

    同步块的实现使用了monitorenter和monitorexit指令,同步方法则依靠方法修饰符上的ACC_SYNCHRONIZED

    上图可以看到,任意线程对Object(由synchronized保护)的访问,首先要获得Object的监视器。获取失败,线程状态变为BLOCKED。当之前获得了锁的线程释放了锁,该线程重新尝试对监视器的获取。

    ②等待/通知机制

    一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作。前者是生产者,后者是消费者。

    public class WaitNotify {
        static boolean flag = true;
        static Object lock = new Object();
        public static void main(String[] args) {
            Thread waitThread = new Thread(new Wait(), "WaitThread");
            waitThread.start();
            SleepUtils.second(1);
            Thread notifyThread = new Thread(new Notify(), "NotifyThread");
            notifyThread.start();
        }
        static class Wait implements Runnable {
            @Override
            public void run() {
                //加锁,拥有lock的Monitor
                synchronized (lock) {
                    //当条件不满足时,继续wait,同时释放了lock的锁
                    while (flag) {
                        try {
                            System.out.println(Thread.currentThread() + " flag is true. wait @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                            lock.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    //条件满足时,完成工作
                    System.out.println(Thread.currentThread() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                }
            }
        }
        static class Notify implements Runnable {
            @Override
            public void run() {
                //加锁,拥有lock的Monitor
                synchronized (lock) {
                    //获取lock的锁,然后进行通知,通知时不会释放lock的锁
                    //直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                    System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    lock.notifyAll();
                    flag = false;
                    SleepUtils.second(5);
                }
                //再次加锁
                synchronized (lock) {
                    System.out.println(Thread.currentThread() + " hold lock again. sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    SleepUtils.second(5);
                }
            }
        }
    }
    

    输出:

    Thread[WaitThread,5,main] flag is true. wait @ 19:24:43
    Thread[NotifyThread,5,main] hold lock. notify @ 19:24:44
    Thread[NotifyThread,5,main] hold lock again. sleep @ 19:24:49
    Thread[WaitThread,5,main] flag is false. running @ 19:24:54
    

    第3行和第4行输出顺序可能会互换。

    调用wait()、notify()、notifyAll()时需要注意以下细节:

    • 使用wait()、notify()、notifyAll()时需要先对调用对象加锁。
    • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
    • notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifyAll()的线程释放锁之后,等待线程才有机会从wait()返回。
    • notify()或notifyAll()将等待队列中的等待线程从等待队列移动到同步队列,被移动的线程状态由WAITING变为BLOCKED。notify()移动一个线程,notifyAll()移动所有。
    • 从wait()方法返回的前提是获得了调用对象的锁。

    等待/通知机制依托于同步机制,其目的是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改。

    ③等待/通知的经典范式

    等待/通知的经典范式分为两部分,分别针对等待方(消费者)和通知方(生产者)。

    线程终止时,会调用线程自身的notifyAll()方法。

    等待方遵循如下原则:

    • 获取对象的锁。
    • 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
    • 条件满足则执行对应的逻辑。
    伪代码:
    synchronized(对象) {
      while(条件不满足) {
        对象.wait();
      }
      对应的处理逻辑
    }
    

    通知方遵循如下原则:

    • 获得对象的锁。
    • 改变条件。
    • 通知所有等待在对象上的线程。
    伪代码:
    synchronized(对象) {
      改变条件
      对象.notifyALL();
    }
    

    ④管道输入/输出流

    管道输入/输出流和普通的文件输入/输出流或网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。

    包括4种具体实现:PipedOutputStream、PipedInputStream、PipedReader、PipedWriter

    public class Piped {
        public static void main(String[] args) throws IOException {
            PipedWriter out = new PipedWriter();
            PipedReader in = new PipedReader();
            //将输出流和输入流进行连接,否则在使用时会抛出IOException
            out.connect(in);
            Thread printThread = new Thread(new Print(in), "PrintThread");
            printThread.start();
            int receive;
            try {
                while ( (receive = System.in.read()) != -1) {
                    out.write(receive);
                }
            } finally {
                out.close();
            }
        }
        static class Print implements Runnable {
            private PipedReader in;
            public Print(PipedReader in) {
                this.in = in;
            }
            @Override
            public void run() {
                int receive;
                try {
                    while ( (receive = in.read()) != -1 ) {
                        System.out.print( (char) receive);
                    }
                } catch (IOException e) {
                }
            }
        }
    }
    

    ⑤Thread.join()的使用

    如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。

    join(long millis)和join(long millis, int nanos)两个具备超时特性的方法。这个超时方法标识,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。

    public class Join {
        public static void main(String[] args) {
            Thread previous = Thread.currentThread();
            for (int i = 0; i < 10; i++) {
                //每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
                Thread thread = new Thread(new Domino(previous), String.valueOf(i));
                thread.start();
                previous = thread;
            }
            SleepUtils.second(5);
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
        static class Domino implements Runnable {
            private Thread thread;
            public Domino(Thread thread) {
                this.thread = thread;
            }
            @Override
            public void run() {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread().getName() + " terminate.");
            }
        }
    }
    

    输出:

    main terminate.
    0 terminate.
    1 terminate.
    2 terminate.
    3 terminate.
    4 terminate.
    5 terminate.
    6 terminate.
    7 terminate.
    8 terminate.
    9 terminate.
    

    JDK中Thread.join()方法的源码(为方便理解,进行了部分调整)

    public final synchronized void join() throws InterruptedException {
        //条件不满足,继续等待
        while (isAlive()) {
          wait(0);
         }
         //条件符合,方法返回
    }
    

    ⑥ThreadLocal的使用

    public class Profiler {
        //第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次
        private static final ThreadLocal<Long> TIME_THREADLOCAL = ThreadLocal.withInitial(System::currentTimeMillis);
        public static final void begin() {
            TIME_THREADLOCAL.set(System.currentTimeMillis());
        }
        public static final long end() {
            return System.currentTimeMillis() - TIME_THREADLOCAL.get();
        }
        public static void main(String[] args) {
            Profiler.begin();
            SleepUtils.second(1);
            System.out.println("Cost: " + Profiler.end() + " mills");
        }
    }
    

    输出:

    Cost: 1002 mills
    

    在AOP中,可以在方法调用前的切入点执行begin方法,在方法调用后的切入点执行end方法。

    4.线程应用实例

    ①等待超时模式

    等待超时模式伪代码:

    //对当前对象加锁
    public synchronized Object get(long mills) throws InterruptedException {
        long future =  System.currentTimeMillis() + mills;
        long remaining = mills;
        //当超时大于0并且result返回值不满足要求
        while ((result == null) && remaining > 0) {
            wait(remaining);
            remaining = future - System.currentTimeMillis();
        }
        return result;
    }
    

    ②一个简单的数据库连接池示例

    public class ConnectionPool {
        private LinkedList<Connection> pool = new LinkedList<>();
        public ConnectionPool(int initialSize) {
            if (initialSize > 0) {
                for (int i = 0; i < initialSize; i++) {
                    pool.addLast(ConnectionDriver.createConnection());
                }
            }
        }
        public void releaseConnection(Connection connection) {
            if (connection != null) {
                synchronized (pool) {
                    //连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接
                    pool.addLast(connection);
                    pool.notifyAll();
                }
            }
        }
        //在mills内无法获取到连接,将会返回null
        public Connection fetchConnection(long mills) throws InterruptedException {
            synchronized (pool) {
                //完全超时
                if (mills <= 0) {
                    while (pool.isEmpty()) {
                        pool.wait();
                    }
                    return pool.removeFirst();
                }
    
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
    

    java.sql.Connection是一个接口,最终实现由数据库驱动提供方来实现,考虑到只是个示例,我们通过动态代理构造了一个Connection,该Connection的代理实现仅仅是在commit方法调用时休眠100毫秒。

    public class ConnectionDriver {
        static class ConnectionHandler implements InvocationHandler {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getName().equals("commit")) {
                    SleepUtils.mill(100);
                }
                return null;
            }
        }
        //创建一个Connection的代理,在commit时休眠100毫秒
        public static final Connection createConnection() {
            return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                    new Class[] {Connection.class}, new ConnectionHandler());
        }
    }
    
    public class ConnectionPoolTest {
        static ConnectionPool pool = new ConnectionPool(10);
        //保证所有ConnectionRunner能够同时开始
        static CountDownLatch start = new CountDownLatch(1);
        //main线程将会等待所有ConnectionRunner结束后才能继续执行
        static CountDownLatch end;
        public static void main(String[] args) throws InterruptedException {
            //线程数量,可以修改线程数量进行观察
            int threadCount = 10;
            end = new CountDownLatch(threadCount);
            int count =20;
            AtomicInteger got = new AtomicInteger();
            AtomicInteger notGot = new AtomicInteger();
            for (int i = 0; i < threadCount; i++) {
                Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
                thread.start();
            }
            start.countDown();
            end.await();
            System.out.println("total invoke: " + (threadCount*count));
            System.out.println("got connection: " + got);
            System.out.println("not got connection: " + notGot);
        }
        static class ConnectionRunner implements Runnable {
            int count;
            AtomicInteger got;
            AtomicInteger notGot;
            public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
                this.count = count;
                this.got = got;
                this.notGot = notGot;
            }
            @Override
            public void run() {
                try {
                    start.await();
                } catch (InterruptedException e) {
                }
                while (count > 0) {
                    try {
                        //从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                        //分别统计连接获取的数量got和未获取到的数量notGot
                        Connection connection = pool.fetchConnection(1000);
                        if (connection != null) {
                            try {
                                connection.createStatement();
                                connection.commit();
                            } finally {
                                pool.releaseConnection(connection);
                                got.incrementAndGet();
                            }
                        } else {
                            notGot.incrementAndGet();
                        }
                    } catch (Exception ex) {
                    } finally {
                        count--;
                    }
                }
                end.countDown();
            }
        }
    }
    

    上述示例中使用了CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束之后,才使main线程从等待状态中返回。

    ③线程池技术及其示例

    线程池技术预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面消除了频繁创建和消亡线程的系统资源开销,另一方面面对过量任务的提交能够平缓的劣化。

    下面是一个简单线程池的示例:

    public interface ThreadPool<Job extends Runnable> {
        //执行一个Job,这个Job需要实现Runnable
        void execute(Job job);
        //关闭线程池
        void shutdown();
        //增加工作者线程
        void addWorkers(int num);
        //减少工作者线程
        void removeWorker(int num);
        //得到正在等待执行的任务数量
        int getJobSize();
    }
    
    public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
        //线程池最大限制数
        private static final int MAX_WORKER_NUMBERS = 10;
        //线程池默认的数量
        private static final int DEFAULT_WORKER_NUMBERS = 5;
        //线程池最小的数量
        private static final int MIN_WORDER_NUMBERS = 1;
        //这是一个工作列表,将会向里面插入工作
        private final LinkedList<Job> jobs = new LinkedList<>();
        //工作者列表
        private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
        //工作者线程的数量
        private int workerNum = DEFAULT_WORKER_NUMBERS;
        //线程编号生成
        private AtomicLong threadNum = new AtomicLong();
        public DefaultThreadPool() {
            initializeWorders(DEFAULT_WORKER_NUMBERS);
        }
        public DefaultThreadPool(int num) {
            workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORDER_NUMBERS ? MIN_WORDER_NUMBERS : num;
            initializeWorders(workerNum);
        }
        @Override
        public void execute(Job job) {
            if (job != null) {
                //添加一个工作,然后进行通知
                synchronized (jobs) {
                    jobs.addLast(job);
                    jobs.notify();
                }
            }
        }
        @Override
        public void shutdown() {
            for (Worker worker : workers) {
                worker.shutdown();
            }
        }
        @Override
        public void addWorkers(int num) {
            synchronized (jobs) {
                //限制新增的Worker数量不能超过最大值
                if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                    num = MAX_WORKER_NUMBERS - this.workerNum;
                }
                initializeWorders(num);
                this.workerNum += num;
            }
        }
        @Override
        public void removeWorker(int num) {
            synchronized (jobs) {
                if (num >= this.workerNum) {
                    throw new IllegalArgumentException("beyond workNum");
                }
                //按照给定的数量停止Worker
                int count = 0;
                while (count < num) {
                    Worker worker = workers.get(count);
                    if (workers.remove(worker)) {
                        worker.shutdown();
                        count++;
                    }
                }
                this.workerNum -= count;
            }
        }
        @Override
        public int getJobSize() {
            return jobs.size();
        }
        //初始化线程工作者
        private void initializeWorders(int num) {
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
                thread.start();
            }
        }
        //工作者,负责消费任务
        class Worker implements Runnable {
            //是否工作
            private volatile boolean running = true;
            @Override
            public void run() {
                while (running) {
                    Job job;
                    synchronized (jobs) {
                        //如果工作者列表是空的,那么就wait
                        while (jobs.isEmpty()) {
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                //感知到外部对WorkerThread的中断操作,返回
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        //取出一个Job
                        job = jobs.removeFirst();
                    }
                    if (job != null) {
                        try {
                            job.run();
                        } catch (Exception e) {
                            //忽略Job执行中的Exception
                        }
                    }
                }
            }
            public void shutdown() {
                running = false;
            }
        }
    }
    

    可以看到,线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当客户端提交了一个任务之后会通知任意一个工作者线程,随着大量任务被提交,更多的工作者线程会被唤醒。

    ④一个基于线程池技术的简单web服务器

    public class SimpleHttpServer {
        //处理httpRequest的线程
        static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(1);
        // SimpleHttpServer的根路径
        static String basePath;
        static ServerSocket serverSocket;
        //服务监听端口
        static int port = 8080;
        public static void setPort(int port) {
            if (port > 0) {
                SimpleHttpServer.port = port;
            }
        }
        public static void setBasePath(String basePath) {
            if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) {
                SimpleHttpServer.basePath = basePath;
            }
        }
        //启动SimpleHttpServer
        public static void start() throws IOException {
            serverSocket = new ServerSocket(port);
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                //接收一个客户端Socket,生成一个HttpRequestHandler,翻入线程池执行
                threadPool.execute(new HttpRequestHandler(socket));
            }
            serverSocket.close();
        }
        static class HttpRequestHandler implements Runnable {
            private Socket socket;
            public HttpRequestHandler(Socket socket) {
                this.socket = socket;
            }
            @Override
            public void run() {
                String line = null;
                BufferedReader br = null;
                BufferedReader reader = null;
                PrintWriter out = null;
                InputStream in = null;
                try {
                    reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String header = reader.readLine();
                    //由相对路径计算出绝对路径
                    String filePath = basePath + header.split(" ")[1];
                    out = new PrintWriter(socket.getOutputStream());
                    //如果请求资源的后缀为jpg或者ico,则读取资源并输出
                    if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                        in = new FileInputStream(filePath);
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        int i;
                        while ((i = in.read()) != -1) {
                            baos.write(i);
                        }
                        byte[] array = baos.toByteArray();
                        out.println("HTTP/1.1 200 OK");
                        out.println("Server: Molly");
                        out.println("Content-Type: image/jpeg");
                        out.println("Content-Length: " + array.length);
                        out.println("");
                        socket.getOutputStream().write(array, 0, array.length);
                    } else {
                        br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                        out = new PrintWriter(socket.getOutputStream());
                        out.println("HTTP/1.1 200 OK");
                        out.println("Server: Molly");
                        out.println("Content-Type: text/html; charset=UTF-8");
                        out.println("");
                        while ( (line = br.readLine()) != null) {
                            out.println(line);
                        }
                    }
                    out.flush();
                } catch (Exception e) {
                    out.println("HTTP/1.1 200");
                    out.println("");
                    out.flush();
                    e.printStackTrace();
                } finally {
                    close(br, in, reader, out, socket);
                }
            }
        }
        private static void close(Closeable... closeables) {
            if (closeables != null) {
                for (Closeable closeable : closeables) {
                    try {
                        if (closeable != null) {
                            closeable.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    }
    

    通过一个测试对比来认识线程技术带来服务器吞吐量的提高。准备一个HTML页面,代码清单如下:

    <html>
     <head>
      <title>测试页面</title>
     </head>
     <body>
      <h1>第一张图片</h1>
      <img src="1.jpg" />
      <h1>第二张图片</h1>
      <img src="2.jpg" />
      <h1>第三张图片</h1>
      <img src="3.jpg" />
     </body>
    </html>
    

    将SimpleHttpServer的根目录设定到该HTML页面所在目录,并启动SimpleHttpServer,通过Apache HTTP server benchmarking tool(版本2.3)来测试不同线程数下,SimpleHttpServer的吞吐量表现。

    但是线程池中线程数量并不是越多越好,具体的数量需要评估每个人物的处理时间,以及当前计算机的处理能力和数量。

    相关文章

      网友评论

          本文标题:4-Java并发编程基础

          本文链接:https://www.haomeiwen.com/subject/vnobfqtx.html