美文网首页
多线程(一)

多线程(一)

作者: IfZhou | 来源:发表于2017-10-25 15:38 被阅读0次

    [TOC]

    1. Java并发编程基础

    1.1 什么是线程?

    现代操作系统调度的最小单元;

    一个进程可以创建多个线程,每个线程拥有自己的计数器、堆栈、局部变量等属性,同时可以访问共享的内存变量;

    CPU在线程之前高速切换,使之有同时执行的感觉。

    1.2 为什么使用多线程?

    • 更多的处理器核心;
    • 更快的响应时间
    • 更好地编程模型

    1.3 线程的优先级

    1-10个级别,默认是5;

    注意:程序的正确性不能依赖线程的优先级高低。

    1.4 线程的状态

    • NEW;
    • RUNNABLE;
    • BLOCKED;
    • WAITING;
    • TIME_WAITING;
    • TERMINATED;

    1.5 Daemon线程

    一种支持型线程。用于程序中后台调度及支持性工作。

    注意:当一个Java虚拟机中不存在非Daemon线程时,Java虚拟机将会退出。

    可以通过Thread.setDaemon(true)将线程设置为Daemon线程

    注意:在构建Daemon线程时,不能依靠finall块中的内容来确保执行关闭或清理资源的逻辑

    1.6 启动和终止线程

    • 构造线程:需要提供:线程组、优先级、是否是Daemon线程等信息
    • 启动线程:start();其含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,立即启动该线程
    • 理解中断:线程的一个标志位属性,它表示一个运行中的线程是否被其他线程进行了中断。(见Interrupted.java类)
    • 如何安全的终止线程: 见Shutdown.java
    /**
     * Shutdown.java
     *
     * 创建了一个线程CountThread,它不断地进行变量累加,而主线程尝试对其进行中断操作和停止操作。
     */
    public class Shutdown {
    
        public static void main(String[] args) throws InterruptedException {
            Runner one= new Runner();
            Thread countThread = new Thread(one,"countThread");
            countThread.start();
            //睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
            TimeUnit.SECONDS.sleep(1);
            countThread.interrupt();
            Runner two= new Runner();
            countThread = new Thread(two,"countThread");
            countThread.start();
            //睡眠1秒,main线程对two进行取消,使CountThread能够感知on为false而结束
            TimeUnit.SECONDS.sleep(1);
            two.cancel();
        }
    
        private static class Runner implements Runnable{
            private long i;
            private volatile boolean on = true;
    
            @Override
            public void run() {
                while (on && !Thread.currentThread().isInterrupted()){
                    i ++ ;
                }
                System.out.println("Count i="+ i);
            }
            public void cancel(){
                on = false;
            }
        }
    }
    

    1.7 线程间通信

    volatile关键字:告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,以保证可见性。

    注意:过多的使用它会降低程序的效率。
    

    synchronized关键字:确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,保证了线程对变量访问的可见性和排他性。

    对象、对象的监视器、同步队列、执行线程之间的关系:

    线程想要对Object(Object由Synchronized保护)进行访问
    
    -> 首先需要获得Object的监视器
    
    -> 获取监视器成功,则可访问
    
    -> 获取监视器失败,则该线程进入同步队列,状态变为阻塞,直到拥有锁的线程释放了锁,会唤醒同步队列中的线程,再次尝试进行对监视器的获取操作
    

    1.8 等待/通知机制

    • notify()
    • notifyAll()
    • wait()
    • wait(long)
    • wait(long,int)

    见WaitNotify.java

    /**
     * 两个线程wait线程由notify线程唤醒
     */
    public class WaitNotify {
    
        static boolean flag = true;
        static Object lock = new Object();
    
    
        public static void main(String[] args) throws InterruptedException {
            Thread waitThread = new Thread(new Wait(),"waitThread");
            waitThread.start();
            TimeUnit.SECONDS.sleep(1);
            Thread notifyThread = new Thread(new Notify(),"notifyThread");
            notifyThread.start();
        }
    
    
            static class Wait implements  Runnable{
            @Override
            public void run() {
                synchronized(lock){
                    while(flag){
                        try{
                            System.out.println(Thread.currentThread() +"flag is true. wait@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread() +"flag is false. running@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
    
                }
            }
        }
    
        static class Notify implements Runnable{
    
            @Override
            public void run() {
                synchronized(lock){
                    System.out.println(Thread.currentThread() +"hold lock. notify@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    lock.notifyAll();
                    flag = false;
                    SleepUtils.second(5);
                }
                synchronized(lock){
                    System.out.println(Thread.currentThread() +"hold lock again. sleep@ "+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    SleepUtils.second(5);
                }
            }
        }
    
    }
    
    
    输出:
    
    Connected to the target VM, address: '127.0.0.1:57763', transport: 'socket'
    Thread[waitThread,5,main]flag is true. wait@ 09:24:43
    Thread[notifyThread,5,main]hold lock. notify@ 09:24:44
    Thread[notifyThread,5,main]hold lock again. sleep@ 09:24:49
    Disconnected from the target VM, address: '127.0.0.1:57763', transport: 'socket'
    Thread[waitThread,5,main]flag is false. running@ 09:24:54
    

    注意点:

    • 先对调用对象枷锁,再调用notify()、notifyAll()、wait()
    • wait()方法使线程状态由Running变为Waiting,同时线程被放置到等待队列
    • notify()、notifyAll()被调用后,等待线程不会立即从wait()返回,需要有锁的那个线程先释放锁以后,才有机会从wait()返回
    • notify()、notifyAll()的操作是将等待队列中的线程放置到同步队列中,同时被移动的线程状态由Waiting转变为Blocked(不同的是,前者只放一个,后者放置所有线程)。
    • wait()能够返回,前提是获得了锁。

    1.9 经典范式:生产者/消费者模式

    等待方遵循如下原则:

    • 获取对象锁
    • 如果条件不满足,进行wait()操作,被通知后仍要检查条件。
    • 条件满足则执行对应的条件
      通知方遵循如下原则:
    • 获取对象锁
    • 改变条件
    • 通知所有等待在对象上的线程

    1.10 管道输入/输出流

    主要用于线程之间的数据传输,而传输的媒介为内存

    4种具体实现:

    • PipedOutputStream
    • PipedInputStream
    • PipedReader
    • PipedWriter

    见Piped.java

    /**
     * PipedWriter和PipedReader相连接,主线程读入控制台输入的字符,传给Print线程,打印到控制台
     */
    public class Piped {
    
        public static void main(String[] args)   {
            PipedWriter out = new PipedWriter();
            PipedReader in = new PipedReader();
            try {
                out.connect(in);
            } catch (IOException e) {
                e.printStackTrace();
            }
            Thread printThread = new Thread(new Print(in),"PrintThread");
            printThread.start();
            int receive = 0;
            try {
                while((receive = System.in.read())!= 1){
                    out.write(receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
        static class Print implements  Runnable{
            private PipedReader in;
            public Print(PipedReader in){
                this.in= in;
            }
            @Override
            public void run() {
                int receive = 0;
                try {
                    while((receive = in.read()) != -1 ){
                        System.out.println((char) receive);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    1.11 Thread.join()的使用

    当线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。

    见Join.java

    /**
     * Join.java
     * 每个线程调用前一个线程的join()方法,意味着:从主线程结束->线程1结束-> ... -> 线程10结束
     */
    public class Join {
        public static void main(String[] args) throws InterruptedException {
            Thread previous = Thread.currentThread();
            for(int i = 0; i < 10; i ++){
                Thread thread = new Thread(new Domino(previous),String.valueOf(i));
                thread.start();
                previous = thread;
            }
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    
        static class Domino implements Runnable{
            private Thread thread;
            public Domino(Thread thread){
                this.thread = thread;
            }
            @Override
            public void run() {
                try {
                    this.thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " terminate.");
            }
        }
    
    }
    

    输出:

    main terminate.
    0 terminate.
    1 terminate.
    2 terminate.
    3 terminate.
    4 terminate.
    5 terminate.
    6 terminate.
    7 terminate.
    8 terminate.
    9 terminate.
    

    join()方法的逻辑结构和等待/通知经典范式一致,即加锁、循环和处理逻辑3个步骤

    1.12 ThreadLocal的使用

    ThreadLocal,即线程变量。键值存储结构。

    一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。
    见Profiler.java

    /**
     * Profiler.java
     *
     *相当于每个线程自己会有自己的本地变量,虽然共享了TIME_THREADLOCAL变量,但在get的时候只会获取自己线程的本地变量值
     *通过匿名内部类来构建一个ThreadLocal子类,重写方法initialValue,以便在get和set方法第一次调用时,进行初始化
     */
    public class Profiler {
        private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
            protected  Long initialValue(){
                return System.currentTimeMillis();
            }
        };
        public static final void begin(){
            TIME_THREADLOCAL.set(System.currentTimeMillis());
        }
    
        public static final long end(){
            return System.currentTimeMillis() - TIME_THREADLOCAL.get();
        }
    
        public static void main(String[] args) throws InterruptedException {
            Profiler.begin();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Cost:" + Profiler.end() + " mills");
        }
    
    
    }
    

    1.13 线程实例

    等待超时模式:

    • 使用场景:调用一个方法时等待一段时间,能在时间内返回,则立即返回,超时,返回默认结果。
    • 基本点:等待持续时间 REMAINING = T 、超时时间 FUTURE = now + T
      见TimeoutPattern.java
    /**
     * 一个经典的等待超时模式
     */
    public class TimeoutPattern {
    
        public synchronized Object get(long mills) throws InterruptedException {
            Object result = null;
            long future = System.currentTimeMillis() + mills;
            long remaining = mills;
            while((result == null)&& remaining > 0 ){
                wait(remaining);
                remaining = future - System.currentTimeMillis();
            }
            return result;
        }
    }
    

    一个简单的数据库连接池实例

    重点是:使用等待超时模式,在获取连接的过程,如果有连接则直接返回;如果没有,则wait(mills),在其他线程释放连接时被唤醒,如果超时未被唤醒,返回null。

    public class ConnectionPool {
    
        private LinkedList<Connection> pool = new LinkedList<Connection>();
    
        public ConnectionPool(int initialSize){
            if(initialSize > 0 ){
                for(int i = 0; i < initialSize ; i ++){
                    pool.add(ConnectionDriver.createConnection()); //使用动态代理创建一个连接
                }
            }
        }
        public void releaseConnection(Connection connection){
            if(connection != null){
                synchronized (pool){
                    pool.add(connection);
                    pool.notifyAll();
                }
    
            }
        }
        public Connection fetchConnection(long mills) throws InterruptedException {
            synchronized (pool){
                //完全超时
                if(mills < 0 ){
                    while (pool.isEmpty()){
                        pool.wait();
                    }
                    return pool.removeFirst();
                }else {
                    long future = System.currentTimeMillis() + mills;
                    long remaining = mills;
                    while(pool.isEmpty() && remaining > 0 ){
                        pool.wait(remaining);
                        remaining = future - System.currentTimeMillis();
                    }
                    Connection result = null;
                    if(!pool.isEmpty()){
                        result = pool.removeFirst();
                    }
                    return result;
                }
            }
        }
    }
    
    public class ConnectionDriver {
        static class ConnectionHandler implements InvocationHandler{
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if(method.getName().equals("commit")){
                    TimeUnit.MILLISECONDS.sleep(100);
                }
                return null;
            }
        }
        public static  final Connection createConnection(){
            return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),new Class<?>[]{Connection.class},new ConnectionHandler());
    
        }
    }
    
    public class ConnectionPoolTest {
    
        static ConnectionPool pool = new ConnectionPool(10);
        static CountDownLatch start = new CountDownLatch(1);
        static CountDownLatch end;
    
        public static void main(String[] args) throws InterruptedException {
            int threadCount = 1000;
            end = new CountDownLatch(threadCount);
            int count = 1000;
            AtomicInteger got = new AtomicInteger();
            AtomicInteger notGot = new AtomicInteger();
            for(int i = 0; i < threadCount; i ++){
                Thread thread = new Thread(new ConnectionRunner(count,got,notGot),"ConnectionRunnerThread");
                thread.start();
            }
            start.countDown();
            end.await();
            System.out.println("total invoke: " + (threadCount * count));
            System.out.println("got connection:  " + got);
            System.out.println("not got connectio " + notGot);
        }
        static class ConnectionRunner implements  Runnable{
            int count;
            AtomicInteger got;
            AtomicInteger notgot;
    
            public ConnectionRunner(int count,AtomicInteger got,AtomicInteger notgot){
                this.count = count;
                this.got = got;
                this.notgot = notgot;
            }
    
            @Override
            public void run() {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (count > 0 ){
                    try {
                        Connection connection = pool.fetchConnection(1000);
                        if(connection!=null){
                            try {
                                connection.createStatement();
                                connection.commit();
                            } catch (SQLException e) {
                                e.printStackTrace();
                            }finally {
                                pool.releaseConnection(connection);
                                got.incrementAndGet();
                            }
                        }else{
                            notgot.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
    
                    }finally {
                        count --;
                    }
                }
                end.countDown();
            }
        }
    }
    

    线程池技术

    本质:一个线程安全的任务队列,它连接了工作者线程和客户端线程。工作者线程中,不断地在任务队列中获取任务,没有任务就wait,直到在任务队列中新增一个任务后notify唤醒。

    public interface ThreadPool<Job extends Runnable> {
    
        void execute(Job job);
    
        void shutdown();
    
        void addWorkers(int num);
    
        void removeWorker(int num);
    
        int getJobSize();
    }
    
    public class DefaultThreadPool<Job extends  Runnable> implements ThreadPool<Job> {
    
        private static final int MAX_WORKER_NUMBERS = 10;
    
        private static final int DEFAULT_WORKER_NUMBERS = 5;
    
        private static final int MIN_WORKER_NUMBERS = 1;
        //工作列表,将会向里面插入工作
        private final LinkedList<Job> jobs = new LinkedList<Job>();
        //工作者列表
        private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
        //工作者线程数量
        private int workerNum = DEFAULT_WORKER_NUMBERS;
        //线程编号
        private AtomicLong threadNum = new AtomicLong();
    
        public DefaultThreadPool(){
            initializeWorker(DEFAULT_WORKER_NUMBERS);
        }
        public DefaultThreadPool(int num){
            workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num <  MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
            initializeWorker(workerNum);
        }
        //初始化线程工作者
        private void initializeWorker(int num){
            for(int i = 0 ; i < num; i ++ ){
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "ThreadPool-Worker-"+ threadNum.incrementAndGet());
                thread.start();
            }
        }
    
    
        @Override
        public void execute(Job job) {
            if(job != null){
                synchronized (jobs){
                    jobs.addLast(job);
                    jobs.notifyAll();
                }
            }
        }
    
        @Override
        public void shutdown() {
            for(Worker worker : workers){
                worker.shutdown();
            }
        }
    
        @Override
        public void addWorkers(int num) {
            synchronized (jobs){
                if(num + this.workerNum > MAX_WORKER_NUMBERS){
                    num = MAX_WORKER_NUMBERS;
                }
                initializeWorker(num);
                this.workerNum += num;
            }
        }
    
        @Override
        public void removeWorker(int num) {
            synchronized (jobs){
                if(num >= this.workerNum){
                    throw  new IllegalArgumentException("beyond worknum");
                }
                //按照给定的数量停止Worker
                int count = 0;
                while(count < num){
                    Worker worker = workers.get(count);
                    if(workers.remove(worker)){
                        worker.shutdown();
                        count ++;
                    }
                }
                this.workerNum -= count;
            }
        }
    
        @Override
        public int getJobSize() {
            return jobs.size();
        }
    
    
    
        class Worker implements  Runnable{
            //是否工作
            private volatile boolean running = true;
            @Override
            public void run() {
                while (running){
                    Job job = null;
                    synchronized (jobs){
                        //如果工作者列表是空的,那么久wait
                        while(jobs.isEmpty()){
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                //感知到外部对WorkerThread的中断操作,返回
                                e.printStackTrace();
                                return ;
                            }
    
                        }
                        job = jobs.removeFirst();
                    }
                    if(job != null){
                        try{
                            job.run();
                        }catch (Exception e){
                            //此处暂时忽略Job执行中的Exception
                        }
                    }
                }
            }
            public void shutdown(){
                running =false;
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:多线程(一)

          本文链接:https://www.haomeiwen.com/subject/ekmqpxtx.html