美文网首页
ChainMaker事件触发源码解析

ChainMaker事件触发源码解析

作者: 冰冰大象 | 来源:发表于2021-08-11 19:09 被阅读0次

    带着问题看源码
    一:事件订阅流程是怎么样的
    二:SubscribeTx,SubscribeBlock我们通过订阅交易来知道异步发送的交易是否上链,那么合约里的SubscribeContractEvent 触发时能否表示已经上链

    SDK消息订阅

    首先在go-sdk里发送需要订阅的消息类型具体代码如下

    ##chainmaker-sdk-go/sdk_subscribe.go
    func (cc *ChainClient) Subscribe(ctx context.Context, txType common.TxType, payloadBytes []byte) (<-chan interface{}, error) {
        ...
    // 从这里看到客户端是调用远程服务端RPC Subscribe 方法 采用GRPC双向流模式
        resp, err := client.rpcNode.Subscribe(ctx, req)
        if err != nil {
            return nil, err
        }
    
        c := make(chan interface{})
        go func() {
            defer close(c)
            for {
                select {
                case <-ctx.Done():
                    return
                default:
    //开一个协程 等服务端返回数据
                    result, err := resp.Recv()
        ...
                    var ret interface{}
                    switch txType {
    //监听block事件
                    case common.TxType_SUBSCRIBE_BLOCK_INFO:
                ...
    //监听交易事件
                    case common.TxType_SUBSCRIBE_TX_INFO:
                ...
    //监听合约触发事件
                    case common.TxType_SUBSCRIBE_CONTRACT_EVENT_INFO:
                        events := &common.ContractEventInfoList{}
                        if err = proto.Unmarshal(result.Data, events); err != nil {
                            cc.logger.Error("[SDK] Subscriber receive contract event failed, %s", err)
                            close(c)
                            return
                        }
                        for _, event := range events.ContractEvents {
                            c <- event
                        }
                        continue
    
                    default:
                        ret = result.Data
                    }
    
                    c <- ret
                }
            }
        }()
    
        return c, nil
    }
    
    

    客户端监听就这么简单

    服务端消息发布和订阅

    先来看下客户端远程调用的Subscribe订阅方法

    ##### module/rpcserver/subscribe_service.go
    // Subscribe - deal block/tx subscribe request
    func (s *ApiService) Subscribe(req *commonPb.TxRequest, server apiPb.RpcNode_SubscribeServer) error {
        ...
        switch req.Header.TxType {
        case commonPb.TxType_SUBSCRIBE_BLOCK_INFO:
            return s.dealBlockSubscription(tx, server)
        case commonPb.TxType_SUBSCRIBE_TX_INFO:
            return s.dealTxSubscription(tx, server)
           //合约消息
        case commonPb.TxType_SUBSCRIBE_CONTRACT_EVENT_INFO:
            return s.dealContractEventSubscription(tx, server)
        }
        return nil
    }
    //dealContractEventSubscription - deal contract event subscribe request
    func (s *ApiService) dealContractEventSubscription(tx *commonPb.Transaction, server apiPb.RpcNode_SubscribeServer) error {
        ...
           //序列化一下数据
        if err = proto.Unmarshal(tx.RequestPayload, &payload); err != nil {
            ...
        }
         // 检查一下数据完整性
        if err = s.checkSubscribeContractEventPayload(&payload); err != nil {
            ...
        }
        s.log.Infof("Recv contractEventInfo subscribe request: [topic:%v]/[contractName:%v]",
            payload.Topic, payload.ContractName)
         // 一层一层找
        return s.doSendContractEvent(tx, server, payload)
    }
    func (s *ApiService) doSendContractEvent(tx *commonPb.Transaction, server apiPb.RpcNode_SubscribeServer, payload commonPb.SubscribeContractEventPayload) error {
    ...
    //  事件接收通道
        eventCh := make(chan model.NewContractEvent)
    
        chainId := tx.Header.ChainId
    // 获取特定链的消息处理对象,一会儿详解
        if eventSubscriber, err = s.chainMakerServer.GetEventSubscribe(chainId); err != nil {
        ...
        }
         // 将通道订阅的特定链的消息处理对象里,一会儿详解
        sub := eventSubscriber.SubscribeContractEvent(eventCh)
        defer sub.Unsubscribe()
        for {
            select {
    // 监听真正的消息
            case ev := <-eventCh:
                contractEventInfoList := ev.ContractEventInfoList.ContractEvents
                sendEventInfoList := &commonPb.ContractEventInfoList{}
                for _, EventInfo := range contractEventInfoList {
                    if EventInfo.ContractName != payload.ContractName || EventInfo.Topic != payload.Topic {
                        continue
                    }
                    sendEventInfoList.ContractEvents = append(sendEventInfoList.ContractEvents, EventInfo)
                }
                if len(sendEventInfoList.ContractEvents) > 0 {
                    if result, err = s.getContractEventSubscribeResult(sendEventInfoList); err != nil {
                        s.log.Error(err.Error())
                        return status.Error(codes.Internal, err.Error())
                    }
    // 将消息结果通过服务端RPC 发送给客户端
                    if err := server.Send(result); err != nil {
                        err = fmt.Errorf("send block info by realtime failed, %s", err)
                        s.log.Error(err.Error())
                        return status.Error(codes.Internal, err.Error())
                    }
                }
            case <-server.Context().Done():
                return nil
            case <-s.ctx.Done():
                return nil
            }
        }
    }
    

    服务端提供的RPC 订阅方法就这些,现在讲一下特定链的消息处理对象eventSubscriber

    ######## module/subscriber/subscriber.go
    package subscriber
    
    import (
    ...
        feed "github.com/ethereum/go-ethereum/event"
    )
    // 消息处理对象
    // EventSubscriber - new EventSubscriber struct
    type EventSubscriber struct {
        // 这里用到以太坊的事件功能,feed.Feed 
        blockFeed         feed.Feed
        contractEventFeed feed.Feed
    }
    
    // 接收msgbus总线发送消息
    // OnMessage - deal msgbus.BlockInfo message
    func (s *EventSubscriber) OnMessage(msg *msgbus.Message) {
    // 根据类型判断发送消息,交易消息 和 区块消息 其实是都是NewBlockEvent类型
        if blockInfo, ok := msg.Payload.(*commonPb.BlockInfo); ok {
            go s.blockFeed.Send(model.NewBlockEvent{BlockInfo: blockInfo})
        }
    // 这里是发送合约事件消息
        if conEventInfoList, ok := msg.Payload.(*commonPb.ContractEventInfoList); ok {
            go s.contractEventFeed.Send(model.NewContractEvent{ContractEventInfoList: conEventInfoList})
        }
    }
    
    // 初始化的时候将消息对象注册到msgBus 消息总线
    // NewSubscriber - new and register msgbus.BlockInfo object
    func NewSubscriber(msgBus msgbus.MessageBus) *EventSubscriber {
        subscriber := &EventSubscriber{}
        msgBus.Register(msgbus.BlockInfo, subscriber)
    
        msgBus.Register(msgbus.ContractEventInfo, subscriber)
        return subscriber
    }
    
    // SubscribeBlockEvent - subscribe block event
    func (s *EventSubscriber) SubscribeBlockEvent(ch chan<- model.NewBlockEvent) feed.Subscription {
        return s.blockFeed.Subscribe(ch)
    }
    
    // SubscribeContractEvent - subscribe contract event
    func (s *EventSubscriber) SubscribeContractEvent(ch chan<- model.NewContractEvent) feed.Subscription {
        return s.contractEventFeed.Subscribe(ch)
    }
    
    

    先来出个图 简绍一下MsgBus


    MsgBus 其实就是一个消息广播,从架构图得知ChainMaker其他模块交互都是异步的,都是通过MsgBus来进行数据互传,各订阅都是通过OnMessage方法来接收MsgBus消息,如下
    #### common/msgbus/message_bus.go
    func (b *messageBusImpl) notify(m *Message, isSafe bool) {
        if m.Topic <= 0 {
            return
        }
    
        // fetch the subscribers for topic
        subs, _ := b.topicMap.Load(m.Topic)
        if subs == nil {
            return
        }
        s := subs.([]Subscriber)
        length := len(s)
    
        // notify each subscriber one by one
        for i := 0; i < length; i++ {
            s := s[i].(Subscriber)
            if isSafe {
    // 发送同步消息
                s.OnMessage(m) // notify in order
            } else {
    // 发送异步消息
                go s.OnMessage(m)
            }
        }
    }
    

    由此可以找到第一个问题答案

    事件订阅流程是如下


    流程了解后,那么对于第二个问题SubscribeContractEvent 触发时能否表示已经上链 我们只要找下 发送消息的地方就行
    直接找到最后上链的方法如下

    ##### module/core/common/committer.go
    //CommitBlock the action that all consensus types do when a block is committed
    func (cb *CommitBlock) CommitBlock(
        block *commonpb.Block,
        rwSetMap map[string]*commonpb.TxRWSet,
        conEventMap map[string][]*commonpb.ContractEvent) (dbLasts, snapshotLasts, confLasts, otherLasts, pubEvent int64, err error) {
        // record block
        rwSet := RearrangeRWSet(block, rwSetMap)
        // record contract event  获取合约事件的数据
        events := rearrangeContractEvent(block, conEventMap)
    
        startDBTick := utils.CurrentTimeMillisSeconds()
        // 上链了
        if err = cb.store.PutBlock(block, rwSet); err != nil {
            ...
        }
        dbLasts = utils.CurrentTimeMillisSeconds() - startDBTick
    
        // clear snapshot 清理快照 因为已经落块了 快照其实就是还未落块的区块,在新增区块的时候 同时也会同步新增快照
        startSnapshotTick := utils.CurrentTimeMillisSeconds()
        if err = cb.snapshotManager.NotifyBlockCommitted(block); err != nil {
            ...
        }
        snapshotLasts = utils.CurrentTimeMillisSeconds() - startSnapshotTick
    
        // notify chainConf to update config when config block committed
        startConfTick := utils.CurrentTimeMillisSeconds()
           // 更新账本缓存为最新的提交块
        cb.ledgerCache.SetLastCommittedBlock(block)
        //如果是配置块则广播给所有监听配置的事件
        if err = NotifyChainConf(block, cb.chainConf); err != nil {
            return 0, 0, 0, 0, 0, err
        }
        confLasts = utils.CurrentTimeMillisSeconds() - startConfTick
    
        // publish contract event   这里就是合约触发的事件
        var startPublishContractEventTick int64
        if len(events) > 0 {
            startPublishContractEventTick = utils.CurrentTimeMillisSeconds()
            cb.log.Infof("start publish contractEventsInfo: block[%d] ,time[%d]", block.Header.BlockHeight, startPublishContractEventTick)
            var eventsInfo []*commonpb.ContractEventInfo
          // 构造合约事件需要传递的数据
            for _, t := range events {
                eventInfo := &commonpb.ContractEventInfo{
                    BlockHeight:     block.Header.BlockHeight,
                    ChainId:         block.Header.GetChainId(),
                    Topic:           t.Topic,
                    TxId:            t.TxId,
                    ContractName:    t.ContractName,
                    ContractVersion: t.ContractVersion,
                    EventData:       t.EventData,
                }
                eventsInfo = append(eventsInfo, eventInfo)
            }
           // 看这里就是通过消息总线去广播合约事件
            cb.msgBus.Publish(msgbus.ContractEventInfo, &commonpb.ContractEventInfoList{ContractEvents: eventsInfo})
            pubEvent = utils.CurrentTimeMillisSeconds() - startPublishContractEventTick
        }
        startOtherTick := utils.CurrentTimeMillisSeconds()
        bi := &commonpb.BlockInfo{
            Block:     block,
            RwsetList: rwSet,
        }
        // synchronize new block height to consensus and sync module
          // 这里是就是广播的区块事件
        cb.msgBus.PublishSafe(msgbus.BlockInfo, bi)
        if err = cb.MonitorCommit(bi); err != nil {
            return 0, 0, 0, 0, 0, err
        }
        otherLasts = utils.CurrentTimeMillisSeconds() - startOtherTick
    
        return
    }
    

    由此可见 第二个问题答案

    二:SubscribeTx,SubscribeBlock我们通过订阅交易来知道异步发送的交易是否上链,那么合约里的SubscribeContractEvent 触发时能否表示已经上链
    答:SubscribeContractEvent,SubscribeTx,SubscribeBlock 都是在区块已经上链后才触发的

    相关文章

      网友评论

          本文标题:ChainMaker事件触发源码解析

          本文链接:https://www.haomeiwen.com/subject/goubvltx.html