关于锁的问题:
1,避免一个线程同时获取多个锁
2,避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
3,尝试使用定时锁,使用lock.tryLock(timeout)来替代内部锁机制 lock.tryLock(300L, TimeUnit.SECONDS);
4,对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的现象
synchronize和volatile
volatile是轻量级的synchronize,在线程中保证了共享变量的可见性,即当一个线程修改了这个值以后,其他线程能被同步刷新,而不会被锁住
synchronized可以修饰方法或者以同步代码块的形式进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中
volatile基本替代了synchronize重量级的锁
采用TimeUnit.SECONDS.sleep(1) 替代Thread.sleep(200)
public class SafeDoubleCheckedLocking {
private volatile static SafeDoubleCheckedLocking instance;
public SafeDoubleCheckedLocking() {
}
public static SafeDoubleCheckedLocking getInstance() {
if (instance == null) {
synchronized (SafeDoubleCheckedLocking.class) {
if (instance == null) {
instance = new SafeDoubleCheckedLocking();
}
}
}
return instance;
}
}
线程
//获取java线程管理mxbean
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
final ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo.getThreadName());
}
线程的优先级
在java中,通过一个整型成员变量priority来控制优先级,优先级的范围从1到10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认的优先级是5
优先级高的线程分配的时间片的数量要多于优先级低的线程
安全的终止线程
public class ShutDown {
public static void main(String[] args) throws Exception {
final Runner one = new Runner();
Thread countThread = new Thread(one, "CountThread");
countThread.start();
//睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知on为false而结束
TimeUnit.SECONDS.sleep(1);
countThread.interrupt();
final Runner two = new Runner();
countThread = new Thread(two, "CountThread");
countThread.start();
//睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
TimeUnit.SECONDS.sleep(1);
two.cancle();
}
private static class Runner implements Runnable {
private long i;
private volatile boolean on = true;
@Override
public void run() {
while (on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i=" + i);
}
private void cancle() {
on = false;
}
}
}
线程之间的通信
notify()与wait()
notify()通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁
notifyAll()通知所有等待在该对象上的线程
wait()调用该方法的线程进入waitting状态,只有等待另外的线程的通知或被中断才会返回,需要注意的是调用该方法后会释放对象的锁
wait(long)是等待多少毫秒
ThreadLocal 线程变量
是一个以ThreadLocal对象为主键,任意对象为值的存储结构,这个结果被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一值
CountDownLatch类只提供了一个构造器:
public CountDownLatch(int count) { }; //参数count为计数值
然后下面这3个方法是CountDownLatch类中最重要的方法:
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1,直到count的值为0才开始执行
hashmap与ConcurrentHashMap
在多线程环境中采用线程不安全的hashMap进行操作会有死循环的现象,导致cpu的利用率接近100%。
为何呢?因为多线程会导致HashMap的entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry
Hashtable容器使用sychronized来保证线程安全,但是在线程竞争激烈的情况下,其效率非常低下,因为当一个线程访问hashtable的同步方法,其他线程也访问hashtable的同步方法的时候
会进入阻塞和轮询的状态。
ConcurrentHashMap采用分段锁技术,每一把锁用于容器其中一部分数据,所以线程间不存在锁竞争。
CountDownLatch允许一个或多个线程等待其他线程完成操作
现在有这样一个场景,现在有一个excel文件,文件里面有多个sheet数据,此时可以考虑多线程去处理。
每个线程解析一个sheet里面的数据,等到所有的sheet都解析完成之后,程序需要提示解析完成,在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的方式是join()
join用于让当前执行线程等待join线程执行结束。其实原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中wait(0)表示永远等待下去,代码片段如下:
while (isAllLive()){
wait(0)
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread parse1 = new Thread(new Runnable() {
public void run() {
System.out.println("parse1 finished");
}
});
Thread parse2 = new Thread(new Runnable() {
public void run() {
System.out.println("parse2 finished");
}
});
parse1.start();
parse2.start();
//join用于让当前执行线程等待join线程执行结束
parse1.join();//不停的检查join线程是否存活,如果还存活就永远的让当前线程等待
parse2.join();
//检查到join线程终止以后,线程就会调用this.notifyAll()方法,这个是在jvm里面自动实现的。
System.out.println("all parse finished");
}
}
那么我们来看下CountDownLatch的具体使用方式:
public class CountDownLatchTest {
//传入一个int类型的参数作为计数器N
private static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
System.out.println(1);
//当调用countDown方法时,N就会减1,它的await方法会阻塞当前线程,直到N变成0,这里的N可以是N个线程也可以是N个步骤
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
//如果有某个解析sheet的线程比较慢,我们不可能一直让主线程等待,所以采用一个带时间参数的await方法如await(long timeout, TimeUnit unit)
//这个方法就是等待特定的时间后,就不会再阻塞当前线程
c.await();
System.out.println("3");
}
同步屏障CyclicBarrier的使用
/**
-
@Author: xwj
-
@Date: 2019/3/8 0008 17:13
-
@Version 1.0
-
同步屏障,他的作用是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,屏障才会开门,所有被屏障阻截的线程才会继续运行
*/
public class CyclicBarrierTest {//构造方法CyclicBarrier(N)表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
static CyclicBarrier c = new CyclicBarrier(2);public static void main(String[] args) {
new Thread(new Runnable() { public void run() { try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(1); } }).start(); try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(2); //因为主线程和子线程的调度都是由cpu决定的,2个线程都有可能先执行,所以会有2种结果。 //如果把new CyclicBarrier(2);改为new CyclicBarrier(3);则会一直等待下去,因为没有第3个线程到达屏障
}
}
/**
-
@Author: xwj
-
@Date: 2019/3/8 0008 17:25
-
@Version 1.0
-
CyclicBarrier提供了一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction)
-
用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
*/
public class CyclicBarrierTest2 {static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() { public void run() { try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(1); } }).start(); try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(2);
}
//因为设置了拦截线程的数据是2,所以等代码中线程A和主线程都执行完之后,才会继续执行主线程。
static class A implements Runnable {public void run() { System.out.println(3); }
}
}
CyclicBarrier实际应用例子
/**
-
@Author: xwj
-
@Date: 2019/3/9 0009 11:09
-
@Version 1.0
-
CyclicBarrier的应用,多线程进行计算数据,最后合并计算结果的场景
-
有这样一个场景,用一个excel保存了用户所有银行流水,每个sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水
-
实现方法:先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后再用barrierAction用这些线程计算结果,计算出整个EXCEL的日均银行流水
*/
public class BandWaterService implements Runnable {/**
- 创建4个线程屏障,处理完之后执行当前类的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/**
- 假设只有4个sheet,所以只启动4个线程
*/
private ExecutorService executor = newFixedThreadPool(4);
/**
- 保存每个sheet计算出的银流结果
*/
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
//计算当前sheet的银流数据,计算代码省略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
//银流计算完成,插入一个屏障
try {
//让4个线程都到达屏障后等待,再执行主线程的汇总结果
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}@Override
public void run() {
int result = 0;
//汇总每个sheet计算出的结果
final Set<Map.Entry<String, Integer>> entries = sheetBankWaterCount.entrySet();
for (Map.Entry<String, Integer> sheet : entries) {
result += sheet.getValue();
}
//将结果输出
sheetBankWaterCount.put("result", result);
System.out.println(result);
}public static void main(String[] args) {
final BandWaterService service = new BandWaterService();
service.count();
}
} - 创建4个线程屏障,处理完之后执行当前类的run方法
下面来了解CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以用getNumberWaiting获取线程阻塞的数量,isBorken方法来了解线程是否阻塞等。
控制并发线程数的Semaphore
Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。可以理解为红绿灯
/**
-
@Author: xwj
-
@Date: 2019/3/9 0009 11:38
-
@Version 1.0
-
假如有一个需求,需要读取几万个文件的数据,所以我们启动几十个线程并发地读取,然后将它写入到数据库,但是数据库连接数只有10个
-
这个时候,我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。
-
这种情况,就可以使用Semaphore来做流量控制了。
*/
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;private static ExecutorService threadPool = newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
/**
- Semaphore的构造方法要求传入一个int的参数,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后release归还许可证。
- 还可以用tryAcquire()方法尝试获取许可证
- 它的用法类似于去图书馆借书,由于资源有限,每次只云允许最后10个人来看书,每个人去看书都要申请许可证,获取许可后才可以看书
- 看完书以后,就要归还许可证,然后其他人又可以去申请许可证。
- 我们来看它的一些方法
- s.getQueueLength();等待获取许可证的线程数
- s.availablePermits();许可证的可用数量
- s.hasQueuedThreads();是否有线程正在等待获取许可证
- @param args
*/
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
网友评论