不可变对象(安全的)
图片.png-
String类通过final修饰,为不可变对象
public final class String
图片.png -
通过 Guava jar 包提供的 Maps 工具类创建 map
Map<Integer,Integer> map = Maps.newHashMap(); -
final修饰案例
package com.alan.concurrency.example.immutable;
import com.alan.concurrency.annoations.NotThreadSafe;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
 * Shows that {@code final} freezes only the reference, not the object behind it:
 * the map below is declared final yet its contents are still changed in {@code main}.
 */
@Slf4j
@NotThreadSafe
public class ImmutableExample1 {

    private static final Integer a = 1;
    private static final String b = "2";

    // A final reference can never be pointed at a different object,
    // but the state of the referenced object can still be modified.
    private static final Map<Integer, Integer> map = Maps.newHashMap();

    static {
        // Seeds the entries 1->2, 3->4 and 5->6.
        for (int key = 1; key <= 5; key += 2) {
            map.put(key, key + 1);
        }
    }

    /**
     * Overwrites the value stored under key 1 and logs the new value.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        map.put(1, 3);
        Integer current = map.get(1);
        log.info("{}", current);
    }
}
- Collections.unmodifiableMap实现
package com.alan.concurrency.example.immutable;
import com.alan.concurrency.annoations.NotThreadSafe;
import com.alan.concurrency.annoations.ThreadSafe;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
/**
 * Shows a read-only map produced by {@link Collections#unmodifiableMap(Map)}.
 *
 * <p>The field is {@code final} and only ever holds the unmodifiable view; the
 * mutable backing map is confined to {@link #buildMap()} and never published.
 */
@Slf4j
@ThreadSafe
public class ImmutableExample2 {

    private static final Map<Integer, Integer> map = buildMap();

    /**
     * Builds the seed data and wraps it so that no caller can mutate it.
     *
     * @return an unmodifiable view containing 1->2, 3->4 and 5->6
     */
    private static Map<Integer, Integer> buildMap() {
        Map<Integer, Integer> source = Maps.newHashMap();
        source.put(1, 2);
        source.put(3, 4);
        source.put(5, 6);
        return Collections.unmodifiableMap(source);
    }

    /**
     * Attempts to modify the unmodifiable map.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Intentional: put() on an unmodifiable view throws
        // UnsupportedOperationException, so the log line below is never reached.
        map.put(1, 3);
        log.info("{}", map.get(1));
    }
}
- Guava中的ImmutableList实现
package com.alan.concurrency.example.immutable;
import com.alan.concurrency.annoations.ThreadSafe;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
/**
 * Shows Guava's immutable collections. The element type is declared explicitly
 * so the compiler checks every use instead of falling back to raw types.
 */
@ThreadSafe
public class ImmutableExample3 {

    private static final ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);

    private static final ImmutableSet<Integer> set = ImmutableSet.copyOf(list);

    /**
     * Attempts to add to the immutable list.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Intentional: add() is deprecated on Guava's immutable collections and
        // always throws UnsupportedOperationException.
        list.add(1);
    }
}
图片.png
- 全局的变量容易引起并发问题。
- JDBC 的 Connection 就是线程封闭的例子:它本身不是线程安全的,但每个 Connection 只被封闭在一个线程内使用。
ThreadLocal线程封闭
- springboot配置拦截器,interceptor
package com.alan.concurrency;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.web.servlet.WebMvcAutoConfiguration;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
/**
 * Spring Boot entry point that also registers the servlet filter and the MVC
 * interceptor used by the ThreadLocal example.
 */
@SpringBootApplication
public class ConcurrencyApplication extends WebMvcConfigurerAdapter {

    public static void main(String[] args) {
        SpringApplication.run(ConcurrencyApplication.class, args);
    }

    /**
     * Registers {@link HttpFilter} for every URL under {@code /threadLocal/}.
     * With Spring Boot this is configured here rather than in web.xml.
     *
     * @return the registration bean, typed to the filter it wraps
     */
    @Bean
    public FilterRegistrationBean<HttpFilter> httpFilter() {
        // Parameterized instead of raw so the filter type is checked at compile time.
        FilterRegistrationBean<HttpFilter> registrationBean = new FilterRegistrationBean<>();
        registrationBean.setFilter(new HttpFilter());
        registrationBean.addUrlPatterns("/threadLocal/*");
        return registrationBean;
    }

    /**
     * Applies {@link HttpInterceptor} to all request paths.
     *
     * @param registry the registry the interceptor is added to
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/**");
    }
}
- RequestHolder
package com.alan.concurrency.example.threadLocal;
/**
 * Per-thread holder for a request-scoped id, backed by a {@link ThreadLocal}.
 * Each thread sees only the value it stored itself.
 */
public class RequestHolder {

    private static final ThreadLocal<Long> CURRENT_ID = new ThreadLocal<>();

    /**
     * Binds an id to the calling thread; intended to be called before the
     * request is handled.
     *
     * @param id the id to bind
     */
    public static void add(Long id) {
        CURRENT_ID.set(id);
    }

    /**
     * Reads the id bound to the calling thread.
     *
     * @return the bound id, or {@code null} when nothing is bound
     */
    public static Long getId() {
        return CURRENT_ID.get();
    }

    /**
     * Clears the calling thread's binding; intended to be called once request
     * handling has finished.
     */
    public static void remove() {
        CURRENT_ID.remove();
    }
}
- ThreadLocalController
package com.alan.concurrency.example.threadLocal;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
 * Exposes the id currently held by {@link RequestHolder} for the handling thread.
 */
@Controller
@RequestMapping("/threadLocal")
public class ThreadLocalController {

    /**
     * Handles {@code /threadLocal/test}.
     *
     * @return whatever {@link RequestHolder#getId()} yields on this thread
     */
    @RequestMapping("/test")
    @ResponseBody
    public Long test() {
        final Long boundId = RequestHolder.getId();
        return boundId;
    }
}
- HttpFilter
package com.alan.concurrency;
import com.alan.concurrency.example.threadLocal.RequestHolder;
import com.alan.concurrency.example.threadLocal.RequestHolder;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
/**
 * Servlet filter that stores the current thread's id in {@link RequestHolder}
 * for the duration of the request.
 */
@Slf4j
public class HttpFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        // Nothing to initialise.
    }

    /**
     * Binds the current thread id before delegating down the chain and always
     * clears it afterwards.
     *
     * @param servletRequest  the incoming request; cast to {@link HttpServletRequest}
     * @param servletResponse the outgoing response
     * @param filterChain     the remaining filter chain
     * @throws IOException      propagated from the chain
     * @throws ServletException propagated from the chain
     */
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;
        long threadId = Thread.currentThread().getId();
        log.info("do filter, {}, {}", threadId, request.getServletPath());
        RequestHolder.add(threadId);
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            // The component that sets the ThreadLocal also clears it, so a pooled
            // thread never keeps a stale id if the chain throws or no interceptor
            // cleanup runs for this request.
            RequestHolder.remove();
        }
    }

    @Override
    public void destroy() {
        // Nothing to release.
    }
}
- HttpInterceptor
package com.alan.concurrency;
import com.alan.concurrency.example.threadLocal.RequestHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
 * MVC interceptor that logs around handler execution and clears the
 * {@link RequestHolder} binding once the request has completed.
 */
@Slf4j
public class HttpInterceptor extends HandlerInterceptorAdapter {

    /**
     * Logs and lets the request proceed.
     *
     * @return always {@code true}
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        final boolean proceed = true;
        log.info("preHandle");
        return proceed;
    }

    /**
     * Removes the calling thread's id from {@link RequestHolder}, then logs.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        RequestHolder.remove();
        log.info("afterCompletion");
    }
}
线程不安全类与写法
StringBuilder与StringBuffer
- StringBuilder线程不安全
StringBuilder 一般在方法内部作为局部变量定义,属于堆栈封闭,只有一个线程可以使用,所以不用考虑线程安全问题;它的方法没有加 synchronized 关键字,因此效率上会比 StringBuffer 快。
package com.alan.concurrency.example.commonUnsafe;
import com.alan.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
@NotThreadSafe
public class StringExample1 {
//请求数1000
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
public static StringBuilder stringBuilder = new StringBuilder();
private static void update(){
stringBuilder.append(1);
}
public static void main(String[] args) throws InterruptedException {
//定义线程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,传入并发线程数 final修饰不允许重新赋值
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器闭锁。传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通过匿名内部类方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并发数量
semaphore.acquire();
update();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次执行计数器减掉一个
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}",stringBuilder.length());
}
}
- StringBuffer线程安全,因为方法里面基本都加上了synchronized关键字了
package com.alan.concurrency.example.commonUnsafe;
import com.alan.concurrency.annoations.NotThreadSafe;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
@ThreadSafe
public class StringExample2 {
//请求数1000
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
public static StringBuffer stringBuffer = new StringBuffer();
private static void update(){
stringBuffer.append(1);
}
public static void main(String[] args) throws InterruptedException {
//定义线程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,传入并发线程数 final修饰不允许重新赋值
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器闭锁。传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通过匿名内部类方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并发数量
semaphore.acquire();
update();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次执行计数器减掉一个
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}",stringBuffer.length());
}
}
SimpleDateFormat和JodaTime
- 错误形式,声明了全局变量
package com.alan.concurrency.example.commonUnsafe;
import com.alan.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Wrong usage on purpose: one {@link SimpleDateFormat} instance is held in a
 * static field and used by every worker task concurrently.
 */
@Slf4j
@NotThreadSafe
public class DateFormatExample1 {

    // Single formatter shared by all tasks; this sharing is what the example shows.
    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");

    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;

    // Maximum number of tasks allowed inside update() at the same time.
    public static int threadTotal = 200;

    /**
     * Submits {@link #clientTotal} parse tasks and waits for all of them.
     *
     * @param args unused
     * @throws Exception if waiting on the latch is interrupted
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        int submitted = 0;
        while (submitted < clientTotal) {
            executorService.execute(() -> runOnce(semaphore, countDownLatch));
            submitted++;
        }
        countDownLatch.await();
        executorService.shutdown();
    }

    /** Runs one throttled parse and then counts the latch down. */
    private static void runOnce(Semaphore permits, CountDownLatch remaining) {
        try {
            permits.acquire();
            update();
            permits.release();
        } catch (Exception e) {
            log.error("exception", e);
        }
        remaining.countDown();
    }

    /** Parses a fixed date string with the shared formatter, logging any failure. */
    private static void update() {
        try {
            simpleDateFormat.parse("20180208");
        } catch (Exception e) {
            log.error("parse exception", e);
        }
    }
}
- 局部变量中使用,线程安全的
package com.alan.concurrency.example.commonUnsafe;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Safe usage of {@link SimpleDateFormat}: every call creates its own formatter
 * as a local variable, so no instance is ever shared between threads.
 */
@Slf4j
@ThreadSafe
public class DateFormatExample2 {

    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;

    // Maximum number of tasks allowed inside update() at the same time.
    public static int threadTotal = 200;

    /**
     * Submits {@link #clientTotal} parse tasks and waits for all of them.
     *
     * @param args unused
     * @throws Exception if waiting on the latch is interrupted
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        // The task keeps no state of its own, so one instance is submitted repeatedly.
        final Runnable parseJob = () -> {
            try {
                semaphore.acquire();
                update();
                semaphore.release();
            } catch (Exception e) {
                log.error("exception", e);
            }
            countDownLatch.countDown();
        };

        for (int submitted = 0; submitted < clientTotal; submitted++) {
            executorService.execute(parseJob);
        }
        countDownLatch.await();
        executorService.shutdown();
    }

    /** Parses a fixed date string with a formatter confined to this call. */
    private static void update() {
        try {
            SimpleDateFormat localFormat = new SimpleDateFormat("yyyyMMdd");
            localFormat.parse("20180208");
        } catch (Exception e) {
            log.error("parse exception", e);
        }
    }
}
- joda.time建议使用
package com.alan.concurrency.example.commonUnsafe;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Recommended approach: parse dates with a single shared Joda-Time
 * {@link DateTimeFormatter} from many concurrent tasks.
 */
@Slf4j
@ThreadSafe
public class DateFormatExample3 {

    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;

    // Maximum number of tasks allowed inside update() at the same time.
    public static int threadTotal = 200;

    // Shared by every task; final so the reference can never be swapped.
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");

    /**
     * Submits {@link #clientTotal} parse tasks and waits for all of them.
     *
     * @param args unused
     * @throws Exception if waiting on the latch is interrupted
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            update(count);
                        } finally {
                            // Without this, a failing update() would leak its permit
                            // and later tasks could block on acquire() forever.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Parses a fixed date string and logs it together with the task index.
     *
     * @param i index of the submitting loop iteration, used only for logging
     */
    private static void update(int i) {
        log.info("{}, {}", i, DateTime.parse("20180208", dateTimeFormatter).toDate());
    }
}
ArrayList、HashSet、HashMap等collection集合
- 一般在局部变量中堆栈封闭,不用考虑线程安全。但其本身是线程不安全的。
- arraylist
package com.alan.concurrency.example.commonUnsafe;
import com.alan.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
@NotThreadSafe
public class ArrayListExample {
//请求数1000
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
public static List<Integer> list = new ArrayList<>();
private static void update(){
list.add(1);
}
public static void main(String[] args) throws InterruptedException {
//定义线程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,传入并发线程数 final修饰不允许重新赋值
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器闭锁。传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通过匿名内部类方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并发数量
semaphore.acquire();
update();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次执行计数器减掉一个
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}",list.size());
}
}
- “先检查再执行”的写法也是线程不安全的,例如 if(condition){ ... }:如果被检查的对象是多线程共享的,并且“检查条件”和“执行操作”这两步合在一起不是原子操作,就需要使用加锁等线程安全手段来保证原子性。
线程安全同步容器(使用 synchronized关键字)
图片.png- 同步容器也有线程不安全的情况(单个方法是同步的,但多个方法组成的复合操作不是原子的)。另外,Stack 继承自 Vector。
package com.alan.concurrency.example.syncContainer;
import com.alan.concurrency.annoations.NotThreadSafe;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Shows that a synchronized container can still misbehave under compound
 * operations: one thread removes elements by index while another reads by
 * index from the same {@link Vector}.
 *
 * <p>Each call on the vector is a separate operation, so the size seen in the
 * loop condition may already be stale when {@code get(i)} or {@code remove(i)}
 * runs; an index-out-of-bounds failure can result.
 */
@Slf4j
@NotThreadSafe
public class VectorExample2 {
// Shared by every thread started below.
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
// Runs forever: each pass refills the vector and starts two more threads.
while (true){
// Append the values 0..9.
for (int i = 0; i < 10; i++) {
vector.add(i);
}
// Removing thread: remove(int) removes the element at index i.
Thread thread1 = new Thread(){
public void run(){
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};
// Reading thread: size() and get(i) are two separate calls, so the vector
// can shrink in between.
Thread thread2 = new Thread(){
public void run(){
for (int i = 0; i <vector.size(); i++) {
vector.get(i);
}
}
};
thread1.start();
thread2.start();
}
}
}
- Hashtable同步容器
package com.alan.concurrency.example.syncContainer;
import com.alan.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Fills one shared {@link Hashtable} from many concurrent tasks, each writing a
 * distinct key, and logs the final size.
 */
@Slf4j
public class HashTableExample {

    // Total number of requests (tasks) submitted.
    public static int clientTotal = 5000;

    // Maximum number of tasks allowed inside update() at the same time.
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new Hashtable<>();

    /**
     * Submits {@link #clientTotal} put tasks with keys 0..clientTotal-1, waits
     * for all of them and logs the map size.
     *
     * @param args unused
     * @throws Exception if waiting on the latch is interrupted
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int key = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        update(key);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        int finalSize = map.size();
        log.info("size:{}", finalSize);
    }

    /** Stores {@code value -> value} in the shared map. */
    private static void update(int value) {
        map.put(value, value);
    }
}
- Collections.synchronized相关同步容器
1、public static List<Integer> list = Collections.synchronizedList(new ArrayList<>()) ;
2、 public static Set<Integer> set = Collections.synchronizedSet(new HashSet<>()) ;
3、 private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>()) ;
网友评论