Functional Inner Arts 05: Lock-Free High Concurrency with STM, the Mirage

Author: larluo_罗浩 | Published 2018-10-31 14:51

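    You may have heard that functional programming is naturally suited to high concurrency.

    You probably already know the difference between concurrency and parallelism. Concurrency is about different pieces of logic cooperating with each other; parallelism is about running the same logic many times at once to go faster. This is not perfectly precise, but it captures the idea.

    Functional languages currently offer several approaches to high concurrency:
    Actors, represented by Erlang and Scala;
    CSP, represented by Clojure and Go;
    and STM, represented by Haskell and Clojure, which is the subject of this article.
    The other approaches will be covered one by one in later articles.

    Let's embrace STM, starting right now!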

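    1. What is STM?
    2. Understanding STM in depth
      a. How does STM actually do it?
      b. Taking further control of STM!
    3. Reading the Haskell source of STM
      a. The STM world
      b. TVar variables
      c. Flow control with retry and orElse
      d. List of RTS runtime interfaces
    4. Analysis of the STM RTS runtime
      a. Introduction to PrimOps
      b. Basic RTS concepts
      c. STM data types: StgTSO, StgTRecHeader, StgTVar
      d. TVar operations: newTVar#, readTVar#, writeTVar#
      e. STM operations: atomically#, raiseIO#, catchSTM#, retry#, catchRetry#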

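    I. What is STM?

    Values in functional programming are immutable, but when two threads need to communicate they need a special kind of variable to do so. For a single shared variable, most systems provide atomic types. Once several variables are involved, the problem becomes much harder.

    With a lock-based solution you must acquire locks in a consistent order everywhere, otherwise you get deadlock. Managing the locks is also painful: the locking code is scattered all over the place, and a change in one spot affects all the others.

    STM is much simpler. We read the transactional variables we want to change and work on them in an environment of our own. When we are done, all the updated values are written back together in one step. If the transaction fails, the variables simply keep their original values. Other threads only ever see the values from before or after the transaction, so we get an all-or-nothing effect.

    From the user's point of view, all you do is create transactional variables, read and write them, and finally submit the whole transaction. Nothing else is required, which is a real pleasure.

    Let's look at some simple code: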

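    {-# LANGUAGE GeneralizedNewtypeDeriving #-}
    import Control.Concurrent.STM
    import Control.Monad (liftM2)
    
    newtype Gold = Gold Int
        deriving (Eq, Ord, Show, Num)
    type Balance = TVar Gold
    
    -- Move qty from one balance to the other within a single transaction.
    basicTransfer :: Gold -> Balance -> Balance -> STM ()
    basicTransfer qty fromBal toBal = do
      fromQty <- readTVar fromBal
      toQty   <- readTVar toBal
      writeTVar fromBal (fromQty - qty)
      writeTVar toBal   (toQty + qty)
    
    -- Create two balances, transfer 3 from alice to bob, return both balances.
    transferTest :: STM (Gold, Gold)
    transferTest = do
      alice <- newTVar (12 :: Gold)
      bob   <- newTVar 4
      basicTransfer 3 alice bob
      liftM2 (,) (readTVar alice) (readTVar bob)
    
    -- atomically runs the whole transaction from IO.
    main :: IO ()
    main = atomically transferTest >>= print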
    

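    a. We create two transactional variables (TVar).
    b. We then call basicTransfer to build the transfer logic inside the STM world.
    c. Finally we call atomically to run the transferTest transaction.

    As we can see, as long as the logic lives in the STM world, any number of TVars is supported. We only need newTVar to create variables and readTVar and writeTVar to read and modify them; everything else is taken care of. Simple and convenient.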
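To see the all-or-nothing behaviour under real contention, here is a minimal sketch that runs many transfers from several threads. The names `transfer` and `runTransfers`, the starting balances, and the thread counts are illustrative assumptions, not part of the original example. It assumes the `stm` package that ships with GHC.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM_)

-- Move n units from one TVar to another in a single transaction.
transfer :: Int -> TVar Int -> TVar Int -> STM ()
transfer n from to = do
  modifyTVar' from (subtract n)
  modifyTVar' to (+ n)

-- Start `workers` threads; each performs `rounds` transfers in both
-- directions. Returns the final balances once every worker has finished.
runTransfers :: Int -> Int -> IO (Int, Int)
runTransfers workers rounds = do
  alice <- newTVarIO 1000
  bob   <- newTVarIO 1000
  done  <- newEmptyMVar
  forM_ [1 .. workers] $ \i -> forkIO $ do
    replicateM_ rounds $ do
      atomically (transfer i alice bob)
      atomically (transfer i bob alice)
    putMVar done ()
  replicateM_ workers (takeMVar done)
  atomically ((,) <$> readTVar alice <*> readTVar bob)

main :: IO ()
main = do
  (a, b) <- runTransfers 8 1000
  print (a, b, a + b)
```

Every worker moves an amount across and then back, so both balances should end where they started no matter how the threads interleave. No explicit lock appears anywhere in the code.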

    II. Understanding STM in depth

    The official documentation is available at: https://ghc.haskell.org/trac/ghc/wiki/Commentary/Rts/STM
    https://en.wikipedia.org/wiki/Concurrent_Haskell

    Allow me to restate it in my own plain words:

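    1. How does STM actually do it?

    a. Each thread creates its own TRec (transaction record).

    b. Every operation on a TVar is recorded in a TRecEntry inside the TRec.

    c. When the STM transaction commits, the thread checks whether the value of any TVar recorded in its TRec has been changed by a commit from another thread. If no other thread has touched them, the runtime locks all of them and writes the new values back in one batch. If another thread has changed any of these TVars, the transaction is run again against the new TVar values.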
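The three steps above can be sketched as a small pure model. This is a toy under stated assumptions, not how the GHC runtime is written: variables are identified by strings, values are plain `Int`s, the shared store is an association list, and locking is ignored entirely. All names (`Store`, `Entry`, `readVar`, `writeVar`, `commit`) are hypothetical.

```haskell
import Data.List (find)
import Data.Maybe (fromMaybe)

type Var   = String          -- stand-in for the identity of a TVar
type Store = [(Var, Int)]    -- shared memory: current value of each variable

-- One log entry: the value seen on first access and the value to write.
data Entry = Entry { entryVar :: Var, expectedVal :: Int, newVal :: Int }
  deriving (Eq, Show)

type TRec = [Entry]          -- the per-transaction log

current :: Store -> Var -> Int
current store v = fromMaybe 0 (lookup v store)

-- Read through the log: reuse an existing entry, otherwise record the
-- store's value as both expected and new.
readVar :: Store -> Var -> TRec -> (Int, TRec)
readVar store v trec = case find ((== v) . entryVar) trec of
  Just e  -> (newVal e, trec)
  Nothing -> let x = current store v in (x, Entry v x x : trec)

-- Buffer a write in the log; the store itself is untouched until commit.
writeVar :: Store -> Var -> Int -> TRec -> TRec
writeVar store v x trec = case break ((== v) . entryVar) trec of
  (before, e : after) -> before ++ e { newVal = x } : after
  (_, [])             -> Entry v (current store v) x : trec

-- Commit only if every logged variable still holds its expected value;
-- Nothing means "validation failed, run the transaction again".
commit :: Store -> TRec -> Maybe Store
commit store trec
  | all stillExpected trec = Just [ (v, updated v x) | (v, x) <- store ]
  | otherwise              = Nothing
  where
    stillExpected e = current store (entryVar e) == expectedVal e
    updated v x     = maybe x newVal (find ((== v) . entryVar) trec)

main :: IO ()
main = do
  let store0  = [("alice", 12), ("bob", 4)]
      (a, t1) = readVar store0 "alice" []
      (b, t2) = readVar store0 "bob" t1
      trec    = writeVar store0 "bob" (b + 3) (writeVar store0 "alice" (a - 3) t2)
  print (commit store0 trec)                        -- nobody interfered
  print (commit [("alice", 10), ("bob", 4)] trec)   -- alice changed meanwhile
```

The first commit yields the updated store; the second returns `Nothing` because the logged expectation for `alice` no longer matches, which corresponds to the transaction being rerun.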

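    2. Taking further control of STM!

    The transaction control described above happens automatically, but sometimes we need special services from STM.
    Say we want to make a transfer, but the balance in the TVar is insufficient. Can we watch the TVar and retry the transaction once its value changes? Or, after a failure and rollback, run a different transaction instead?
    These are STM's two flow-control primitives: retry and orElse.

    a. Let's look at code that uses retry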

    transferBlocking :: Int -> TVar Int -> TVar Int -> STM ()
    transferBlocking v a b = do
        x <- readTVar a
        y <- readTVar b
        if x < v
          then retry
          else do
                  writeTVar a (x - v)
                  writeTVar b (y + v)
    

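    As we can see, when x < v the transaction cannot proceed, so it calls retry and blocks. When does a blocked transaction wake up? On blocking, the thread adds itself to the watch queue of the TVars it has read; when another thread modifies one of those TVars, the blocked thread is woken up and the transaction is run again.
    Such an advanced feature without writing any extra code. Impressive, isn't it?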
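The blocking and wake-up behaviour can be observed with a small driver. This is a sketch: `demo`, the amounts, and the 0.1 s delay are assumptions chosen for illustration, and `transferBlocking` is restated so the block is self-contained.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

-- Same logic as transferBlocking above: block until a holds at least v.
transferBlocking :: Int -> TVar Int -> TVar Int -> STM ()
transferBlocking v a b = do
  x <- readTVar a
  if x < v
    then retry
    else do
      modifyTVar' a (subtract v)
      modifyTVar' b (+ v)

-- Start a transfer that cannot succeed yet, then top up the source account.
demo :: IO (Int, Int)
demo = do
  a    <- newTVarIO 1
  b    <- newTVarIO 0
  done <- newEmptyMVar
  _ <- forkIO $ do
    atomically (transferBlocking 5 a b)   -- cannot proceed while a holds 1
    putMVar done ()
  threadDelay 100000                      -- give the worker time to block
  atomically (modifyTVar' a (+ 10))       -- changing a lets the worker rerun
  takeMVar done
  (,) <$> readTVarIO a <*> readTVarIO b

main :: IO ()
main = demo >>= print
```

The final balances are the same whether or not the worker actually went to sleep before the deposit: the source ends at 1 + 10 - 5 and the target at 5.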

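    b. orElse is even more advanced: it acts directly on retry.

    If the first transaction calls retry, we do not let it block and wait to be woken up; we run another transaction instead. Even retry itself can be controlled!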

    transferChoice :: Int -> TVar Int -> TVar Int -> TVar Int -> STM ()
    transferChoice v a a' b = do
        transferBlocking v a b `orElse` transferBlocking v a' b
    

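    orElse is simple: it runs the first transaction as a nested transaction, and if that transaction calls retry it is rolled back and the alternative is run. Of course, if validation fails because another thread modified the data, that does not count as a failure of the first branch, and the transaction still has to be run and committed again.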
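A minimal sketch of the fallback behaviour follows. The helpers `withdraw` and `withdrawEither` and the balances are hypothetical; the result string only exists to show which branch ran.

```haskell
import Control.Concurrent.STM

-- Withdraw v from an account, calling retry when the balance is too low.
withdraw :: Int -> TVar Int -> STM ()
withdraw v acc = do
  x <- readTVar acc
  if x < v then retry else writeTVar acc (x - v)

-- Try the first account; if that branch retries, fall back to the second.
withdrawEither :: Int -> TVar Int -> TVar Int -> STM String
withdrawEither v a a' =
  (withdraw v a >> pure "first") `orElse` (withdraw v a' >> pure "second")

demo :: IO (String, Int, Int)
demo = do
  a   <- newTVarIO 2
  a'  <- newTVarIO 10
  who <- atomically (withdrawEither 5 a a')
  x   <- readTVarIO a
  y   <- readTVarIO a'
  pure (who, x, y)

main :: IO ()
main = demo >>= print
```

The first account holds too little, so its branch retries and is rolled back; the second branch then pays, leaving the first balance untouched.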

    With so many interesting things going on, can you resist the temptation to read the source?
    Very well, as you wish!

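    III. Reading the Haskell source of STM

    STM is a core part of Haskell, so it lives directly in the runtime and in the base package.
    In this section we focus on the Haskell-level interface.
    Let's recap the pieces discussed so far:
    a. The STM world
    b. Flow control with retry and orElse
    c. TVar variables
    d. Committing a transaction with atomically

    All of the Haskell STM code lives in the GHC.Conc.Sync module:
    https://github.com/ghc/ghc/blob/master/libraries/base/GHC/Conc/Sync.hs

    1. The STM world

    a. First, the type definition of STM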

    newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
    
    unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
    unSTM (STM a) = a
    
    atomically :: STM a -> IO a
    atomically (STM m) = IO (\s -> (atomically# m) s )
    
    

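    As you can see, the STM world looks very much like the IO world: both thread a state token through the computation.
    When atomically runs an STM transaction, the computation is carried over into the IO world.

    b. Next, how STM actions are composed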

    instance Applicative STM where
      {-# INLINE pure #-}
      {-# INLINE (*>) #-}
      {-# INLINE liftA2 #-}
      pure x = returnSTM x
      (<*>) = ap
      liftA2 = liftM2
      m *> k = thenSTM m k
    
    instance  Monad STM  where
        {-# INLINE (>>=)  #-}
        m >>= k     = bindSTM m k
        (>>) = (*>)
    
    bindSTM :: STM a -> (a -> STM b) -> STM b
    bindSTM (STM m) k = STM ( \s ->
      case m s of
        (# new_s, a #) -> unSTM (k a) new_s
      )
    
    thenSTM :: STM a -> STM b -> STM b
    thenSTM (STM m) k = STM ( \s ->
      case m s of
        (# new_s, _ #) -> unSTM k new_s
      )
    
    instance Alternative STM where
      empty = retry
      (<|>) = orElse
    
    

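    The composition here is fairly simple.
    a. >>= is implemented by bindSTM: run the first action, then continue with the next one using its result.
    >> is a shortcut implemented by thenSTM: it discards the result of the first action and runs the next.
    b. <|> runs the alternative when the first action calls retry.
    The logic of <|> is the orElse-on-retry behaviour described later.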
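The state-threading in bindSTM and thenSTM can be mimicked without unboxed types. The sketch below is a toy under stated assumptions: `Toy`, `tick`, and the `Int` counter standing in for `State# RealWorld` are invented for illustration and do not appear in GHC.

```haskell
-- A boxed stand-in for STM's representation: a function from a state
-- token to a new token plus a result. The token is an Int step counter
-- so that the threading is observable.
newtype Toy a = Toy (Int -> (Int, a))

unToy :: Toy a -> Int -> (Int, a)
unToy (Toy f) = f

returnToy :: a -> Toy a
returnToy x = Toy (\s -> (s, x))

-- Same shape as bindSTM: run m, then feed its result and new state to k.
bindToy :: Toy a -> (a -> Toy b) -> Toy b
bindToy (Toy m) k = Toy (\s -> case m s of (s', a) -> unToy (k a) s')

-- Same shape as thenSTM: run m, ignore its result, then run k.
thenToy :: Toy a -> Toy b -> Toy b
thenToy (Toy m) k = Toy (\s -> case m s of (s', _) -> unToy k s')

-- A primitive action: advance the token and return its previous value.
tick :: Toy Int
tick = Toy (\s -> (s + 1, s))

-- One tick whose result is dropped, then two ticks whose results are kept.
example :: Toy (Int, Int)
example =
  tick `thenToy` (tick `bindToy` \a -> tick `bindToy` \b -> returnToy (a, b))

main :: IO ()
main = print (unToy example 0)
```

Starting from token 0, the three ticks leave the token at 3 and the kept results are 1 and 2, which shows that each action receives the state produced by the previous one.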

    c. Finally, how STM values are constructed

    returnSTM :: a -> STM a
    returnSTM x = STM (\s -> (# s, x #))
    
    newTVar :: a -> STM (TVar a)
    newTVar val = STM $ \s1# ->
        case newTVar# val s1# of
             (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
    
    readTVar :: TVar a -> STM a
    readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
    
    writeTVar :: TVar a -> a -> STM ()
    writeTVar (TVar tvar#) val = STM $ \s1# ->
        case writeTVar# tvar# val s1# of
             s2# -> (# s2#, () #)
    
    throwSTM :: Exception e => e -> STM a
    throwSTM e = STM $ raiseIO# (toException e)
    
    

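    There are three ways to construct an STM value: from a constant, through TVar operations, and by raising an exception.
    TVar is by far the most common, so we look at it more closely next.

    2. TVar variables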

    data TVar a = TVar (TVar# RealWorld a)
    
    instance Eq (TVar a) where
            (TVar tvar1#) == (TVar tvar2#) = isTrue# (sameTVar# tvar1# tvar2#)
    
    newTVar :: a -> STM (TVar a)
    newTVar val = STM $ \s1# ->
        case newTVar# val s1# of
             (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
    
    readTVar :: TVar a -> STM a
    readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
    
    writeTVar :: TVar a -> a -> STM ()
    writeTVar (TVar tvar#) val = STM $ \s1# ->
        case writeTVar# tvar# val s1# of
             s2# -> (# s2#, () #)
    

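    The TVar data type is essentially a wrapper around the primitive type TVar#, which is provided by the runtime.
    TVar# supports three operations: create, read, and write,
    implemented by the runtime primitives newTVar#, readTVar#, and writeTVar# respectively.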
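One consequence of the Eq instance shown above is that TVar equality is identity, not equality of contents. A minimal sketch (the name `demo` is an assumption):

```haskell
import Control.Concurrent.STM

-- Two TVars with equal contents are still different variables.
demo :: IO (Bool, Bool)
demo = do
  a <- newTVarIO (0 :: Int)
  b <- newTVarIO (0 :: Int)
  pure (a == a, a == b)

main :: IO ()
main = demo >>= print
```

A TVar equals itself, while two separately created TVars compare unequal even though both hold 0.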

    3. Flow control with retry and orElse

    throwSTM :: Exception e => e -> STM a
    throwSTM e = STM $ raiseIO# (toException e)
    
    catchSTM :: Exception e => STM a -> (e -> STM a) -> STM a
    catchSTM (STM m) handler = STM $ catchSTM# m handler'
        where
          handler' e = case fromException e of
                         Just e' -> unSTM (handler e')
                         Nothing -> raiseIO# e
    
    retry :: STM a
    retry = STM $ \s# -> retry# s#
    
    orElse :: STM a -> STM a -> STM a
    orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
    
    

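    The flow control here also covers exception handling.
    a. First, throwSTM calls the runtime primitive raiseIO# to raise an exception inside the STM world,
    and catchSTM calls the runtime primitive catchSTM# to handle it.
    b. retry is provided by the runtime primitive retry#.
    c. orElse catches a retry, and is provided by the runtime primitive catchRetry#.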
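How exceptions interact with a transaction's writes can be checked with a short experiment. This is a sketch: the exception type `Boom` and the helpers `failingUpdate` and `demo` are hypothetical names introduced here.

```haskell
import Control.Concurrent.STM
import Control.Exception (Exception, try)

-- A hypothetical exception type used only for this demonstration.
data Boom = Boom deriving Show
instance Exception Boom

-- Write to the TVar, then abort with an exception.
failingUpdate :: TVar Int -> STM ()
failingUpdate tv = writeTVar tv 100 >> throwSTM Boom

demo :: IO (String, Int, Int)
demo = do
  tv <- newTVarIO 1
  -- Not caught inside STM: the exception escapes atomically.
  r  <- try (atomically (failingUpdate tv)) :: IO (Either Boom ())
  v1 <- readTVarIO tv
  -- Caught inside STM: the handler runs and the transaction commits.
  atomically (failingUpdate tv `catchSTM` \Boom -> modifyTVar' tv (+ 1))
  v2 <- readTVarIO tv
  pure (either (const "aborted") (const "committed") r, v1, v2)

main :: IO ()
main = demo >>= print
```

In both cases the write of 100 is discarded. After the first transaction the TVar still holds 1; after the second it holds 2, because only the handler's increment is committed.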

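    4. List of runtime interfaces

    Everything now points at the runtime, so let's recap:
    TVar type: TVar#
    TVar operations: newTVar#, readTVar#, writeTVar#
    STM operations: atomically#, raiseIO#, catchSTM#, retry#, catchRetry#

    Now for the most exciting moment: on to the STM runtime!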

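    IV. Analysis of the STM runtime

    1. Introduction to PrimOps

    The basic RTS operations are implemented as PrimOps,
    and the GHC.Prim module is generated from their definitions by the genprimopcode tool.

    a. C layer

    b. Cmm layer

    The Cmm code implements the functions exposed by GHC.Prim and connects them to the underlying C interface.
    https://github.com/ghc/ghc/blob/master/rts/PrimOps.cmm
    It contains the following functions: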
    stg_newTVarzh -> newTVar#
    stg_readTVarzh -> readTVar#
    stg_writeTVarzh -> writeTVar#
    stg_atomicallyzh -> atomically#
    stg_catchSTMzh -> catchSTM#
    stg_retryzh -> retry#
    stg_catchRetryzh -> catchRetry#

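    c. Interface definition layer

    This defines the list of low-level operations offered by the RTS runtime:
    https://github.com/ghc/ghc/blob/master/compiler/prelude/primops.txt.pp

    d. Haskell layer

    The GHC.Prim module is generated for Haskell code to call.
    The interface mapping involves some character encoding, for example # becomes zh.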

    usage: genprimopcode command < primops.txt > ...
    

    https://github.com/ghc/ghc/blob/master/utils/genprimopcode/Main.hs

                   -- Constructors
                   encode_ch '('  = "ZL"    -- Needed for things like (,), and (->)
                   encode_ch ')'  = "ZR"    -- For symmetry with (
                   encode_ch '['  = "ZM"
                   encode_ch ']'  = "ZN"
                   encode_ch ':'  = "ZC"
                   encode_ch 'Z'  = "ZZ"
    
                   -- Variables
                   encode_ch 'z'  = "zz"
                   encode_ch '&'  = "za"
                   encode_ch '|'  = "zb"
                   encode_ch '^'  = "zc"
                   encode_ch '$'  = "zd"
                   encode_ch '='  = "ze"
                   encode_ch '>'  = "zg"
                   encode_ch '#'  = "zh"
                   encode_ch '.'  = "zi"
                   encode_ch '<'  = "zl"
                   encode_ch '-'  = "zm"
                   encode_ch '!'  = "zn"
                   encode_ch '+'  = "zp"
                   encode_ch '\'' = "zq"
                   encode_ch '\\' = "zr"
                   encode_ch '/'  = "zs"
                   encode_ch '*'  = "zt"
                   encode_ch '_'  = "zu"
                   encode_ch '%'  = "zv"
                   encode_ch c    = 'z' : shows (ord c) "U"
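To check the mapping between primop names and the Cmm entry points listed earlier, here is a minimal sketch that applies a handful of rules from the table. It is deliberately incomplete: `encodeCh` covers only a few characters and passes everything else through unchanged, which the real encoder does not do. `zEncode` and `cmmName` are hypothetical helper names, and the `stg_` prefix is taken from the function list above.

```haskell
-- A small subset of the encoding table shown above. Characters outside
-- this subset are passed through unchanged (a simplification).
encodeCh :: Char -> String
encodeCh 'z'  = "zz"
encodeCh 'Z'  = "ZZ"
encodeCh '#'  = "zh"
encodeCh '_'  = "zu"
encodeCh '.'  = "zi"
encodeCh '\'' = "zq"
encodeCh c    = [c]

zEncode :: String -> String
zEncode = concatMap encodeCh

-- Hypothetical helper: the Cmm symbol name for a primop.
cmmName :: String -> String
cmmName primop = "stg_" ++ zEncode primop

main :: IO ()
main = mapM_ (putStrLn . cmmName) ["newTVar#", "atomically#", "catchRetry#"]
```

The three names come out as stg_newTVarzh, stg_atomicallyzh, and stg_catchRetryzh, matching the entries in the Cmm function list.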
    

    2. Basic RTS concepts

    We saw above that STM involves three kinds of interface: C, Cmm, and Haskell. Let's now look at them in detail.

    a. The Haskell compilation pipeline

                                                                               +---------+
                                                             LLVM backend /--->| LLVM IR |--\
                                                                          |    +---------+  | LLVM
                                                                          |                 v
     +------------+ Desugar  +------+ STGify  +-----+ CodeGen  +-----+    |  NCG    +----------+
     | Parse tree |--------->| Core |-------->| STG |--------->| C-- |----+-------->| Assembly |
     +------------+          +------+         +-----+          +-----+    |         +----------+
                                                                          |            ^
                                                                          |     +---+  | GCC
                                                                C backend \---->| C |--/
                                                                                +---+
    
    

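    a. Haskell code is first desugared into Core.
    b. Core is then converted to STG, from which Cmm (C--) is generated.
    c. Finally, Cmm is turned into assembly by one of three backends: the native code generator (NCG), LLVM, or C compiled with GCC.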


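    b. The RTS keeps the state of each thread in a TSO (Thread State Object)

    It contains the StgStack_ stack and the StgTRecHeader transaction record.
    StgStack_ is used for closure calls, and StgTRecHeader is used for TVar transaction processing.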



    The TSO object is defined in TSO.h:

    typedef struct StgTSO_ {
        StgHeader               header;
    
        struct StgTSO_*         _link;
    
        struct StgTSO_*         global_link;    // Links threads on the
                                                // generation->threads lists
    
        struct StgStack_       *stackobj;
    
        StgWord16               what_next;      // Values defined in Constants.h
        StgWord16               why_blocked;    // Values defined in Constants.h
        StgWord32               flags;          // Values defined in Constants.h
        StgTSOBlockInfo         block_info;
        StgThreadID             id;
        StgWord32               saved_errno;
        StgWord32               dirty;          /* non-zero => dirty */
        struct InCall_*         bound;
        struct Capability_*     cap;
    
        struct StgTRecHeader_ * trec;       /* STM transaction record */
    
        struct MessageThrowTo_ * blocked_exceptions;
    
        struct StgBlockingQueue_ *bq;
    
        StgInt64  alloc_limit;     /* in bytes */
    
        StgWord32  tot_stack_size;
    
    #if defined(TICKY_TICKY)
        /* TICKY-specific stuff would go here. */
    #endif
    #if defined(PROFILING)
        StgTSOProfInfo prof;
    #endif
    #if defined(mingw32_HOST_OS)
        StgWord32 saved_winerror;
    #endif
    
    } *StgTSOPtr; // StgTSO defined in rts/Types.h
    
    

    https://github.com/ghc/ghc/blob/master/includes/rts/storage/TSO.h

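    c. STG register pointers

    As we can see, the CurrentTSO register points to the TSO of the current thread.
    https://github.com/ghc/ghc/blob/master/compiler/cmm/CmmExpr.hs

    d. Closure calls in Haskell differ from calls in non-functional languages.

    Haskell closures are invoked by manipulating the stack pointer and jumping to the closure's entry code, rather than through the conventional last-in, first-out call stack used by C.

    For more details, see the following documents: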
    https://www.cs.tufts.edu/~nr/cs257/archive/simon-peyton-jones/eval-apply-jfp.pdf
    https://github.com/ghc/ghc/blob/master/compiler/cmm/CmmParse.y

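    3. STM data types: StgTSO, StgTRecHeader, StgTVar

    Let's first look at the data structures used by STM.
    As mentioned earlier, there are two of them: the transactional variable StgTVar, and the per-thread transaction record StgTRecHeader, which is stored in the StgTSO thread state object introduced in the previous section.

    The STM data structures are defined in the following file: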
    https://github.com/ghc/ghc/blob/master/includes/rts/storage/Closures.h

    typedef struct {
      StgHeader                  header;
      StgClosure                *volatile current_value;
      StgTVarWatchQueue         *volatile first_watch_queue_entry;
      StgInt                     volatile num_updates;
    } StgTVar;
    
    
    struct StgTRecHeader_ {
      StgHeader                  header;
      struct StgTRecHeader_     *enclosing_trec;
      StgTRecChunk              *current_chunk;
      TRecState                  state;
    };
    
    typedef struct StgTRecChunk_ {
      StgHeader                  header;
      struct StgTRecChunk_      *prev_chunk;
      StgWord                    next_entry_idx;
      TRecEntry                  entries[TREC_CHUNK_NUM_ENTRIES];
    } StgTRecChunk;
    
    typedef struct {
      StgTVar                   *tvar;
      StgClosure                *expected_value;
      StgClosure                *new_value;
    #if defined(THREADED_RTS)
      StgInt                     num_updates;
    #endif
    } TRecEntry;
    
    

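    We can see that:
    a. StgTVar contains a StgTVarWatchQueue, used for blocking and wake-up, and a current_value field that stores the current value.

    b. StgTRecHeader_ holds a StgTRecChunk, and each StgTRecChunk holds TREC_CHUNK_NUM_ENTRIES TRecEntry slots. When a chunk fills up, capacity is extended by linking chunks through prev_chunk. The enclosing_trec field points to the StgTRecHeader_ of the enclosing transaction, which is how nested transactions are represented.

    c. A TRecEntry holds the StgTVar together with expected_value and new_value, which are used for comparison and for recording modifications. Details follow below.

    With the basic data structures in place, let's look at the actual operations.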

    4. TVar operations: newTVar#, readTVar#, writeTVar#

    a. First, creating a transactional variable: newTVar#

    stg_newTVarzh (P_ init)
    {
        W_ tv;
    
        ALLOC_PRIM_P (SIZEOF_StgTVar, stg_newTVarzh, init);
    
        tv = Hp - SIZEOF_StgTVar + WDS(1);
        SET_HDR (tv, stg_TVAR_DIRTY_info, CCCS);
    
        StgTVar_current_value(tv) = init;
        StgTVar_first_watch_queue_entry(tv) = stg_END_STM_WATCH_QUEUE_closure;
        StgTVar_num_updates(tv) = 0;
    
        return (tv);
    }
    

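    stg_newTVarzh takes an init value.
    After ALLOC_PRIM_P allocates the space, the address computed from the heap pointer is assigned to tv.
    The StgTVar transactional variable tv is then initialised:

    • current_value is set to init
    • first_watch_queue_entry is set to stg_END_STM_WATCH_QUEUE_closure
    • num_updates is set to 0

    b. Next, the read function readTVar#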

    stg_readTVarzh (P_ tvar)
    {
        P_ trec;
        P_ result;
    
        // Call to stmReadTVar may allocate
        MAYBE_GC_P (stg_readTVarzh, tvar);
    
        trec = StgTSO_trec(CurrentTSO);
        ("ptr" result) = ccall stmReadTVar(MyCapability() "ptr", trec "ptr",
                                           tvar "ptr");
        return (result);
    }
    
    StgClosure *stmReadTVar(Capability *cap,
                            StgTRecHeader *trec,
                            StgTVar *tvar) {
      StgTRecHeader *entry_in = NULL;
      StgClosure *result = NULL;
      TRecEntry *entry = NULL;
      TRACE("%p : stmReadTVar(%p)", trec, tvar);
      ASSERT(trec != NO_TREC);
      ASSERT(trec -> state == TREC_ACTIVE ||
             trec -> state == TREC_CONDEMNED);
    
      entry = get_entry_for(trec, tvar, &entry_in);
    
      if (entry != NULL) {
        if (entry_in == trec) {
          // Entry found in our trec
          result = entry -> new_value;
        } else {
          // Entry found in another trec
          TRecEntry *new_entry = get_new_entry(cap, trec);
          new_entry -> tvar = tvar;
          new_entry -> expected_value = entry -> expected_value;
          new_entry -> new_value = entry -> new_value;
          result = new_entry -> new_value;
        }
      } else {
        // No entry found
        StgClosure *current_value = read_current_value(trec, tvar);
        TRecEntry *new_entry = get_new_entry(cap, trec);
        new_entry -> tvar = tvar;
        new_entry -> expected_value = current_value;
        new_entry -> new_value = current_value;
        result = current_value;
      }
    
      TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
      return result;
    }
    
    
    

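    Here ccall is used to call the underlying C function stmReadTVar. If the TVar already has an entry in the current trec, its new_value is returned; if the entry is found in an enclosing trec, it is copied into the current trec first; otherwise the TVar's current value is read and recorded as both expected_value and new_value.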

    c. Finally, the write function writeTVar#

    stg_writeTVarzh (P_ tvar,     /* :: TVar a */
                     P_ new_value /* :: a      */)
    {
        W_ trec;
    
        // Call to stmWriteTVar may allocate
        MAYBE_GC_PP (stg_writeTVarzh, tvar, new_value);
    
        trec = StgTSO_trec(CurrentTSO);
        ccall stmWriteTVar(MyCapability() "ptr", trec "ptr", tvar "ptr",
                           new_value "ptr");
        return ();
    }
    void stmWriteTVar(Capability *cap,
                      StgTRecHeader *trec,
                      StgTVar *tvar,
                      StgClosure *new_value) {
    
      StgTRecHeader *entry_in = NULL;
      TRecEntry *entry = NULL;
      TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
      ASSERT(trec != NO_TREC);
      ASSERT(trec -> state == TREC_ACTIVE ||
             trec -> state == TREC_CONDEMNED);
    
      entry = get_entry_for(trec, tvar, &entry_in);
    
      if (entry != NULL) {
        if (entry_in == trec) {
          // Entry found in our trec
          entry -> new_value = new_value;
        } else {
          // Entry found in another trec
          TRecEntry *new_entry = get_new_entry(cap, trec);
          new_entry -> tvar = tvar;
          new_entry -> expected_value = entry -> expected_value;
          new_entry -> new_value = new_value;
        }
      } else {
        // No entry found
        StgClosure *current_value = read_current_value(trec, tvar);
        TRecEntry *new_entry = get_new_entry(cap, trec);
        new_entry -> tvar = tvar;
        new_entry -> expected_value = current_value;
        new_entry -> new_value = new_value;
      }
    
      TRACE("%p : stmWriteTVar done", trec);
    }
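The three-way case analysis shared by stmReadTVar and stmWriteTVar can be modelled in a few lines of Haskell. This is a sketch under stated assumptions and not a port of the C code: variables are strings, values are `Int`s, the heap is a plain function, and chunks are replaced by a list. The names `getEntryFor`, `readTVarModel`, and `writeTVarModel` are illustrative only.

```haskell
import Data.List (find)

type Var = String

data Entry = Entry { entryVar :: Var, expectedValue :: Int, newValue :: Int }
  deriving (Eq, Show)

-- A transaction record with an optional enclosing (outer) record.
data TRec = TRec { entries :: [Entry], enclosing :: Maybe TRec }
  deriving (Eq, Show)

-- Search this record, then the enclosing ones. The Int is the nesting
-- depth at which the entry was found (0 means our own record).
getEntryFor :: TRec -> Var -> Maybe (Int, Entry)
getEntryFor trec v = go 0 (Just trec)
  where
    go _ Nothing  = Nothing
    go d (Just t) = case find ((== v) . entryVar) (entries t) of
      Just e  -> Just (d, e)
      Nothing -> go (d + 1) (enclosing t)

-- Read: own entry, entry copied from an enclosing record, or fresh entry.
readTVarModel :: (Var -> Int) -> TRec -> Var -> (Int, TRec)
readTVarModel heap trec v = case getEntryFor trec v of
  Just (0, e) -> (newValue e, trec)
  Just (_, e) -> (newValue e, addEntry e)
  Nothing     -> let x = heap v in (x, addEntry (Entry v x x))
  where addEntry e = trec { entries = e : entries trec }

-- Write: the same three cases, but the new value is the one supplied.
writeTVarModel :: (Var -> Int) -> TRec -> Var -> Int -> TRec
writeTVarModel heap trec v x = case getEntryFor trec v of
  Just (0, _) -> trec { entries = map update (entries trec) }
  Just (_, e) -> trec { entries = e { newValue = x } : entries trec }
  Nothing     -> trec { entries = Entry v (heap v) x : entries trec }
  where update e | entryVar e == v = e { newValue = x }
                 | otherwise       = e

main :: IO ()
main = do
  let heap _      = 7
      outer       = TRec [Entry "x" 7 8] Nothing     -- outer tx wrote x := 8
      inner0      = TRec [] (Just outer)             -- nested tx, empty log
      (a, inner1) = readTVarModel heap inner0 "x"    -- found in enclosing trec
      (b, inner2) = readTVarModel heap inner1 "y"    -- not found anywhere
      inner3      = writeTVarModel heap inner2 "y" 9 -- updates our own entry
  print (a, b)
  mapM_ print (entries inner3)
```

The nested transaction sees the outer transaction's pending write to `x` (8, not the heap's 7), and every variable it touches ends up with exactly one entry in its own log.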
    
    

    5. STM operations: atomically#, raiseIO#, catchSTM#, retry#, catchRetry#

    a. Committing a transaction: atomically#

    stg_atomicallyzh (P_ stm)
    {
        P_ old_trec;
        P_ new_trec;
        P_ code, frame_result;
    
        // stmStartTransaction may allocate
        MAYBE_GC_P(stg_atomicallyzh, stm);
    
        STK_CHK_GEN();
    
        old_trec = StgTSO_trec(CurrentTSO);
    
        /* Nested transactions are not allowed; raise an exception */
        if (old_trec != NO_TREC) {
            jump stg_raisezh(base_ControlziExceptionziBase_nestedAtomically_closure);
        }
    
        code = stm;
        frame_result = NO_TREC;
    
        /* Start the memory transaction */
        ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", old_trec "ptr");
        StgTSO_trec(CurrentTSO) = new_trec;
    
        jump stg_ap_v_fast
            (ATOMICALLY_FRAME_FIELDS(,,stg_atomically_frame_info, CCCS, 0,
                                     code,frame_result))
            (stm);
    }
    
    INFO_TABLE_RET(stg_atomically_frame, ATOMICALLY_FRAME,
                   // layout of the frame, and bind the field names
                   ATOMICALLY_FRAME_FIELDS(W_,P_,
                                           info_ptr, p1, p2,
                                           code,
                                           frame_result))
        return (P_ result) // value returned to the frame
    {
        W_ valid;
        gcptr trec, outer, q;
    
        trec   = StgTSO_trec(CurrentTSO);
        outer  = StgTRecHeader_enclosing_trec(trec);
    
        /* Back at the atomically frame */
        frame_result = result;
    
        /* try to commit */
        (valid) = ccall stmCommitTransaction(MyCapability() "ptr", trec "ptr");
        if (valid != 0) {
            /* Transaction was valid: commit succeeded */
            StgTSO_trec(CurrentTSO) = NO_TREC;
            return (frame_result);
        } else {
            /* Transaction was not valid: try again */
            ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                     NO_TREC "ptr");
            StgTSO_trec(CurrentTSO) = trec;
    
            jump stg_ap_v_fast
                // push the StgAtomicallyFrame again: the code generator is
                // clever enough to only assign the fields that have changed.
                (ATOMICALLY_FRAME_FIELDS(,,info_ptr,p1,p2,
                                         code,frame_result))
                (code);
        }
    }
    
    StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
      StgInt64 max_commits_at_start = max_commits;
    
      TRACE("%p : stmCommitTransaction()", trec);
      ASSERT(trec != NO_TREC);
    
      lock_stm(trec);
    
      ASSERT(trec -> enclosing_trec == NO_TREC);
      ASSERT((trec -> state == TREC_ACTIVE) ||
             (trec -> state == TREC_CONDEMNED));
    
      // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
      // the configuration lets us use a read phase.
    
      bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
      if (result) {
        // We now know that all the updated locations hold their expected values.
        ASSERT(trec -> state == TREC_ACTIVE);
    
        if (config_use_read_phase) {
          StgInt64 max_commits_at_end;
          StgInt64 max_concurrent_commits;
          TRACE("%p : doing read check", trec);
          result = check_read_only(trec);
          TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
    
          max_commits_at_end = max_commits;
          max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                    (n_capabilities * TOKEN_BATCH_SIZE));
          if (((max_concurrent_commits >> 32) > 0) || shake()) {
            result = false;
          }
        }
    
        if (result) {
          // We now know that all of the read-only locations held their expected values
          // at the end of the call to validate_and_acquire_ownership.  This forms the
          // linearization point of the commit.
    
          // Make the updates required by the transaction.
          FOR_EACH_ENTRY(trec, e, {
            StgTVar *s;
            s = e -> tvar;
            if ((!config_use_read_phase) || (e -> new_value != e -> expected_value)) {
              // Either the entry is an update or we're not using a read phase:
              // write the value back to the TVar, unlocking it if necessary.
    
              ACQ_ASSERT(tvar_is_locked(s, trec));
              TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
              unpark_waiters_on(cap,s);
              IF_STM_FG_LOCKS({
                s -> num_updates ++;
              });
              unlock_tvar(cap, trec, s, e -> new_value, true);
            }
            ACQ_ASSERT(!tvar_is_locked(s, trec));
          });
        } else {
            revert_ownership(cap, trec, false);
        }
      }
    
      unlock_stm(trec);
    
      free_stg_trec_header(cap, trec);
    
      TRACE("%p : stmCommitTransaction()=%d", trec, result);
    
      return result;
    }
    
    static StgBool validate_and_acquire_ownership (Capability *cap,
                                                   StgTRecHeader *trec,
                                                   int acquire_all,
                                                   int retain_ownership) {
      StgBool result;
    
      if (shake()) {
        TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
        return false;
      }
    
      ASSERT((trec -> state == TREC_ACTIVE) ||
             (trec -> state == TREC_WAITING) ||
             (trec -> state == TREC_CONDEMNED));
      result = !((trec -> state) == TREC_CONDEMNED);
      if (result) {
        FOR_EACH_ENTRY(trec, e, {
          StgTVar *s;
          s = e -> tvar;
          if (acquire_all || entry_is_update(e)) {
            TRACE("%p : trying to acquire %p", trec, s);
            if (!cond_lock_tvar(trec, s, e -> expected_value)) {
              TRACE("%p : failed to acquire %p", trec, s);
              result = false;
              BREAK_FOR_EACH;
            }
          } else {
            ASSERT(config_use_read_phase);
            IF_STM_FG_LOCKS({
              TRACE("%p : will need to check %p", trec, s);
              if (s -> current_value != e -> expected_value) {
                TRACE("%p : doesn't match", trec);
                result = false;
                BREAK_FOR_EACH;
              }
              e -> num_updates = s -> num_updates;
              if (s -> current_value != e -> expected_value) {
                TRACE("%p : doesn't match (race)", trec);
                result = false;
                BREAK_FOR_EACH;
              } else {
                TRACE("%p : need to check version %ld", trec, e -> num_updates);
              }
            });
          }
        });
      }
    
      if ((!result) || (!retain_ownership)) {
          revert_ownership(cap, trec, acquire_all);
      }
    
      return result;
    }
    
    static StgBool cond_lock_tvar(StgTRecHeader *trec,
                                  StgTVar *s,
                                  StgClosure *expected) {
      StgClosure *result;
      StgWord w;
      TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
      w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
      result = (StgClosure *)w;
      TRACE("%p : %s", trec, result ? "success" : "failure");
      return (result == expected);
    }
    
    EXTERN_INLINE StgWord
    cas(StgVolatilePtr p, StgWord o, StgWord n)
    {
        return __sync_val_compare_and_swap(p, o, n);
    }
    
    void
    tryWakeupThread (Capability *cap, StgTSO *tso)
    {
        traceEventThreadWakeup (cap, tso, tso->cap->no);
    
    #if defined(THREADED_RTS)
        if (tso->cap != cap)
        {
            MessageWakeup *msg;
            msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
            SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
            msg->tso = tso;
            sendMessage(cap, tso->cap, (Message*)msg);
            debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                          (W_)tso->id, tso->cap->no);
            return;
        }
    #endif
    
        switch (tso->why_blocked)
        {
        case BlockedOnMVar:
        case BlockedOnMVarRead:
        {
            if (tso->_link == END_TSO_QUEUE) {
                tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
                goto unblock;
            } else {
                return;
            }
        }
    
        case BlockedOnMsgThrowTo:
        {
            const StgInfoTable *i;
    
            i = lockClosure(tso->block_info.closure);
            unlockClosure(tso->block_info.closure, i);
            if (i != &stg_MSG_NULL_info) {
                debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                              (W_)tso->id, tso->block_info.throwto->header.info);
                return;
            }
    
            // remove the block frame from the stack
            ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
            tso->stackobj->sp += 3;
            goto unblock;
        }
    
        case BlockedOnSTM:
            tso->block_info.closure = &stg_STM_AWOKEN_closure;
            goto unblock;
    
        case BlockedOnBlackHole:
        case ThreadMigrating:
            goto unblock;
    
        default:
            // otherwise, do nothing
            return;
        }
    
    unblock:
        // just run the thread now, if the BH is not really available,
        // we'll block again.
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);
    
        // We used to set the context switch flag here, which would
        // trigger a context switch a short time in the future (at the end
        // of the current nursery block).  The idea is that we have just
        // woken up a thread, so we may need to load-balance and migrate
        // threads to other CPUs.  On the other hand, setting the context
        // switch flag here unfairly penalises the current thread by
        // yielding its time slice too early.
        //
        // The synthetic benchmark nofib/smp/chan can be used to show the
        // difference quite clearly.
    
        // cap->context_switch = 1;
    }
    
    

    b. Catching exceptions: catchSTM#

    stg_catchSTMzh (P_ code    /* :: STM a */,
                    P_ handler /* :: Exception -> STM a */)
    {
        STK_CHK_GEN();
    
        /* Start a nested transaction to run the body of the try block in */
        W_ cur_trec;
        W_ new_trec;
        cur_trec = StgTSO_trec(CurrentTSO);
        ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                     cur_trec "ptr");
        StgTSO_trec(CurrentTSO) = new_trec;
    
        jump stg_ap_v_fast
            (CATCH_STM_FRAME_FIELDS(,,stg_catch_stm_frame_info, CCCS, 0,
                                    code, handler))
            (code);
    }
    
    INFO_TABLE_RET(stg_catch_stm_frame, CATCH_STM_FRAME,
                   // layout of the frame, and bind the field names
                   CATCH_STM_FRAME_FIELDS(W_,P_,info_ptr,p1,p2,code,handler))
        return (P_ ret)
    {
        W_ r, trec, outer;
    
        trec = StgTSO_trec(CurrentTSO);
        outer  = StgTRecHeader_enclosing_trec(trec);
        (r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
        if (r != 0) {
            /* Commit succeeded */
            StgTSO_trec(CurrentTSO) = outer;
            return (ret);
        } else {
            /* Commit failed */
            W_ new_trec;
            ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
            StgTSO_trec(CurrentTSO) = new_trec;
    
            jump stg_ap_v_fast
                (CATCH_STM_FRAME_FIELDS(,,info_ptr,p1,p2,code,handler))
                (code);
        }
    }
    
    StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
      StgTRecHeader *et;
      ASSERT(trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
      TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
      ASSERT((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
    
      lock_stm(trec);
    
      et = trec -> enclosing_trec;
      bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
      if (result) {
        // We now know that all the updated locations hold their expected values.
    
        if (config_use_read_phase) {
          TRACE("%p : doing read check", trec);
          result = check_read_only(trec);
        }
        if (result) {
          // We now know that all of the read-only locations held their expected values
          // at the end of the call to validate_and_acquire_ownership.  This forms the
          // linearization point of the commit.
    
          TRACE("%p : read-check succeeded", trec);
          FOR_EACH_ENTRY(trec, e, {
            // Merge each entry into the enclosing transaction record, release all
            // locks.
    
            StgTVar *s;
            s = e -> tvar;
            if (entry_is_update(e)) {
                unlock_tvar(cap, trec, s, e -> expected_value, false);
            }
            merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
            ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
          });
        } else {
            revert_ownership(cap, trec, false);
        }
      }
    
      unlock_stm(trec);
    
      free_stg_trec_header(cap, trec);
    
      TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
    
      return result;
    }
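
    Seen from the Haskell side, the merge performed by stmCommitNestedTransaction is what makes catchSTM behave like a small nested transaction. The following is a minimal sketch, not taken from the original article: Boom, nestedCommit, nestedAbort and demoNested are hypothetical names, and the primitives are imported from GHC.Conc in base so that no extra package is assumed. When the protected body finishes normally, its writes become part of the enclosing transaction; when it throws, they are discarded before the handler runs.

```haskell
import Control.Exception (Exception)
import GHC.Conc (STM, TVar, atomically, catchSTM, newTVarIO, readTVar, throwSTM, writeTVar)

-- Hypothetical exception type, used only for this illustration.
data Boom = Boom deriving Show

instance Exception Boom

-- The protected body writes 1 and returns normally, so the nested record
-- is committed and its write is merged into the enclosing transaction.
nestedCommit :: TVar Int -> STM Int
nestedCommit tv = do
  writeTVar tv 1 `catchSTM` (\Boom -> pure ())
  readTVar tv

-- The protected body writes 2 and then throws. The nested record is
-- aborted, so the handler and the rest of the transaction see the old value.
nestedAbort :: TVar Int -> STM Int
nestedAbort tv = do
  (writeTVar tv 2 >> throwSTM Boom) `catchSTM` (\Boom -> pure ())
  readTVar tv

-- Run both variants on fresh TVars that start at 0.
demoNested :: IO (Int, Int)
demoNested = do
  a <- newTVarIO 0
  b <- newTVarIO 0
  x <- atomically (nestedCommit a)
  y <- atomically (nestedAbort b)
  pure (x, y)
```

    Under these assumptions demoNested yields (1, 0): the first write survives because the nested record was merged into its parent, while the second was dropped together with the aborted nested record.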
    

    c. The sleep function: retry#

    stg_retryzh /* no arg list: explicit stack layout */
    {
        W_ frame_type;
        W_ frame;
        W_ trec;
        W_ outer;
        W_ r;
    
        // STM operations may allocate
        MAYBE_GC_ (stg_retryzh); // NB. not MAYBE_GC(), we cannot make a
                                 // function call in an explicit-stack proc
    
        // Find the enclosing ATOMICALLY_FRAME or CATCH_RETRY_FRAME
    retry_pop_stack:
        SAVE_THREAD_STATE();
        (frame_type) = ccall findRetryFrameHelper(MyCapability(), CurrentTSO "ptr");
        LOAD_THREAD_STATE();
        frame = Sp;
        trec = StgTSO_trec(CurrentTSO);
        outer  = StgTRecHeader_enclosing_trec(trec);
    
        if (frame_type == CATCH_RETRY_FRAME) {
            // The retry reaches a CATCH_RETRY_FRAME before the atomic frame
            ASSERT(outer != NO_TREC);
            // Abort the transaction attempting the current branch
            ccall stmAbortTransaction(MyCapability() "ptr", trec "ptr");
            ccall stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr");
            if (!StgCatchRetryFrame_running_alt_code(frame) != 0) {
                // Retry in the first branch: try the alternative
                ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
                StgTSO_trec(CurrentTSO) = trec;
                StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true;
                R1 = StgCatchRetryFrame_alt_code(frame);
                jump stg_ap_v_fast [R1];
            } else {
                // Retry in the alternative code: propagate the retry
                StgTSO_trec(CurrentTSO) = outer;
                Sp = Sp + SIZEOF_StgCatchRetryFrame;
                goto retry_pop_stack;
            }
        }
    
        // We've reached the ATOMICALLY_FRAME: attempt to wait
        ASSERT(frame_type == ATOMICALLY_FRAME);
        ASSERT(outer == NO_TREC);
    
        (r) = ccall stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr");
        if (r != 0) {
            // Transaction was valid: stmWait put us on the TVars' queues, we now block
            StgHeader_info(frame) = stg_atomically_waiting_frame_info;
            Sp = frame;
            R3 = trec; // passing to stmWaitUnblock()
            jump stg_block_stmwait [R3];
        } else {
            // Transaction was not valid: retry immediately
            ("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
            StgTSO_trec(CurrentTSO) = trec;
            Sp = frame;
            R1 = StgAtomicallyFrame_code(frame);
            jump stg_ap_v_fast [R1];
        }
    }
    
    StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
      TRACE("%p : stmWait(%p)", trec, tso);
      ASSERT(trec != NO_TREC);
      ASSERT(trec -> enclosing_trec == NO_TREC);
      ASSERT((trec -> state == TREC_ACTIVE) ||
             (trec -> state == TREC_CONDEMNED));
    
      lock_stm(trec);
      bool result = validate_and_acquire_ownership(cap, trec, true, true);
      if (result) {
        // The transaction is valid so far so we can actually start waiting.
        // (Otherwise the transaction was not valid and the thread will have to
        // retry it).
    
        // Put ourselves to sleep.  We retain locks on all the TVars involved
        // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
        // in the TSO, (c) TREC_WAITING in the Trec.
        build_watch_queue_entries_for_trec(cap, tso, trec);
        park_tso(tso);
        trec -> state = TREC_WAITING;
    
        // We haven't released ownership of the transaction yet.  The TSO
        // has been put on the wait queue for the TVars it is waiting for,
        // but we haven't yet tidied up the TSO's stack and made it safe
        // to wake up the TSO.  Therefore, we must wait until the TSO is
        // safe to wake up before we release ownership - when all is well,
        // the runtime will call stmWaitUnlock() below, with the same
        // TRec.
    
      } else {
        unlock_stm(trec);
        free_stg_trec_header(cap, trec);
      }
    
      TRACE("%p : stmWait(%p)=%d", trec, tso, result);
      return result;
    }
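
    The wait path above (stmWait followed by stg_block_stmwait) is what a Haskell program triggers when retry reaches the outermost atomically. A small sketch of that behaviour follows; takeOne and demoRetry are hypothetical names chosen for illustration, and the 10 ms delay is an arbitrary value. The consumer retries while the counter is empty, is parked on the TVar's watch queue, and is woken once another thread commits a write to that TVar.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import GHC.Conc (STM, TVar, atomically, newTVarIO, readTVar, retry, writeTVar)

-- Hypothetical helper: take one unit from a counter, blocking while it is empty.
takeOne :: TVar Int -> STM Int
takeOne stock = do
  n <- readTVar stock
  if n <= 0
    then retry                  -- no CATCH_RETRY_FRAME here, so the thread waits
    else do
      writeTVar stock (n - 1)
      pure n                    -- return the value observed before the decrement

-- The main thread blocks in takeOne until the producer refills the counter.
demoRetry :: IO Int
demoRetry = do
  stock <- newTVarIO 0
  _ <- forkIO $ do
    threadDelay 10000           -- arbitrary delay so the consumer usually parks first
    atomically (writeTVar stock 5)
  atomically (takeOne stock)
```

    demoRetry returns 5 whichever thread runs first: either the consumer sleeps and is re-run after the producer's commit, or it finds the counter already filled. The point of the design is that a blocked transaction is only re-run when one of the TVars it read has changed, instead of spinning.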
    
    

    d. The branch-selection function: catchRetry#

    stg_catchRetryzh (P_ first_code, /* :: STM a */
                      P_ alt_code    /* :: STM a */)
    {
        W_ new_trec;
    
        // stmStartTransaction may allocate
        MAYBE_GC_PP (stg_catchRetryzh, first_code, alt_code);
    
        STK_CHK_GEN();
    
        /* Start a nested transaction within which to run the first code */
        ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                     StgTSO_trec(CurrentTSO) "ptr");
        StgTSO_trec(CurrentTSO) = new_trec;
    
        // push the CATCH_RETRY stack frame, and apply first_code to realWorld#
        jump stg_ap_v_fast
            (CATCH_RETRY_FRAME_FIELDS(,, stg_catch_retry_frame_info, CCCS, 0,
                                      0, /* not running_alt_code */
                                      first_code,
                                      alt_code))
            (first_code);
    }
    
    INFO_TABLE_RET(stg_catch_retry_frame, CATCH_RETRY_FRAME,
                   CATCH_RETRY_FRAME_FIELDS(W_,P_,
                                            info_ptr, p1, p2,
                                            running_alt_code,
                                            first_code,
                                            alt_code))
        return (P_ ret)
    {
        unwind Sp = Sp + SIZEOF_StgCatchRetryFrame;
        W_ r;
        gcptr trec, outer, arg;
    
        trec = StgTSO_trec(CurrentTSO);
        outer  = StgTRecHeader_enclosing_trec(trec);
        (r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
        if (r != 0) {
            // Succeeded (either first branch or second branch)
            StgTSO_trec(CurrentTSO) = outer;
            return (ret);
        } else {
            // Did not commit: re-execute
            P_ new_trec;
            ("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
                                                               outer "ptr");
            StgTSO_trec(CurrentTSO) = new_trec;
            if (running_alt_code != 0) {
                jump stg_ap_v_fast
                    (CATCH_RETRY_FRAME_FIELDS(,,info_ptr, p1, p2,
                                              running_alt_code,
                                              first_code,
                                              alt_code))
                    (alt_code);
            } else {
                jump stg_ap_v_fast
                    (CATCH_RETRY_FRAME_FIELDS(,,info_ptr, p1, p2,
                                              running_alt_code,
                                              first_code,
                                              alt_code))
                    (first_code);
            }
        }
    }
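
    At the Haskell level, catchRetry# is what orElse is built on: the first branch runs in a nested transaction, and if it calls retry that nested transaction is aborted and the alternative is run instead. The sketch below is illustrative only; withdraw, withdrawEither and demoOrElse are hypothetical names, and the balances are made-up values.

```haskell
import GHC.Conc (STM, TVar, atomically, newTVarIO, orElse, readTVar, readTVarIO, retry, writeTVar)

-- Hypothetical helper: withdraw an amount, retrying when funds are insufficient.
withdraw :: TVar Int -> Int -> STM ()
withdraw acc amt = do
  bal <- readTVar acc
  if bal < amt
    then retry
    else writeTVar acc (bal - amt)

-- Try the primary account first. If that branch retries, fall back to the
-- backup account. If both retry, the whole transaction waits.
withdrawEither :: TVar Int -> TVar Int -> Int -> STM String
withdrawEither primary backup amt =
  (withdraw primary amt >> pure "primary")
    `orElse` (withdraw backup amt >> pure "backup")

-- The primary account holds 2 and the backup holds 10, so withdrawing 5
-- has to fall through to the backup branch.
demoOrElse :: IO (String, Int, Int)
demoOrElse = do
  primary <- newTVarIO 2
  backup <- newTVarIO 10
  src <- atomically (withdrawEither primary backup 5)
  p <- readTVarIO primary
  b <- readTVarIO backup
  pure (src, p, b)
```

    With these values demoOrElse yields ("backup", 2, 5): the primary branch retried and left its account untouched, and the alternative branch committed as part of the same atomic transaction.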
    
    

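    Due to time constraints, the code above is not walked through in further detail here; a fuller explanation will be added later.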


        Article link: https://www.haomeiwen.com/subject/kijatqtx.html