美文网首页
分享工程算法在业务开发中的应用(二)并发编程中的数据结构

分享工程算法在业务开发中的应用(二)并发编程中的数据结构

作者: 周群力 | 来源:发表于2020-09-29 09:36 被阅读0次

    之前写过篇分享,结合工作内容聊了聊写业务代码时会用到的工程算法;这次继续本系列的分享,结合今年新写的代码,聊一聊并发编程中会用到的数据结构。

    一、虽然无锁但是并发安全的数据结构

    写java时,常用的并发容器大部分都是基于锁的,最基础的是使用粗粒度锁,如Collections.synchronizedCollection(Collection<T> c),如ArrayBlockingQueue,优点是简单好实现,但缺点是粒度粗导致并发吞吐量不高;优化后的是基于细粒度锁的容器,如ConcurrentHashMap,里面塞满了各种优化(结合只用一个字母命名的变量、层层大括号嵌套,简直让人没法点进去看)
    基于锁的并发容器易于实现,但也有些缺点,包括:
    线程调度带来的性能损失;
    活跃度风险,如死锁,如持有锁的线程挂起导致其他线程不能继续处理,优先级倒置等。
    相比之下,一些虽然无锁但是并发安全的数据结构在性能上更好,同时也能避免死锁等活跃度问题。

    1.1. 无锁并发读

    虽然无锁但是并发读安全的数据结构有:

    1.1.1. 不可变数据结构

    如果不可变数据结构的读操作不改变内部状态,那么稍加改造就能拿来当并发容器。这里说稍加改造,是因为需要考虑安全发布,如果不可变数据结构内部的变量没有加volatile、final关键字,很可能初始化时候写进去的内容没法被其他线程看见,因此需要我们人为加上final、volatile,保证安全发布。

    典型例子是项目中很多配置数据放在guava的ImmutableMap里,允许并发读;
    再一个例子是,上游传来的数据有A,B,C三个字段,其中C可能为空(只给A、B),或者B、C都为空(只给A),下游需要靠这3个字段唯一标识一个业务类型。
    这个场景用Trie写比较优雅,于是实现了一个不可变Trie

    1.1.2. Copy-on-Write数据结构

    不可变数据结构虽好,但有很多情况不适用,比如确实有很小的可能改动,比如限于框架的生命周期,不好用不可变数据结构。例如项目中开发了给其他系统用的sdk,sdk中提供了回调钩子,业务系统只需实现回调钩子,当sdk收到消息后即可回调通知业务系统。在这个场景中,系统启动后由spring把实现类注入到sdk的容器中,此时容器得等spring注入完了才“不可变”,在注入之前是可变的,因此不能用不可变容器。这种场景用Copy-on-write数据结构即可。
    另外值得一提的是,jdk里的copy-on-write数据结构在copy时比较粗暴、直接把所有数据全复制了一遍,更好的实现是用可持久化数据结构,每次写操作只复制一小部分数据、没必要复制全家。我在轮自己的copy-on-write可持久化数据结构库,没写完就不贴了。

    1.2. 无锁并发读写

    既然能做到无锁并发读,我们自然会想到,能否做到无锁并发读写,并且并发安全?
    理论上管这类算法叫非阻塞(non-blocking)算法,意思是我们嫌弃基于锁的算法隔离性太差,当持有锁的线程死锁/挂起等情况发生时所有等待锁的线程都会阻塞住没法前进,于是我们就想:能不能搞出一类隔离性好的算法,一个线程出问题了不会导致别的线程阻塞?

    An algorithm is called non‐blocking if
    failure or suspension of any thread cannot cause failure or suspension of another thread

    按照提供的保证由弱到强,非阻塞算法可以分为以下几类,感兴趣可以看看https://www.youtube.com/watch?v=DdAV7891-OAhttps://www.baeldung.com/lock-free-programming 这里不细展开。

    • Obstruction-Free

    a thread is guaranteed to proceed if all other threads are suspended.

    • Lock-Free
      无论何时,无论出现了啥情况,总有一个线程能继续处理。
      有了Lock-Free级别的保证,我们不用担心一个死锁导致全家阻塞的情况,但是还是有可能出现饥饿的情况:比如某线程一直在等待cpu调度执行,但cpu偏心,一直调度别的线程不调度它。

    • Wait-Free
      每个线程都能得到保证,保证在有限步骤后能够继续执行。因此也避免了饥饿情况

    我们在日常开发时常用的是Lock-Free级别的容器,它们往往通过cas等原语实现,比如juc包下面的ConcurrentLinkedQueue,ConcurrentLinkedDeque,比如ConcurrentSkipListMap里也有用到(我没细看),又比如Disruptor等。相关讨论见https://www.quora.com/Are-there-any-non-blocking-lock-free-concurrent-data-structure-libraries-for-Java
    基于cas实现的lock-free数据结构好处是在并发度不高的时候吞吐量高、线程隔离性好,但缺点是并发度非常高的时候,自旋cas会浪费CPU,吞吐量反而会降低。也因此,jdk虽然已经提供了一个基于cas的Random类,后来又加了个ThreadLocalRandom,在并发度高的情况下减少overhead。

    1.3. 避免共享:ThreadLocal

    没有共享就没有并发安全问题,ThreadLocal很基础这里就不细讲了。
    一个例子是写业务代码时候经常用到SimpleDateFormat,可以用ThreadLocal实现一个对象池,免得频繁创建对象。

    二、能够实现延时任务/定时任务的数据结构

    项目中遇到了一个问题:日志量太大,磁盘很快就满了。于是想针对日志做一些优化,其中一个优化点是对于性能监控的日志,原先每次调用打一条,可以优化成按秒/按分钟聚合日志,每秒或每分钟打一条,记录一分钟调用次数、平均耗时。那么如何实现这个功能呢?

    2.1. 延迟队列DelayQueue

    刚好在我的RPC框架福报RPC里用延迟队列实现过这个功能,原理就是每次要打某个key的性能监控日志时,就生成当前key当前秒的任务,把任务塞到延迟队列里等下一秒打印;同时把任务塞到hashMap里作为索引,map的key为"key@时间戳"(比如"key123@2020-09-09 00:00:00"),当同一秒内再打同一个key的日志时,根据key从索引中找出来任务,更改任务里的调用次数、调用时长。
    详细代码见这里

    2.2. ScheduledExecutorService

    ScheduledExecutorService可以看成worker thread线程池+延迟队列。普通ExecutorService线程池里的线程是不断从阻塞队列里取任务,而ScheduledExecutorService里的线程是不断从DelayQueue里取任务。
    用ScheduledExecutorService实现这个功能和直接用延迟队列实现本质上没区别。

    2.3. 时间轮

    时间轮可以拿来执行定时任务,具体介绍可以看看这篇文章
    时间轮可以看成支持按槽“批处理”的DelayQueue,相比DelayQueue有如下优势:

    • O(logN) vs O(1)

    • delayQueue每take一个任务后,都要走await/signal重新线程调度;而分槽后就能支持按槽批处理,槽内元素一口气批处理掉,无需每次完成一个任务就重新线程调度

    • 锁消耗:DelayQueue每个操作都得用一把大锁;

      [图片上传中...(image-2c0039-1601365646856-0)]

      而时间轮无锁(单个消费者线程,而生产者写入时单独写一个MPSQueue里,该队列无锁读写,消费者将任务从queue转移到wheel里)见https://albenw.github.io/posts/ec8df8c/

    通过时间轮实现日志聚合场景的话,顺带还保证了容错:消费者出问题的话也不至于堵队列堵生产者,因为就算消费者堵了,生产者生产数据的时候直接把时间轮中过于旧的槽覆盖掉即可,队列永远不会堵。

    另外值得一提的是,如果是想在分布式的环境中使用时间轮,可以基于中心化存储实现时间轮,每个槽对应一条key-value数据,由server轮询中心化存储,方案细节见解密“达达-京东到家”的订单即时派发技术原理和实践。不过粗看了一下,文中这种方案存在可用性问题。

    三、图

    3.1. 任务集合可以抽象成图

    并发编程中,经常面对一些可以并发执行的任务集合,我们可以把这些任务集合抽象成图,以便简化编程。
    例如某pipeline中,任务之间没有互相先后顺序依赖,可以抽象成以下的任务关系图,图的点代表任务,边代表依赖关系。对于这种任务之间没有任何依赖关系的图,直接把所有任务放到线程池ExecutorService执行即可。


    image.png

    java的线程池ExecutorService适合提交互相之间没有依赖的任务,如果任务之间有依赖,就不能简单的调用ExecutorService.submit()了,可能出现死锁、饥饿等情况。例如下图线性pipeline,图中有4个任务,彼此之间互相依赖,如果把4个任务扔进一个只有3个线程的ExecutorService,那么可能出现任务2、任务3、任务4占用了3个线程,但都需要等待前面的任务完成;而任务1还在ExecutorService的任务队列里放着,永远没有空闲线程能把他取出来执行,因此整个线程池就会永远阻塞、无法前进。


    image.png

    如果任务之间的依赖很简单,靠CompletionService+future.get()可以搞定,比如下图的非线性pipeline;但是写起来很麻烦,每一个阶段需要拿到前面阶段的future对象,感兴趣可以尝试写一下,个人感觉不好写。


    image.png

    如果是分治任务,可以使用Fork/join,任务图如下(网上抄的图);


    image.png

    而如果任务之间的依赖图不规则、有点复杂,比如下图,该如何写任务编排呢?

    image.png
    有很多编程模式可以选择,包括以下(详细可以看并发编程模型并发编程时,如何写复杂的任务编排?,这里不细聊)
    A. 水平切分:Parallel Workers
    每个线程串行跑全套任务,不管任务图多复杂,我们就老老实实的单线程按顺序一个一个执行,通过添加更多线程来提高并发度
    B. 垂直切分:pipeline模式
    类似于CPU流水线,每个任务由独立的线程(池)执行,任务与任务之间靠queue异步通信,个人理解go中的csp模型就是这种方案
    C. 生产消费模式
    D. Actor模式
    E. 函数式并行

    办法很多,但写起来还是有些麻烦的,有没有更好的办法?

    3.2. 图线程池

    为了简化上述任务图的并发编程,我写了个图线程池,见https://github.com/seeflood/Advanced-Concurrent/blob/master/src/test/java/io/github/seeflood/advanced/concurrent/executor/dag/DAGTaskExecutorImplTest.java
    假设有一组做菜任务,任务图如下,其中烧水/洗菜/听歌可以并行,烧水/洗菜/听歌都做好了才能切菜,切菜完成了才能炒菜

    image.png
    那么使用图线程池执行这些任务,只需要构造好DAG,扔到线程池里执行即可,代码如下:
            DAGTaskGroup<String> dag = new DAGTaskGroup<>();
            Callable<String> one_one = () -> handle("烧水");
            Callable<String> one_two = () -> handle("洗菜");
            Callable<String> one_three = () -> handle("听歌");
            Callable<String> two_one = () -> handle("切菜");
            Callable<String> three_one = () -> handle("炒菜");
            dag.link(one_one, two_one);
            dag.link(one_two, two_one);
            dag.link(one_three, two_one);
            dag.link(two_one, three_one);
            dag.link(one_three, three_one);
            DAGTaskExecutorImpl executor = new DAGTaskExecutorImpl(Executors.newFixedThreadPool(3));
            Map<Callable, String> submit = executor.submit(dag);
            submit.forEach((k, v) -> System.out.println("result map value:" + v));
    
    

    (这个框架只是个周末灵机一动的demo,没有加超时、容错之类的功能,还不好用于生产)

    后续

    这个系列的分享我已经构思好了几个主题,可以一个季度写一篇,嘻嘻

    时隔一星期之后发现傻屌简书丢了部分数据,简单修改了一下,懒得补救了。

    相关文章

      网友评论

          本文标题:分享工程算法在业务开发中的应用(二)并发编程中的数据结构

          本文链接:https://www.haomeiwen.com/subject/xhzxxktx.html