重点1:常见错误
ArrayList在并发下的问题:
eg.
package org.chain.current.demo.exceptiondemo;
import java.util.ArrayList;
public class ArrayListIssue {
static class ArrayListThread implements Runnable {
private ArrayList<Integer> list;
ArrayListThread(ArrayList<Integer> lsit) {
this.list = lsit;
}
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
list.add(i);
}
}
}
public static void main(String[] args) throws InterruptedException {
ArrayList<Integer> list = new ArrayList<>(10);
Thread t1 = new Thread(new ArrayListThread(list));
Thread t2 = new Thread(new ArrayListThread(list));
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("process end...,the list size is " + list.size());
}
}
HashMap在并发下的问题:
eg.
package org.chain.current.demo.exceptiondemo;
import java.util.HashMap;
public class HashMapIssue {
static class HashMapThread implements Runnable {
int startNum;
HashMap<String, String> hashMap;
HashMapThread(int startNum, HashMap<String, String> hashMap) {
this.startNum = startNum;
this.hashMap = hashMap;
}
@Override
public void run() {
for (int i = startNum; i < 10000; i += 2) {
hashMap.put(String.valueOf(i), "test");
}
}
}
public static void main(String[] args) throws InterruptedException {
HashMap<String, String> hashMap = new HashMap<>(16);
Thread t1 = new Thread(new HashMapThread(0, hashMap));
Thread t2 = new Thread(new HashMapThread(1, hashMap));
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("process end the map size is " + hashMap.size());
}
}
ps:JDK8对HashMap内部做了大规模的调整,不会出现程序无法结束的问题
错误的加锁
eg.
package org.chain.current.demo.exceptiondemo;
/**
* 锁应该加在正确的对象上
*/
public class LockObjectDemo {
static class BadLockOnInteger implements Runnable {
Integer integer = 0;
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
synchronized (integer) {
integer++;
}
// synchronized (this) {
// integer++;
// }
}
}
}
public static void main(String[] args) throws InterruptedException {
BadLockOnInteger badLockOnInteger = new BadLockOnInteger();
Thread t1 = new Thread(badLockOnInteger);
Thread t2 = new Thread(badLockOnInteger);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("process is end the integer is " + badLockOnInteger.integer);
}
}
重点二:并行模式
并行流水线
如果我们要计算(B+C)*B/2,必须按照算数优先级的顺序依次计算,无法并行。
可以借鉴生产流水线的思想,把计算过程拆分为:
1.p1=B+C
2.p2=p1*B
3.p3=p2/2
eg.
package org.chain.current.demo.parallelpattern;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 流水线demo
* 计算(B+C)*B/2
*/
public class FlowLineDemo {
private static BlockingQueue<Msg> qOne = new LinkedBlockingQueue<>();
private static BlockingQueue<Msg> qTwo = new LinkedBlockingQueue<>();
private static BlockingQueue<Msg> qThree = new LinkedBlockingQueue<>();
static class Msg {
float a;
float b;
StringBuilder log = new StringBuilder();
}
static class ProcessOne implements Runnable {
@Override
public void run() {
while (true) {
try {
Msg temp = qOne.take();
float a = temp.a;
float b = temp.b;
temp.a = (a + b);
temp.b = a;
temp.log.append("P1:B=").append(a).append(",C=").append(b).append(",(B+C)=").append(temp.a).append("<<- ->>");
qTwo.put(temp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class ProcessTwo implements Runnable {
@Override
public void run() {
while (true) {
try {
Msg temp = qTwo.take();
float a = temp.a;
float b = temp.b;
temp.a = a * b;
temp.log.append("P2:(B+C)=").append(a).append(",B=").append(b).append(",(B+C)*B=").append(temp.a).append("<<- ->>");
qThree.put(temp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class ProcessThree implements Runnable {
@Override
public void run() {
while (true) {
try {
Msg temp = qThree.take();
float a = temp.a / 2;
temp.a = a;
temp.log.append("P3:(B+C)*B=").append(a).append(",(B+C)*B/2=").append(temp.a);
System.out.println("flow end," + temp.log);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Random random = new Random();
Msg msg = new Msg();
msg.a = random.nextInt(10);
msg.b = random.nextInt(10);
try {
qOne.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Producer());
Thread t2 = new Thread(new ProcessOne());
Thread t3 = new Thread(new ProcessTwo());
Thread t4 = new Thread(new ProcessThree());
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
}
}
并行搜索
比如在一个数组中查询某个数,可以将数组分段利用多线程并行搜索
eg
package org.chain.current.demo.producerandconsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 并行搜索demo
*/
public class parallelSearch {
private static int[] arr;
private static AtomicInteger result = new AtomicInteger(-1);
private static Integer search(int val, int start, int end) {
for (int i = start; i < end; i++) {
if (result.get() != -1) {
return result.get();
}
if (val == arr[i]) {
result.compareAndSet(-1, i);
return result.get();
}
}
return -1;
}
static class SearchTask implements Callable<Integer> {
private int val;
private int start;
private int end;
public SearchTask(int val, int start, int end) {
this.val = val;
this.start = start;
this.end = end;
}
@Override
public Integer call() throws Exception {
System.out.println("ThreadName:" + Thread.currentThread().getName() + ",start:" + start + ",end:" + end);
return search(val, start, end);
}
}
private static int PSearch(int val, int threadNum) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
List<Future<Integer>> futures = new ArrayList<>();
int step = arr.length / threadNum;
for (int i = 0; i < arr.length; i += step) {
int end = i + step;
if ((end + step) > arr.length) {
end = arr.length;
futures.add(executorService.submit(new SearchTask(val, i, end)));
break;
}
futures.add(executorService.submit(new SearchTask(val, i, end)));
}
executorService.shutdown();
for (Future<Integer> future : futures) {
if (future.get() >= 0) {
return future.get();
}
}
return -1;
}
private static void initArray(int i) {
arr = new int[i];
Random random = new Random();
for (int j = 0; j < i; j++) {
arr[j] = random.nextInt(10);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
initArray(10);
System.out.println("arr:" + Arrays.toString(arr));
System.out.println("result:" + PSearch(5, 3));
}
}
并行排序
回顾了基础的排序算法,冒泡排序,奇偶交换排序,插入排序,希尔排序。
利用并发实现奇偶交换排序和希尔排序。
eg1:奇偶交换排序并发版
package org.chain.current.demo.parallelpattern.sort;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 奇偶交换排序并行模式
*/
public class ParallelOddEvenSortDemo {
private static int[] arr;
private static ExecutorService pool = Executors.newCachedThreadPool();
private static int exchFlag = 1;
private static synchronized int getExchFlag() {
return exchFlag;
}
private static synchronized void setExchFlag(int v) {
exchFlag = v;
}
private static class ExchangeTask implements Runnable {
int i;
CountDownLatch cdl;
ExchangeTask(int i, CountDownLatch cdl) {
this.i = i;
this.cdl = cdl;
}
@Override
public void run() {
if (arr[i] > arr[i + 1]) {
arr[i] = arr[i] ^ arr[i + 1];
arr[i + 1] = arr[i] ^ arr[i + 1];
arr[i] = arr[i] ^ arr[i + 1];
setExchFlag(1);
}
cdl.countDown();
}
}
private static void parallelOddEvenSort() throws InterruptedException {
int start = 0;
while (getExchFlag() == 1 || start == 1) {
setExchFlag(0);
CountDownLatch cdl = new CountDownLatch(arr.length / 2 - (arr.length % 2 == 0 ? start : 0));
for (int i = start; i < arr.length - 1; i += 2) {
pool.submit(new ExchangeTask(i, cdl));
}
cdl.await();
start = start == 0 ? 1 : 0;
}
}
public static void main(String[] args) throws InterruptedException {
arr = DemoUtil.init(10, 100);
System.out.println("the origin arr is:" + Arrays.toString(arr));
parallelOddEvenSort();
System.out.println("the sorted arr is:" + Arrays.toString(arr));
pool.shutdown();
}
}
eg2:希尔排序并发版本
package org.chain.current.demo.parallelpattern.sort;
import java.util.Arrays;
/**
* 希尔排序串行demo
*/
public class ShellSortDemo {
public static void main(String[] args) {
int[] arr = DemoUtil.init(19, 100);
System.out.println("origin array:" + Arrays.toString(arr));
shellSort(arr);
System.out.println("sorted array:" + Arrays.toString(arr));
}
public static void shellSort(int[] arr) {
//计算最大h
int h = 1;
while (h <= arr.length / 3) {
h = 3 * h + 1;
}
while (h > 0) {
for (int i = h; i < arr.length; i++) {
if (arr[i] < arr[i - h]) {
int tmp = arr[i];
int j = i - h;
while (j >= 0 && arr[j] > tmp) {
arr[j + h] = arr[j];
j -= h;
}
arr[j + h] = tmp;
}
}
h = (h - 1) / 3;
}
}
}
网友评论