美文网首页
ChainMaker事件触发源码解析

ChainMaker事件触发源码解析

作者: 冰冰大象 | 来源:发表于2021-08-11 19:09 被阅读0次

带着问题看源码
一:事件订阅流程是怎么样的
二:SubscribeTx,SubscribeBlock我们通过订阅交易来知道异步发送的交易是否上链,那么合约里的SubscribeContractEvent 触发时能否表示已经上链

SDK消息订阅

首先在go-sdk里发送需要订阅的消息类型具体代码如下

##chainmaker-sdk-go/sdk_subscribe.go
func (cc *ChainClient) Subscribe(ctx context.Context, txType common.TxType, payloadBytes []byte) (<-chan interface{}, error) {
    ...
// 从这里看到客户端是调用远程服务端RPC Subscribe 方法 采用GRPC双向流模式
    resp, err := client.rpcNode.Subscribe(ctx, req)
    if err != nil {
        return nil, err
    }

    c := make(chan interface{})
    go func() {
        defer close(c)
        for {
            select {
            case <-ctx.Done():
                return
            default:
//开一个协程 等服务端返回数据
                result, err := resp.Recv()
    ...
                var ret interface{}
                switch txType {
//监听block事件
                case common.TxType_SUBSCRIBE_BLOCK_INFO:
            ...
//监听交易事件
                case common.TxType_SUBSCRIBE_TX_INFO:
            ...
//监听合约触发事件
                case common.TxType_SUBSCRIBE_CONTRACT_EVENT_INFO:
                    events := &common.ContractEventInfoList{}
                    if err = proto.Unmarshal(result.Data, events); err != nil {
                        cc.logger.Error("[SDK] Subscriber receive contract event failed, %s", err)
                        close(c)
                        return
                    }
                    for _, event := range events.ContractEvents {
                        c <- event
                    }
                    continue

                default:
                    ret = result.Data
                }

                c <- ret
            }
        }
    }()

    return c, nil
}

客户端监听就这么简单

服务端消息发布和订阅

先来看下客户端远程调用的Subscribe订阅方法

##### module/rpcserver/subscribe_service.go
// Subscribe - deal block/tx subscribe request
func (s *ApiService) Subscribe(req *commonPb.TxRequest, server apiPb.RpcNode_SubscribeServer) error {
    ...
    switch req.Header.TxType {
    case commonPb.TxType_SUBSCRIBE_BLOCK_INFO:
        return s.dealBlockSubscription(tx, server)
    case commonPb.TxType_SUBSCRIBE_TX_INFO:
        return s.dealTxSubscription(tx, server)
       //合约消息
    case commonPb.TxType_SUBSCRIBE_CONTRACT_EVENT_INFO:
        return s.dealContractEventSubscription(tx, server)
    }
    return nil
}
//dealContractEventSubscription - deal contract event subscribe request
func (s *ApiService) dealContractEventSubscription(tx *commonPb.Transaction, server apiPb.RpcNode_SubscribeServer) error {
    ...
       //序列化一下数据
    if err = proto.Unmarshal(tx.RequestPayload, &payload); err != nil {
        ...
    }
     // 检查一下数据完整性
    if err = s.checkSubscribeContractEventPayload(&payload); err != nil {
        ...
    }
    s.log.Infof("Recv contractEventInfo subscribe request: [topic:%v]/[contractName:%v]",
        payload.Topic, payload.ContractName)
     // 一层一层找
    return s.doSendContractEvent(tx, server, payload)
}
func (s *ApiService) doSendContractEvent(tx *commonPb.Transaction, server apiPb.RpcNode_SubscribeServer, payload commonPb.SubscribeContractEventPayload) error {
...
//  事件接收通道
    eventCh := make(chan model.NewContractEvent)

    chainId := tx.Header.ChainId
// 获取特定链的消息处理对象,一会儿详解
    if eventSubscriber, err = s.chainMakerServer.GetEventSubscribe(chainId); err != nil {
    ...
    }
     // 将通道订阅的特定链的消息处理对象里,一会儿详解
    sub := eventSubscriber.SubscribeContractEvent(eventCh)
    defer sub.Unsubscribe()
    for {
        select {
// 监听真正的消息
        case ev := <-eventCh:
            contractEventInfoList := ev.ContractEventInfoList.ContractEvents
            sendEventInfoList := &commonPb.ContractEventInfoList{}
            for _, EventInfo := range contractEventInfoList {
                if EventInfo.ContractName != payload.ContractName || EventInfo.Topic != payload.Topic {
                    continue
                }
                sendEventInfoList.ContractEvents = append(sendEventInfoList.ContractEvents, EventInfo)
            }
            if len(sendEventInfoList.ContractEvents) > 0 {
                if result, err = s.getContractEventSubscribeResult(sendEventInfoList); err != nil {
                    s.log.Error(err.Error())
                    return status.Error(codes.Internal, err.Error())
                }
// 将消息结果通过服务端RPC 发送给客户端
                if err := server.Send(result); err != nil {
                    err = fmt.Errorf("send block info by realtime failed, %s", err)
                    s.log.Error(err.Error())
                    return status.Error(codes.Internal, err.Error())
                }
            }
        case <-server.Context().Done():
            return nil
        case <-s.ctx.Done():
            return nil
        }
    }
}

服务端提供的RPC 订阅方法就这些,现在讲一下特定链的消息处理对象eventSubscriber

######## module/subscriber/subscriber.go
package subscriber

import (
...
    feed "github.com/ethereum/go-ethereum/event"
)
// 消息处理对象
// EventSubscriber - new EventSubscriber struct
type EventSubscriber struct {
    // 这里用到以太坊的事件功能,feed.Feed 
    blockFeed         feed.Feed
    contractEventFeed feed.Feed
}

// 接收msgbus总线发送消息
// OnMessage - deal msgbus.BlockInfo message
func (s *EventSubscriber) OnMessage(msg *msgbus.Message) {
// 根据类型判断发送消息,交易消息 和 区块消息 其实是都是NewBlockEvent类型
    if blockInfo, ok := msg.Payload.(*commonPb.BlockInfo); ok {
        go s.blockFeed.Send(model.NewBlockEvent{BlockInfo: blockInfo})
    }
// 这里是发送合约事件消息
    if conEventInfoList, ok := msg.Payload.(*commonPb.ContractEventInfoList); ok {
        go s.contractEventFeed.Send(model.NewContractEvent{ContractEventInfoList: conEventInfoList})
    }
}

// 初始化的时候将消息对象注册到msgBus 消息总线
// NewSubscriber - new and register msgbus.BlockInfo object
func NewSubscriber(msgBus msgbus.MessageBus) *EventSubscriber {
    subscriber := &EventSubscriber{}
    msgBus.Register(msgbus.BlockInfo, subscriber)

    msgBus.Register(msgbus.ContractEventInfo, subscriber)
    return subscriber
}

// SubscribeBlockEvent - subscribe block event
func (s *EventSubscriber) SubscribeBlockEvent(ch chan<- model.NewBlockEvent) feed.Subscription {
    return s.blockFeed.Subscribe(ch)
}

// SubscribeContractEvent - subscribe contract event
func (s *EventSubscriber) SubscribeContractEvent(ch chan<- model.NewContractEvent) feed.Subscription {
    return s.contractEventFeed.Subscribe(ch)
}

先来出个图 简绍一下MsgBus


MsgBus 其实就是一个消息广播,从架构图得知ChainMaker其他模块交互都是异步的,都是通过MsgBus来进行数据互传,各订阅都是通过OnMessage方法来接收MsgBus消息,如下
#### common/msgbus/message_bus.go
func (b *messageBusImpl) notify(m *Message, isSafe bool) {
    if m.Topic <= 0 {
        return
    }

    // fetch the subscribers for topic
    subs, _ := b.topicMap.Load(m.Topic)
    if subs == nil {
        return
    }
    s := subs.([]Subscriber)
    length := len(s)

    // notify each subscriber one by one
    for i := 0; i < length; i++ {
        s := s[i].(Subscriber)
        if isSafe {
// 发送同步消息
            s.OnMessage(m) // notify in order
        } else {
// 发送异步消息
            go s.OnMessage(m)
        }
    }
}

由此可以找到第一个问题答案

事件订阅流程是如下


流程了解后,那么对于第二个问题SubscribeContractEvent 触发时能否表示已经上链 我们只要找下 发送消息的地方就行
直接找到最后上链的方法如下

##### module/core/common/committer.go
//CommitBlock the action that all consensus types do when a block is committed
func (cb *CommitBlock) CommitBlock(
    block *commonpb.Block,
    rwSetMap map[string]*commonpb.TxRWSet,
    conEventMap map[string][]*commonpb.ContractEvent) (dbLasts, snapshotLasts, confLasts, otherLasts, pubEvent int64, err error) {
    // record block
    rwSet := RearrangeRWSet(block, rwSetMap)
    // record contract event  获取合约事件的数据
    events := rearrangeContractEvent(block, conEventMap)

    startDBTick := utils.CurrentTimeMillisSeconds()
    // 上链了
    if err = cb.store.PutBlock(block, rwSet); err != nil {
        ...
    }
    dbLasts = utils.CurrentTimeMillisSeconds() - startDBTick

    // clear snapshot 清理快照 因为已经落块了 快照其实就是还未落块的区块,在新增区块的时候 同时也会同步新增快照
    startSnapshotTick := utils.CurrentTimeMillisSeconds()
    if err = cb.snapshotManager.NotifyBlockCommitted(block); err != nil {
        ...
    }
    snapshotLasts = utils.CurrentTimeMillisSeconds() - startSnapshotTick

    // notify chainConf to update config when config block committed
    startConfTick := utils.CurrentTimeMillisSeconds()
       // 更新账本缓存为最新的提交块
    cb.ledgerCache.SetLastCommittedBlock(block)
    //如果是配置块则广播给所有监听配置的事件
    if err = NotifyChainConf(block, cb.chainConf); err != nil {
        return 0, 0, 0, 0, 0, err
    }
    confLasts = utils.CurrentTimeMillisSeconds() - startConfTick

    // publish contract event   这里就是合约触发的事件
    var startPublishContractEventTick int64
    if len(events) > 0 {
        startPublishContractEventTick = utils.CurrentTimeMillisSeconds()
        cb.log.Infof("start publish contractEventsInfo: block[%d] ,time[%d]", block.Header.BlockHeight, startPublishContractEventTick)
        var eventsInfo []*commonpb.ContractEventInfo
      // 构造合约事件需要传递的数据
        for _, t := range events {
            eventInfo := &commonpb.ContractEventInfo{
                BlockHeight:     block.Header.BlockHeight,
                ChainId:         block.Header.GetChainId(),
                Topic:           t.Topic,
                TxId:            t.TxId,
                ContractName:    t.ContractName,
                ContractVersion: t.ContractVersion,
                EventData:       t.EventData,
            }
            eventsInfo = append(eventsInfo, eventInfo)
        }
       // 看这里就是通过消息总线去广播合约事件
        cb.msgBus.Publish(msgbus.ContractEventInfo, &commonpb.ContractEventInfoList{ContractEvents: eventsInfo})
        pubEvent = utils.CurrentTimeMillisSeconds() - startPublishContractEventTick
    }
    startOtherTick := utils.CurrentTimeMillisSeconds()
    bi := &commonpb.BlockInfo{
        Block:     block,
        RwsetList: rwSet,
    }
    // synchronize new block height to consensus and sync module
      // 这里是就是广播的区块事件
    cb.msgBus.PublishSafe(msgbus.BlockInfo, bi)
    if err = cb.MonitorCommit(bi); err != nil {
        return 0, 0, 0, 0, 0, err
    }
    otherLasts = utils.CurrentTimeMillisSeconds() - startOtherTick

    return
}

由此可见 第二个问题答案

二:SubscribeTx,SubscribeBlock我们通过订阅交易来知道异步发送的交易是否上链,那么合约里的SubscribeContractEvent 触发时能否表示已经上链
答:SubscribeContractEvent,SubscribeTx,SubscribeBlock 都是在区块已经上链后才触发的

相关文章

网友评论

      本文标题:ChainMaker事件触发源码解析

      本文链接:https://www.haomeiwen.com/subject/goubvltx.html