美文网首页
Concurrent Java 03 - 线程安全共享策略

Concurrent Java 03 - 线程安全共享策略

作者: 阿武_Accat | 来源:发表于2019-02-03 22:26 被阅读0次

    怎么才能安全地共享对象?

    上一章介绍了对象的发布,本章介绍如何让已发布的对象被安全地共享。
    内容: 不可变对象, 线程封闭, 同步容器

    不可变对象

    条件 final关键字

    修饰变量: 基本类型和引用类型

    package com.accat.concurrency.example.immutable;
    
    import com.google.common.collect.Maps;
    import com.accat.concurrency.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    
    /**
     * Shows that {@code final} only fixes a reference: the {@code map} field cannot be reassigned,
     * yet the entries of the map it points to can still be changed.
     */
    @Slf4j
    @NotThreadSafe
    public class ImmutableExample1 {

        private static final Integer a = 1;
        private static final String b = "2";
        private static final Map<Integer, Integer> map = Maps.newHashMap();

        static {
            // Seed the entries 1->2, 3->4 and 5->6.
            for (int key = 1; key <= 5; key += 2) {
                map.put(key, key + 1);
            }
        }

        /**
         * Overwrites the value stored under key 1 and logs the value read back.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Reassigning any of the final fields is rejected by the compiler:
            //   a = 2;
            //   b = "3";
            //   map = Maps.newHashMap();
            // Changing the contents of the map is still allowed.
            map.put(1, 3);
            final Integer updated = map.get(1);
            log.info("{}", updated);
        }

        /**
         * Illustrates a {@code final} method parameter; the body is intentionally empty.
         *
         * @param a a parameter that cannot be reassigned inside the method
         */
        private void test(final int a) {
            // A final parameter cannot be reassigned either:
            //   a = 1;
        }
    }
    

    不可增加删除的数据集(List, Set, Map)


    Collections中提供的不可变集合
    package com.accat.concurrency.example.immutable;
    
    import com.google.common.collect.Maps;
    import com.accat.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Collections;
    import java.util.Map;
    
    /**
     * Demonstrates the unmodifiable view returned by {@link Collections#unmodifiableMap(Map)}.
     *
     * <p>The backing map is populated in a local variable and the {@code map} field is assigned
     * exactly once, so the field can be {@code final} and only the unmodifiable view is ever
     * reachable through it.
     */
    @Slf4j
    @ThreadSafe
    public class ImmutableExample2 {

        private static final Map<Integer, Integer> map;

        static {
            // Fill a private backing map first, then expose only the unmodifiable wrapper.
            Map<Integer, Integer> source = Maps.newHashMap();
            source.put(1, 2);
            source.put(3, 4);
            source.put(5, 6);
            map = Collections.unmodifiableMap(source);
        }

        /**
         * Attempts to modify the unmodifiable map, then logs the value stored under key 1.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Per the Collections.unmodifiableMap contract this put() fails with
            // UnsupportedOperationException; the call is kept because it is the point of the demo.
            map.put(1, 3);
            log.info("{}", map.get(1));
        }

    }
    
    package com.accat.concurrency.example.immutable;
    
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.ImmutableSet;
    import com.accat.concurrency.annoations.ThreadSafe;
    
    /**
     * Demonstrates Guava's immutable collections: a list, a set copied from that list, and two maps
     * built with {@code ImmutableMap.of} and {@code ImmutableMap.builder}.
     */
    @ThreadSafe
    public class ImmutableExample3 {

        private static final ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);

        // Typed as ImmutableSet<Integer> instead of the raw ImmutableSet so element access is type-checked.
        private static final ImmutableSet<Integer> set = ImmutableSet.copyOf(list);

        // ImmutableMap.of takes alternating keys and values: 1->2, 3->4.
        private static final ImmutableMap<Integer, Integer> map = ImmutableMap.of(1, 2, 3, 4);

        private static final ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
                .put(1, 2).put(3, 4).put(5, 6).build();

        /**
         * Prints the value that {@code map2} holds for key 3.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            System.out.println(map2.get(3));
        }
    }
    

    线程封闭

    把对象封装到一个线程里,只有这个线程能看到该对象,从而实现线程安全。

    重点 使用ThreadLocal封装一个全局的请求参数对象
    package com.accat.concurrency.example.threadLocal;
    
    /**
     * Holds a request parameter map in a {@link ThreadLocal}, so each thread only sees the map that
     * was stored by that same thread.
     *
     * <p>{@code java.util.Map} is referenced by its fully-qualified name because this file has no
     * import for it.
     */
    public class RequestHolder {

        private static final ThreadLocal<java.util.Map<String, String[]>> requestHolder = new ThreadLocal<>();

        /**
         * Stores the given parameter map for the calling thread, replacing any previous value.
         *
         * @param parameterMap the map to associate with the calling thread
         */
        public static void add(java.util.Map<String, String[]> parameterMap) {
            requestHolder.set(parameterMap);
        }

        /**
         * Returns the map stored by the calling thread.
         *
         * <p>The return type was previously declared as {@code Long}, which did not match the
         * {@code Map} held by the ThreadLocal and could not compile.
         *
         * @return the calling thread's parameter map, or {@code null} if none is stored
         */
        public static java.util.Map<String, String[]> getParameterMap() {
            return requestHolder.get();
        }

        /** Clears the value stored for the calling thread. */
        public static void remove() {
            requestHolder.remove();
        }
    }
    
    package com.accat.concurrency;
    
    import com.accat.concurrency.example.threadLocal.RequestHolder;
    import lombok.extern.slf4j.Slf4j;
    
    import javax.servlet.Filter;
    import javax.servlet.FilterChain;
    import javax.servlet.FilterConfig;
    import javax.servlet.ServletException;
    import javax.servlet.ServletRequest;
    import javax.servlet.ServletResponse;
    import javax.servlet.http.HttpServletRequest;
    import java.io.IOException;
    
    /**
     * Servlet filter that stores the current request's parameter map in {@link RequestHolder}
     * before handing the request to the rest of the filter chain.
     */
    @Slf4j
    public class HttpFilter implements Filter {

        /** No initialization is performed. */
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {

        }

        /**
         * Records the request parameters for the current thread, then continues the chain.
         *
         * @param servletRequest the incoming request whose parameter map is stored
         * @param servletResponse the response passed on unchanged
         * @param filterChain the remaining chain to invoke
         * @throws IOException if the downstream chain throws it
         * @throws ServletException if the downstream chain throws it
         */
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            // The parameter map belongs to the request; java.lang.Thread has no getParameterMap().
            // Reading it from ServletRequest directly also removes the need for the HttpServletRequest cast.
            RequestHolder.add(servletRequest.getParameterMap());
            // NOTE(review): nothing here clears the stored value; HttpInterceptor.afterCompletion calls
            // RequestHolder.remove() — confirm that interceptor covers every path this filter is mapped to.
            filterChain.doFilter(servletRequest, servletResponse);
        }

        /** No cleanup is performed. */
        @Override
        public void destroy() {

        }
    }
    
    package com.accat.concurrency;
    
    import com.accat.concurrency.example.threadLocal.RequestHolder;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * Spring MVC interceptor that clears the value held by {@link RequestHolder} for the current
     * thread once handling of a request has completed.
     */
    @Slf4j
    public class HttpInterceptor extends HandlerInterceptorAdapter {

        /**
         * Performs no pre-processing.
         *
         * @return always {@code true}
         */
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
                throws Exception {
            return true;
        }

        /**
         * Removes the calling thread's entry from {@link RequestHolder}.
         */
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
                                    Exception ex) throws Exception {
            RequestHolder.remove();
        }
    }
    
    package com.accat.concurrency.example.threadLocal;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    /**
     * Controller mapped under {@code /threadLocal} that exposes the value currently held by
     * {@code RequestHolder}.
     */
    @Controller
    @RequestMapping("/threadLocal")
    public class ThreadLocalController {

        /**
         * Handles {@code /threadLocal/test}.
         *
         * @return whatever {@code RequestHolder.getParameterMap()} returns for the calling thread,
         *         written to the response body
         */
        @RequestMapping("/test")
        @ResponseBody
        public Object test() {
            final Object stored = RequestHolder.getParameterMap();
            return stored;
        }
    }
    

    同步容器

    当对象既是可变的共享对象,又没有使用线程封闭技术时,就需要改用对应的线程安全类。如:
    StringBuilder -> StringBuffer
    java.text.SimpleDateFormat -> org.joda.time.format.DateTimeFormatter
    ArrayList -> Vector
    HashMap -> Hashtable(key, value 不能为null)
    Collections.synchronizedXXX(List, Set, Map)

    // Each field holds the wrapper returned by the matching Collections.synchronizedXxx factory
    // around a freshly created collection; the unwrapped collection is not kept anywhere else.
    // NOTE(review): per the java.util.Collections documentation, iterating over these wrappers
    // still needs an explicit synchronized block on the wrapper — confirm at the call sites.
    private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
    private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
    private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
    

    并发容器

    并发容器

    并发容器是JDK提供的一组并发集合,位于 java.util.concurrent(JUC)包中。

    CopyOnWriteArrayList:读写分离,最终一致性;写操作会另外开辟空间复制数组,读不加锁,写加锁。

    package com.accat.concurrency.example.concurrent;
    
    import com.accat.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * Submits {@code clientTotal} tasks that each append one element to a shared
     * {@link CopyOnWriteArrayList}, with a semaphore limiting how many tasks run the update at once,
     * and logs the final list size.
     */
    @Slf4j
    @ThreadSafe
    public class CopyOnWriteArrayListExample {

        // Total number of requests (tasks) submitted.
        public static int clientTotal = 5000;

        // Maximum number of tasks allowed to run the update at the same time.
        public static int threadTotal = 200;

        private static final List<Integer> list = new CopyOnWriteArrayList<>();

        /**
         * Runs the demo and blocks until every submitted task has finished.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while waiting for the tasks
         */
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            try {
                for (int i = 0; i < clientTotal; i++) {
                    final int count = i;
                    executorService.execute(() -> {
                        try {
                            semaphore.acquire();
                            try {
                                update(count);
                            } finally {
                                // Release only after a successful acquire, and even if update() throws,
                                // so a failing task cannot leak a permit.
                                semaphore.release();
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag instead of swallowing it.
                            Thread.currentThread().interrupt();
                            log.error("exception", e);
                        } catch (Exception e) {
                            log.error("exception", e);
                        } finally {
                            // Always count down so main() cannot wait forever on a failed task.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            } finally {
                // Shut the pool down even when await() is interrupted.
                executorService.shutdown();
            }
            log.info("size:{}", list.size());
        }

        /** Appends {@code i} to the shared list. */
        private static void update(int i) {
            list.add(i);
        }
    }
    
    /**
         * {@inheritDoc}
         *
         * <p>Reads from the array returned by {@code getArray()}; this method does not take
         * {@code this.lock}.
         *
         * @throws IndexOutOfBoundsException {@inheritDoc}
         */
        public E get(int index) {
            final Object[] snapshot = getArray();
            return get(snapshot, index);
        }
    
        /**
         * Replaces the element at the specified position in this list with the
         * specified element, working on a copy of the current array while holding
         * {@code this.lock}.
         *
         * @return the element previously stored at {@code index}
         * @throws IndexOutOfBoundsException {@inheritDoc}
         */
        public E set(int index, E element) {
            final ReentrantLock guard = this.lock;
            guard.lock();
            try {
                final Object[] snapshot = getArray();
                final E previous = get(snapshot, index);

                if (previous == element) {
                    // Same reference already stored. Not quite a no-op; ensures volatile write semantics
                    setArray(snapshot);
                    return previous;
                }

                // Different reference: write into a same-length copy and install the copy.
                final Object[] replacement = Arrays.copyOf(snapshot, snapshot.length);
                replacement[index] = element;
                setArray(replacement);
                return previous;
            } finally {
                guard.unlock();
            }
        }
        /**
         * Appends the specified element to the end of this list by installing a copy of the
         * current array that is one slot longer, while holding {@code this.lock}.
         *
         * @param e element to be appended to this list
         * @return {@code true} (as specified by {@link Collection#add})
         */
        public boolean add(E e) {
            final ReentrantLock guard = this.lock;
            guard.lock();
            try {
                final Object[] current = getArray();
                final int size = current.length;
                final Object[] grown = Arrays.copyOf(current, size + 1);
                grown[size] = e;
                setArray(grown);
            } finally {
                guard.unlock();
            }
            return true;
        }
    
    CopyOnWriteArrayList

    相似的结构有CopyOnWriteArraySet

    package com.accat.concurrency.example.concurrent;
    
    import com.accat.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * Submits {@code clientTotal} tasks that each add one element to a shared
     * {@link ConcurrentSkipListSet}, with a semaphore limiting how many tasks run the update at once,
     * and logs the final set size.
     */
    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListSetExample {

        // Total number of requests (tasks) submitted.
        public static int clientTotal = 5000;

        // Maximum number of tasks allowed to run the update at the same time.
        public static int threadTotal = 200;

        private static final Set<Integer> set = new ConcurrentSkipListSet<>();

        /**
         * Runs the demo and blocks until every submitted task has finished.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while waiting for the tasks
         */
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            try {
                for (int i = 0; i < clientTotal; i++) {
                    final int count = i;
                    executorService.execute(() -> {
                        try {
                            semaphore.acquire();
                            try {
                                update(count);
                            } finally {
                                // Release only after a successful acquire, and even if update() throws,
                                // so a failing task cannot leak a permit.
                                semaphore.release();
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag instead of swallowing it.
                            Thread.currentThread().interrupt();
                            log.error("exception", e);
                        } catch (Exception e) {
                            log.error("exception", e);
                        } finally {
                            // Always count down so main() cannot wait forever on a failed task.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            } finally {
                // Shut the pool down even when await() is interrupted.
                executorService.shutdown();
            }
            log.info("size:{}", set.size());
        }

        /** Adds {@code i} to the shared set. */
        private static void update(int i) {
            set.add(i);
        }
    }
    
    ConcurrentSkipListSet
    package com.accat.concurrency.example.concurrent;
    
    import com.accat.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * Submits {@code clientTotal} tasks that each put one entry into a shared
     * {@link ConcurrentHashMap}, with a semaphore limiting how many tasks run the update at once,
     * and logs the final map size.
     */
    @Slf4j
    @ThreadSafe
    public class ConcurrentHashMapExample {

        // Total number of requests (tasks) submitted.
        public static int clientTotal = 5000;

        // Maximum number of tasks allowed to run the update at the same time.
        public static int threadTotal = 200;

        private static final Map<Integer, Integer> map = new ConcurrentHashMap<>();

        /**
         * Runs the demo and blocks until every submitted task has finished.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while waiting for the tasks
         */
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            try {
                for (int i = 0; i < clientTotal; i++) {
                    final int count = i;
                    executorService.execute(() -> {
                        try {
                            semaphore.acquire();
                            try {
                                update(count);
                            } finally {
                                // Release only after a successful acquire, and even if update() throws,
                                // so a failing task cannot leak a permit.
                                semaphore.release();
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag instead of swallowing it.
                            Thread.currentThread().interrupt();
                            log.error("exception", e);
                        } catch (Exception e) {
                            log.error("exception", e);
                        } finally {
                            // Always count down so main() cannot wait forever on a failed task.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            } finally {
                // Shut the pool down even when await() is interrupted.
                executorService.shutdown();
            }
            log.info("size:{}", map.size());
        }

        /** Stores the entry {@code i -> i} in the shared map. */
        private static void update(int i) {
            map.put(i, i);
        }
    }
    
    package com.mmall.concurrency.example.concurrent;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * Submits {@code clientTotal} tasks that each put one entry into a shared
     * {@link ConcurrentSkipListMap}, with a semaphore limiting how many tasks run the update at once,
     * and logs the final map size.
     */
    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListMapExample {

        // Total number of requests (tasks) submitted.
        public static int clientTotal = 5000;

        // Maximum number of tasks allowed to run the update at the same time.
        public static int threadTotal = 200;

        private static final Map<Integer, Integer> map = new ConcurrentSkipListMap<>();

        /**
         * Runs the demo and blocks until every submitted task has finished.
         *
         * @param args unused
         * @throws Exception if the main thread is interrupted while waiting for the tasks
         */
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            try {
                for (int i = 0; i < clientTotal; i++) {
                    final int count = i;
                    executorService.execute(() -> {
                        try {
                            semaphore.acquire();
                            try {
                                update(count);
                            } finally {
                                // Release only after a successful acquire, and even if update() throws,
                                // so a failing task cannot leak a permit.
                                semaphore.release();
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag instead of swallowing it.
                            Thread.currentThread().interrupt();
                            log.error("exception", e);
                        } catch (Exception e) {
                            log.error("exception", e);
                        } finally {
                            // Always count down so main() cannot wait forever on a failed task.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            } finally {
                // Shut the pool down even when await() is interrupted.
                executorService.shutdown();
            }
            log.info("size:{}", map.size());
        }

        /** Stores the entry {@code i -> i} in the shared map. */
        private static void update(int i) {
            map.put(i, i);
        }
    }
    

    ConcurrentHashMap 与 ConcurrentSkipListMap 的区别在于:
    前者性能更好,4个线程,1.6万数据量下,前者的性能是后者的4倍。
    但是后者的key是有序的,前者做不到有序。

    总结1
    总结2

    相关文章

      网友评论

          本文标题:Concurrent Java 03 - 线程安全共享策略

          本文链接:https://www.haomeiwen.com/subject/dhghsqtx.html