Java 线程浅析

作者: mrwangyong | 来源:发表于2017-11-12 20:31 被阅读130次

    什么是线程

    线程 是操作系统能够运行的最小执行单元,被包含在进程之中,而进程,可以广泛的理解为一个 application,一个进程可以包含多个线程,进程和线程之间是包含与被包含的关系,线程的引入主要是为了让应用拥有多任务处理的能力

    1. 进程是一个容器,里面至少含有一个线程, 比如 Android 中的主线程(main)
    2. 线程是进程容器中的一个执行单元
    3. 操作系统按照进程分配资源
    4. 多个线程可以共享进程中的所以资源,比如内存等
    5. CPU 最终执行的是线程
    6. 每个线程都有自己的线程栈和程序执行计数器,保存自己的函数调用过程

    Java 中的线程

    开启一个线程,Java 中提供了两种方式

    1. 继承Thread,直接调用 start

    2. 实现 Runable,然后

      Thread helloThread = new Thread(new HelloRunnable());
      helloThread.start();
      

    两者有什么区别呢,打开Thread 源码,发现:

      public class Thread implements Runnable {
        ........
      }
    

    Thread 直接继承Runable,而Runable只是一个普通的接口,用来封装一些一个 task,真正开启线程并启动的是 Thread类的 start 方法

    public synchronized void start() {//防止多个线程引发异常
            //判断是否已经 start 否则抛出异
            if (threadStatus != 0 || started)
                throw new IllegalThreadStateException();
    
            group.add(this);//加入到当前的线程组中
    
            started = false;//恢复当前线程的 started
            try {
            //在 C层用 pthred创建一个线程
                nativeCreate(this, stackSize, daemon);
                started = true;//修改当前线程的 started 状态
            } finally {
                try {
                    if (!started) {
                        group.threadStartFailed(this);
                    }
                } catch (Throwable ignore) {
                    /* do nothing. If start0 threw a Throwable then
                      it will be passed up the call stack */
                }
            }
        }
    

    可以看出,一个线程,真正的启动过程:

    1. group.add(this)将当前线程加入到线程组里

    2. nativeCreate(this, stackSize, daemon);通过 JNI 使用 Linux 的pthread 创建C线程,这里才是真正的线程创建的地方

    3. JNI 回调 Java, 调用Thread 类的 run 方法,最终回调Runable接口中的实现task,此时,已经切换到新的线程环境

    线程间状态:

    Thread 类中用一个枚举表示,

    public enum State {
            NEW,
            RUNNABLE,
            BLOCKED,
            WAITING,
            TIMED_WAITING,
            TERMINATED;
        }
    

    一图胜前言


    多线程通信

    Java 多线程通信,通常有以下两种形式

    一.共享内存机制

    因为多个线程共享进程中的内存资源,所以我们可以用共享内存的方式实现多个线程之间的通信,

    线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信。而线程间的同步是显式的,程序必须指定某个字段在多个线程间互斥执行

    最简单的,我们经常会设置一个共享变量。然后多个线程去操作同一个共享变量。从而达到线程通讯的目的,属于线程之间的间接通信.线程间的同步是隐式

    下面是经典的抢票流程:

    实现代码:

    /**
     * 售票窗口,不断的卖票直到票卖完
     * @author chengwangyong
     * @date 2017/11/12
     */
    public class TicketWindow extends Thread {
      TicketWindow(String name) {
        super(name);
      }
    
      @Override
      public void run() {
        while (TicketConstance.ticketCount > 0) {
          System.out.println(Thread.currentThread().getName() + " 售出了第" + TicketConstance.ticketCount + "票");
          TicketConstance.ticketCount--;
        }
      }
    }
    

    测试类:

    public class ThreadTest {
    
      public static void main(String[] args) {
        TicketWindow ticket1 = new TicketWindow("窗口1");
        TicketWindow ticket2 = new TicketWindow("窗口2");
        TicketWindow ticket3 = new TicketWindow("窗口3");
        TicketWindow ticket4 = new TicketWindow("窗口4");
        ticket1.start();
        ticket2.start();
        ticket3.start();
        ticket4.start();
      }
    }
    

    结果

    窗口2 售出了第100张票
    窗口1 售出了第100张票
    窗口4 售出了第100张票
    窗口3 售出了第100张票
    窗口4 售出了第97张票
    窗口1 售出了第98张票
    窗口2 售出了第99张票
    窗口2 售出了第93张票
    窗口2 售出了第92张票
    窗口2 售出了第91张票
    窗口2 售出了第90张票
    窗口1 售出了第94张票
    ......
    

    发现,第100张票买了四次,明显出现了问题,

    原因出在了两个方面:

    1. ticketCount 内存可见性,线程1修改了值,但是因为 CPU 还有三级缓存,不一定线程2可见
    2. ticketCount--并不是原子操作,分为三个步骤
      1. 取counter的当前值
      2. 在当前值基础上加1
      3. 将新值重新赋值给counter

    解决办法:

    1. 给调用过程加锁,让同一时间只有一个线程去访问被保护的代码,从而让保护的代码变成一个原子操作

      public class TicketWindow extends Thread {
        TicketWindow(String name) {
          super(name);
        }
      
        @Override
        public void run() {
          ticket();
        }
      
        private void ticket() {
          synchronized (TicketWindow.class) {
            while (TicketConstance.ticketCount > 0) {
              System.out.println(Thread.currentThread().getName() + " 售出了第" + TicketConstance.ticketCount + "张票");
              TicketConstance.ticketCount--;
            }
          }
        }
      }
      

      注意synchronized是加到类上面的.synchronized加到方法上和不加任何参数形式,都是加到对象上面的,对同一个对象有效,而加到类上则对同一个类的不同对象都有效,synchronized大致的执行流程如下

      1. 线程 A 尝试获得锁,如果能获得锁,则执行,不能,加入到线程的等待队列中(wait()),并且阻塞当前的调用(线程状态BLOCKED)
      2. 执行锁里面的代码
      3. 释放锁,从等待队列中唤醒一个线程接着往下执行(notify() 锁可重用),如果有多个,随机唤醒一个,并且把修改后的内容写入到内存,保证内存可见性
    2. 保证原子性操作:

      可以给ticketCount—加一个volatile修饰,这样ticketCount--则变为原子性,每次修改 Java 都能保证每次修改写入到内存中,其他线程可见,并且,volatilesynchronized更加的轻量级,如果能用volatile解决,就不要用synchronized

      public static volatile int ticketCount = 100;
      
    二. 消息通信机制

    线程之间的直接通信,不同的线程之间通过显式的发送消息来达到交互目的,更加符合面向对象的机制

    Java中的每个对象,都有一个锁和线程等待队列,并且在Object类中,加入了wait/notify/notifyAll方法,让每个对象都拥有控制线程执行状态的能力

    wait: 当前线程放入线程等待队列,释放锁,线程状态变为WAITING或 者TIMED_WAITING(取决于是否设置等待时间)

    notify: 从当前对象的等待队列中,随机选择一个唤醒,竞争对象锁,获取到则进入Running状态执行,否则加入等待队列,等待下一次唤醒

    notifyAll: 和notify类似,不同的是唤醒所有等待队列的线程,让所有被唤醒的线程一起竞争对象锁,胜者执行,其他的继续等待下一次唤醒

    演示一个基本的线程间互相通信的过程
    让主线程执行完毕后通知子线程去执行:

    public class ThreadOne extends Thread {
      @Override
      public void run() {
        super.run();
        add(100);
      }
    
      public synchronized int add(int num) {
        try {
          wait();
          num = num + 100;
          System.out.println("num=" + num);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return num;
      }
    
      public synchronized void notifyThread(){
        notify();
      }
    }
    

    测试代码

      public static void main(String[] args) {
        ThreadOne threadOne = new ThreadOne();
        threadOne.start();
        try {
          Thread.sleep(1000L);
          threadOne.notifyThread();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
    
      }
    

    可以看出,threadOnewait之后,只有 notify 后才能从wait()的下一行接着往下执行;

    下面基于消息通信机制,实现一个最常见的生产者/消费者模型

    生产者/消费者模型:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务,如果队列满,生产者暂停生成,如果队列空,消费者等待

    任务队列代码:

    public class TaskQueue<E> {
      private Queue<E> taskQueue; //任务队列
      private int maxSize; //任务队列最大能装载的任务
    
      public TaskQueue(int maxSize) {
        this.maxSize = maxSize;
        taskQueue = new ArrayDeque<>(maxSize);
      }
    
      // 生产者将数据存入到队列中
      public synchronized void put(E e) throws InterruptedException {
        if (taskQueue.size() == maxSize) {
          wait(); // 如果队列已满 生产者等待状态
        }
        taskQueue.add(e);
        notifyAll();
      }
    
      // 消费者从队列中拉取数据
      public synchronized E get() throws InterruptedException {
        if (taskQueue.isEmpty()) {
          wait();// 如果队列已空 消费者者等待状态
        }
        E e = taskQueue.poll();
        notifyAll();
        return e;
      }
    }
    

    生产者代码:

    public class Producer extends Thread {
      TaskQueue<String> queue;
    
      public Producer(TaskQueue<String> queue) {
        this.queue = queue;
      }
    
      @Override
      public void run() {
        int num = 0; // 任务序号
        try {
          while (true) { // 不停的生产
            String task = String.valueOf(num);
            queue.put(task);
            System.out.println("生产者 生产task " + task);
            num++;
            Thread.sleep((int) (Math.random() * 100)); // 保证生产者和消费者不同的间隔,乱序
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    

    消费者代码:

    public class Consumer extends Thread {
      TaskQueue<String> queue;
    
      public Consumer(TaskQueue<String> queue) {
        this.queue = queue;
      }
    
      @Override
      public void run() {
        try {
          while (true) { // 不停的从任务队列中获取
            String task = queue.get();
            System.out.println("消费者 消费 task " + task);
            Thread.sleep((int) (Math.random() * 100)); // 保证生产者和消费者不同的间隔,乱序
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    

    测试代码:

      public static void main(String[] args){
        TaskQueue<String> queue = new TaskQueue<>(10);
        new Producer(queue).start();
        new Consumer(queue).start();
      }
    

    输出:

    生产者 生产task 0
    消费者 消费 task 0
    生产者 生产task 1
    生产者 生产task 2
    消费者 消费 task 1
    消费者 消费 task 2
    生产者 生产task 3
    生产者 生产task 4
    消费者 消费 task 3
    生产者 生产task 5
    消费者 消费 task 4
    消费者 消费 task 5
    生产者 生产task 6
    .......
    

    可以看出,基于消息通信机制的代码,比共享变量控制颗粒更小,更复杂,更适合复杂的多线程环境

    相关文章

      网友评论

        本文标题:Java 线程浅析

        本文链接:https://www.haomeiwen.com/subject/pepomxtx.html