美文网首页
并发编程的学习笔记。

并发编程的学习笔记。

作者: 会飞的蜗牛66666 | 来源:发表于2019-03-09 11:28 被阅读0次

关于锁的问题:
1,避免一个线程同时获取多个锁
2,避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
3,尝试使用定时锁,使用lock.tryLock(timeout)来替代内部锁机制 lock.tryLock(300L, TimeUnit.SECONDS);
4,对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的现象

synchronize和volatile
volatile是轻量级的synchronize,在线程中保证了共享变量的可见性,即当一个线程修改了这个值以后,其他线程能被同步刷新,而不会被锁住
synchronized可以修饰方法或者以同步代码块的形式进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中
volatile基本替代了synchronize重量级的锁
采用TimeUnit.SECONDS.sleep(1) 替代Thread.sleep(200)
public class SafeDoubleCheckedLocking {
private volatile static SafeDoubleCheckedLocking instance;
public SafeDoubleCheckedLocking() {
}
public static SafeDoubleCheckedLocking getInstance() {
if (instance == null) {
synchronized (SafeDoubleCheckedLocking.class) {
if (instance == null) {
instance = new SafeDoubleCheckedLocking();
}
}
}
return instance;
}
}

线程
//获取java线程管理mxbean
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
final ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo.getThreadName());
}
线程的优先级
在java中,通过一个整型成员变量priority来控制优先级,优先级的范围从1到10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认的优先级是5
优先级高的线程分配的时间片的数量要多于优先级低的线程
安全的终止线程
public class ShutDown {

public static void main(String[] args) throws Exception {
    final Runner one = new Runner();
    Thread countThread = new Thread(one, "CountThread");
    countThread.start();
    //睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知on为false而结束
    TimeUnit.SECONDS.sleep(1);
    countThread.interrupt();
    final Runner two = new Runner();
    countThread = new Thread(two, "CountThread");
    countThread.start();
    //睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
    TimeUnit.SECONDS.sleep(1);
    two.cancle();
}

private static class Runner implements Runnable {
    private long i;
    private volatile boolean on = true;

    @Override
    public void run() {
        while (on && !Thread.currentThread().isInterrupted()) {
            i++;
        }
        System.out.println("Count i=" + i);
    }

    private void cancle() {
        on = false;
    }
}

}

线程之间的通信
notify()与wait()
notify()通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁
notifyAll()通知所有等待在该对象上的线程
wait()调用该方法的线程进入waitting状态,只有等待另外的线程的通知或被中断才会返回,需要注意的是调用该方法后会释放对象的锁
wait(long)是等待多少毫秒

ThreadLocal 线程变量
是一个以ThreadLocal对象为主键,任意对象为值的存储结构,这个结果被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一值

CountDownLatch类只提供了一个构造器:
public CountDownLatch(int count) { }; //参数count为计数值
然后下面这3个方法是CountDownLatch类中最重要的方法:
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1,直到count的值为0才开始执行

hashmap与ConcurrentHashMap
在多线程环境中采用线程不安全的hashMap进行操作会有死循环的现象,导致cpu的利用率接近100%。
为何呢?因为多线程会导致HashMap的entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry
Hashtable容器使用sychronized来保证线程安全,但是在线程竞争激烈的情况下,其效率非常低下,因为当一个线程访问hashtable的同步方法,其他线程也访问hashtable的同步方法的时候
会进入阻塞和轮询的状态。
ConcurrentHashMap采用分段锁技术,每一把锁用于容器其中一部分数据,所以线程间不存在锁竞争。

CountDownLatch允许一个或多个线程等待其他线程完成操作
现在有这样一个场景,现在有一个excel文件,文件里面有多个sheet数据,此时可以考虑多线程去处理。
每个线程解析一个sheet里面的数据,等到所有的sheet都解析完成之后,程序需要提示解析完成,在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的方式是join()
join用于让当前执行线程等待join线程执行结束。其实原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中wait(0)表示永远等待下去,代码片段如下:
while (isAllLive()){
wait(0)
}
public class Test {

public static void main(String[] args) throws InterruptedException {
    Thread parse1 = new Thread(new Runnable() {
        public void run() {
            System.out.println("parse1 finished");
        }
    });
    Thread parse2 = new Thread(new Runnable() {
        public void run() {
            System.out.println("parse2 finished");
        }
    });

    parse1.start();
    parse2.start();
    //join用于让当前执行线程等待join线程执行结束
    parse1.join();//不停的检查join线程是否存活,如果还存活就永远的让当前线程等待
    parse2.join();
    //检查到join线程终止以后,线程就会调用this.notifyAll()方法,这个是在jvm里面自动实现的。
    System.out.println("all parse finished");
}

}

那么我们来看下CountDownLatch的具体使用方式:
public class CountDownLatchTest {
//传入一个int类型的参数作为计数器N
private static CountDownLatch c = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {

    new Thread(new Runnable() {
        public void run() {
            System.out.println(1);
            //当调用countDown方法时,N就会减1,它的await方法会阻塞当前线程,直到N变成0,这里的N可以是N个线程也可以是N个步骤
            c.countDown();
            System.out.println(2);
            c.countDown();
        }
    }).start();
    //如果有某个解析sheet的线程比较慢,我们不可能一直让主线程等待,所以采用一个带时间参数的await方法如await(long timeout, TimeUnit unit)
    //这个方法就是等待特定的时间后,就不会再阻塞当前线程
    c.await();
    System.out.println("3");
}

同步屏障CyclicBarrier的使用
/**

  • @Author: xwj

  • @Date: 2019/3/8 0008 17:13

  • @Version 1.0

  • 同步屏障,他的作用是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,屏障才会开门,所有被屏障阻截的线程才会继续运行
    */
    public class CyclicBarrierTest {

    //构造方法CyclicBarrier(N)表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {

     new Thread(new Runnable() {
         public void run() {
             try {
                 c.await();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (BrokenBarrierException e) {
                 e.printStackTrace();
             }
             System.out.println(1);
         }
     }).start();
     try {
         c.await();
     } catch (InterruptedException e) {
         e.printStackTrace();
     } catch (BrokenBarrierException e) {
         e.printStackTrace();
     }
     System.out.println(2);
     //因为主线程和子线程的调度都是由cpu决定的,2个线程都有可能先执行,所以会有2种结果。
     //如果把new CyclicBarrier(2);改为new CyclicBarrier(3);则会一直等待下去,因为没有第3个线程到达屏障
    

    }
    }

/**

  • @Author: xwj

  • @Date: 2019/3/8 0008 17:25

  • @Version 1.0

  • CyclicBarrier提供了一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction)

  • 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
    */
    public class CyclicBarrierTest2 {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {

     new Thread(new Runnable() {
         public void run() {
             try {
                 c.await();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (BrokenBarrierException e) {
                 e.printStackTrace();
             }
             System.out.println(1);
         }
     }).start();
    
     try {
         c.await();
     } catch (InterruptedException e) {
         e.printStackTrace();
     } catch (BrokenBarrierException e) {
         e.printStackTrace();
     }
     System.out.println(2);
    

    }

    //因为设置了拦截线程的数据是2,所以等代码中线程A和主线程都执行完之后,才会继续执行主线程。
    static class A implements Runnable {

     public void run() {
         System.out.println(3);
     }
    

    }
    }

CyclicBarrier实际应用例子

/**

  • @Author: xwj

  • @Date: 2019/3/9 0009 11:09

  • @Version 1.0

  • CyclicBarrier的应用,多线程进行计算数据,最后合并计算结果的场景

  • 有这样一个场景,用一个excel保存了用户所有银行流水,每个sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水

  • 实现方法:先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后再用barrierAction用这些线程计算结果,计算出整个EXCEL的日均银行流水
    */
    public class BandWaterService implements Runnable {

    /**

    • 创建4个线程屏障,处理完之后执行当前类的run方法
      */
      private CyclicBarrier c = new CyclicBarrier(4, this);

    /**

    • 假设只有4个sheet,所以只启动4个线程
      */
      private ExecutorService executor = newFixedThreadPool(4);

    /**

    • 保存每个sheet计算出的银流结果
      */
      private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private void count() {
    for (int i = 0; i < 4; i++) {
    executor.execute(new Runnable() {
    @Override
    public void run() {
    //计算当前sheet的银流数据,计算代码省略
    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
    //银流计算完成,插入一个屏障
    try {
    //让4个线程都到达屏障后等待,再执行主线程的汇总结果
    c.await();
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    }
    });
    }
    }

    @Override
    public void run() {
    int result = 0;
    //汇总每个sheet计算出的结果
    final Set<Map.Entry<String, Integer>> entries = sheetBankWaterCount.entrySet();
    for (Map.Entry<String, Integer> sheet : entries) {
    result += sheet.getValue();
    }
    //将结果输出
    sheetBankWaterCount.put("result", result);
    System.out.println(result);
    }

    public static void main(String[] args) {
    final BandWaterService service = new BandWaterService();
    service.count();
    }
    }

下面来了解CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以用getNumberWaiting获取线程阻塞的数量,isBorken方法来了解线程是否阻塞等。

控制并发线程数的Semaphore
Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。可以理解为红绿灯
/**

  • @Author: xwj

  • @Date: 2019/3/9 0009 11:38

  • @Version 1.0

  • 假如有一个需求,需要读取几万个文件的数据,所以我们启动几十个线程并发地读取,然后将它写入到数据库,但是数据库连接数只有10个

  • 这个时候,我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。

  • 这种情况,就可以使用Semaphore来做流量控制了。
    */
    public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    /**

    • Semaphore的构造方法要求传入一个int的参数,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后release归还许可证。
    • 还可以用tryAcquire()方法尝试获取许可证
    • 它的用法类似于去图书馆借书,由于资源有限,每次只云允许最后10个人来看书,每个人去看书都要申请许可证,获取许可后才可以看书
    • 看完书以后,就要归还许可证,然后其他人又可以去申请许可证。
    • 我们来看它的一些方法
    • s.getQueueLength();等待获取许可证的线程数
    • s.availablePermits();许可证的可用数量
    • s.hasQueuedThreads();是否有线程正在等待获取许可证
    • @param args
      */
      public static void main(String[] args) {
      for (int i = 0; i < THREAD_COUNT; i++) {
      threadPool.execute(new Runnable() {
      @Override
      public void run() {
      try {
      s.acquire();
      System.out.println("save data");
      s.release();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      });
      }
      threadPool.shutdown();
      }
      }

相关文章

网友评论

      本文标题:并发编程的学习笔记。

      本文链接:https://www.haomeiwen.com/subject/rtlspqtx.html