美文网首页
线程安全策略

线程安全策略

作者: 磊_5d71 | 来源:发表于2018-11-03 12:38 被阅读0次

    不可变对象(安全的)

    图片.png
    • String类通过final修饰,为不可变对象
      public final class String


      图片.png
    • 通过guawa jar包 Maps配置map
      Map<Integer,Integer> map = Maps.newHashMap();

    • final修饰案例

    package com.alan.concurrency.example.immutable;
    
    
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import com.google.common.collect.Maps;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Slf4j
    @NotThreadSafe
    public class ImmutableExample1 {
    
        private final static Integer a =1;
        private final static String b = "2";
        //final修饰引用类型时,只是不允许指向其他对象,但是其值是可以修改的
        private final static Map<Integer,Integer> map = Maps.newHashMap();
    
        static {
            map.put(1,2);
            map.put(3,4);
            map.put(5,6);
        }
    
    
        public static void main(String[] args) {
    
            map.put(1,3);
            log.info("{}",map.get(1));
        }
    
    }
    
    • Collections.unmodifiableMap实现
    package com.alan.concurrency.example.immutable;
    import com.alan.concurrency.annoations.NotThreadSafe;
    import com.alan.concurrency.annoations.ThreadSafe;
    import com.google.common.collect.Maps;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Collections;
    import java.util.Map;
    
    @Slf4j
    @ThreadSafe
    public class ImmutableExample2 {
        private  static Map<Integer,Integer> map = Maps.newHashMap();
    
        static {
            map.put(1,2);
            map.put(3,4);
            map.put(5,6);
            map = Collections.unmodifiableMap(map);
        }
        public static void main(String[] args) {
    
            map.put(1,3);
            log.info("{}",map.get(1));
        }
    }
    
    • Guava中的ImmutableList实现
    package com.alan.concurrency.example.immutable;
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableSet;
    
    
    @ThreadSafe
    public class ImmutableExample3 {
    
        private final static ImmutableList list = ImmutableList.of(1,2,3);
    
        private final static ImmutableSet set = ImmutableSet.copyOf(list);
    
        public static void main(String[] args) {
    
            list.add(1);
        }
    }
    
    图片.png
    • 全局的变量容易引起并发问题。
    • JDBC的connection就是,本身不是线程安全的,但是线程封闭在一个线程里。

    ThreadLocal线程封闭

    • springboot配置拦截器,interceptor
    package com.alan.concurrency;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.web.servlet.WebMvcAutoConfiguration;
    import org.springframework.boot.web.servlet.FilterRegistrationBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
    import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
    import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
    
    @SpringBootApplication
    public class ConcurrencyApplication extends WebMvcConfigurerAdapter {
    
        public static void main(String[] args) {
            SpringApplication.run(ConcurrencyApplication.class, args);
        }
    
    
        //springboot在此配置拦截请求,而不是配置在web.xml中
        @Bean
        public FilterRegistrationBean httpFilter() {
            FilterRegistrationBean registrationBean = new FilterRegistrationBean();
            registrationBean.setFilter(new HttpFilter());
            registrationBean.addUrlPatterns("/threadLocal/*");
            return registrationBean;
        }
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/**");
        }
    
    }
    
    • RequestHolder
    package com.alan.concurrency.example.threadLocal;
    
    public class RequestHolder {
    
        private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();
    
    
        //通过拦截器在调用接口处理之前进行ID添加
        public static void add(Long id) {
            requestHolder.set(id);
        }
    
        public static Long getId() {
            return requestHolder.get();
        }
    
        //接口处理完成之后通过interceptor处理
        public static void remove() {
            requestHolder.remove();
        }
    }
    
    • ThreadLocalController
    package com.alan.concurrency.example.threadLocal;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    @Controller
    @RequestMapping("/threadLocal")
    public class ThreadLocalController {
    
        @RequestMapping("/test")
        @ResponseBody
        public Long test() {
            return RequestHolder.getId();
        }
    }
    
    • HttpFilter
    package com.alan.concurrency;
    
    import com.alan.concurrency.example.threadLocal.RequestHolder;
    import com.alan.concurrency.example.threadLocal.RequestHolder;
    import lombok.extern.slf4j.Slf4j;
    
    import javax.servlet.Filter;
    import javax.servlet.FilterChain;
    import javax.servlet.FilterConfig;
    import javax.servlet.ServletException;
    import javax.servlet.ServletRequest;
    import javax.servlet.ServletResponse;
    import javax.servlet.http.HttpServletRequest;
    import java.io.IOException;
    
    @Slf4j
    public class HttpFilter implements Filter {
    
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
    
        }
    
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            HttpServletRequest request = (HttpServletRequest) servletRequest;
            log.info("do filter, {}, {}", Thread.currentThread().getId(), request.getServletPath());
            RequestHolder.add(Thread.currentThread().getId());
            filterChain.doFilter(servletRequest, servletResponse);
        }
    
        @Override
        public void destroy() {
    
        }
    }
    
    • HttpInterceptor
    package com.alan.concurrency;
    
    import com.alan.concurrency.example.threadLocal.RequestHolder;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    @Slf4j
    public class HttpInterceptor extends HandlerInterceptorAdapter {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            log.info("preHandle");
            return true;
        }
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
            RequestHolder.remove();
            log.info("afterCompletion");
            return;
        }
    }
    

    线程不安全类与写法

    StringBuilder与StringBuffer
    • StringBuilder线程不安全
      一般在方法里面定义,堆栈封闭,只有一个线程可以使用,所以不用考虑线程不安全问题,没有加synchornized关键字,效率上会比较快。
    package com.alan.concurrency.example.commonUnsafe;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    
    @Slf4j
    @NotThreadSafe
    public class StringExample1 {
    
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static StringBuilder stringBuilder = new StringBuilder();
    
    
        private static void update(){
            stringBuilder.append(1);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            update();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",stringBuilder.length());
        }
    }
    
    • StringBuffer线程安全,因为方法里面基本都加上了synchronized关键字了
    package com.alan.concurrency.example.commonUnsafe;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    
    @Slf4j
    @ThreadSafe
    public class StringExample2 {
    
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static StringBuffer stringBuffer = new StringBuffer();
    
    
        private static void update(){
            stringBuffer.append(1);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            update();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",stringBuffer.length());
        }
    }
    

    SimpleDateFormat和JodaTime

    • 错误形式,声明了全局变量
    package com.alan.concurrency.example.commonUnsafe;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.text.SimpleDateFormat;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    @NotThreadSafe
    public class DateFormatExample1 {
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void update() {
            try {
                simpleDateFormat.parse("20180208");
            } catch (Exception e) {
                log.error("parse exception", e);
            }
        }
    }
    
    • 局部变量中使用,线程安全的
    package com.alan.concurrency.example.commonUnsafe;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.text.SimpleDateFormat;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    @ThreadSafe
    public class DateFormatExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void update() {
            try {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
                simpleDateFormat.parse("20180208");
            } catch (Exception e) {
                log.error("parse exception", e);
            }
        }
    }
    
    • joda.time建议使用
    package com.alan.concurrency.example.commonUnsafe;
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    @ThreadSafe
    public class DateFormatExample3 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void update(int i) {
            log.info("{}, {}", i, DateTime.parse("20180208", dateTimeFormatter).toDate());
        }
    }
    

    ArrayList、HashSet、HashMap等collection集合

    • 一般在局部变量中堆栈封闭,不用考虑线程安全。但其本身是线程不安全的。
    • arraylist
    package com.alan.concurrency.example.commonUnsafe;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    
    @Slf4j
    @NotThreadSafe
    public class ArrayListExample {
    
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static List<Integer> list = new ArrayList<>();
    
    
        private static void update(){
            list.add(1);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            update();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",list.size());
        }
    }
    
    • 先检查再执行 也是线程不安全的 if(condition){} 如果是对线程共享或者条件和执行不是原子性的就要注意,使用线程安全手段。

    线程安全同步容器(使用 synchronized关键字)

    图片.png
    • 同步容器也有线程不安全的情况 stack继承vector
    package com.alan.concurrency.example.syncContainer;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Vector;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    
    @Slf4j
    @NotThreadSafe
    public class VectorExample2 {
    
        private static Vector<Integer> vector = new Vector<>();
    
        public static void main(String[] args) {
            
            
            while (true){
    
                for (int i = 0; i < 10; i++) {
                    vector.add(i);
                }
    
                Thread thread1 = new Thread(){
                    public void run(){
                        for (int i = 0; i < vector.size(); i++) {
                            vector.remove(i);
                        }
                    }
                };
    
                Thread thread2 = new Thread(){
                    public void run(){
                        for (int i = 0; i <vector.size(); i++) {
                            vector.get(i);
                        } 
                    }
                };
    
                thread1.start();
                thread2.start();
            }
            
        }
    }
    
    • HashTable同步容器
    package com.alan.concurrency.example.syncContainer;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.HashMap;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    @Slf4j
    public class HashTableExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new Hashtable<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    
    • Collections.synchronized相关同步容器
      1、public static List<Integer> list = Collections.synchronizedList(new ArrayList<>()) ;
      2、 public static Set<Integer> set = Collections.synchronizedSet(new HashSet<>()) ;
      3、 private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>()) ;

    相关文章

      网友评论

          本文标题:线程安全策略

          本文链接:https://www.haomeiwen.com/subject/mxjcxqtx.html