带着问题看源码
一:事件订阅流程是怎么样的
二:SubscribeTx,SubscribeBlock我们通过订阅交易来知道异步发送的交易是否上链,那么合约里的SubscribeContractEvent 触发时能否表示已经上链
SDK消息订阅
首先在go-sdk里发送需要订阅的消息类型具体代码如下
##chainmaker-sdk-go/sdk_subscribe.go
func (cc *ChainClient) Subscribe(ctx context.Context, txType common.TxType, payloadBytes []byte) (<-chan interface{}, error) {
...
// 从这里看到客户端是调用远程服务端RPC Subscribe 方法 采用GRPC双向流模式
resp, err := client.rpcNode.Subscribe(ctx, req)
if err != nil {
return nil, err
}
c := make(chan interface{})
go func() {
defer close(c)
for {
select {
case <-ctx.Done():
return
default:
//开一个协程 等服务端返回数据
result, err := resp.Recv()
...
var ret interface{}
switch txType {
//监听block事件
case common.TxType_SUBSCRIBE_BLOCK_INFO:
...
//监听交易事件
case common.TxType_SUBSCRIBE_TX_INFO:
...
//监听合约触发事件
case common.TxType_SUBSCRIBE_CONTRACT_EVENT_INFO:
events := &common.ContractEventInfoList{}
if err = proto.Unmarshal(result.Data, events); err != nil {
cc.logger.Error("[SDK] Subscriber receive contract event failed, %s", err)
close(c)
return
}
for _, event := range events.ContractEvents {
c <- event
}
continue
default:
ret = result.Data
}
c <- ret
}
}
}()
return c, nil
}
客户端监听就这么简单
服务端消息发布和订阅
先来看下客户端远程调用的Subscribe
订阅方法
##### module/rpcserver/subscribe_service.go
// Subscribe - deal block/tx subscribe request
func (s *ApiService) Subscribe(req *commonPb.TxRequest, server apiPb.RpcNode_SubscribeServer) error {
...
switch req.Header.TxType {
case commonPb.TxType_SUBSCRIBE_BLOCK_INFO:
return s.dealBlockSubscription(tx, server)
case commonPb.TxType_SUBSCRIBE_TX_INFO:
return s.dealTxSubscription(tx, server)
//合约消息
case commonPb.TxType_SUBSCRIBE_CONTRACT_EVENT_INFO:
return s.dealContractEventSubscription(tx, server)
}
return nil
}
//dealContractEventSubscription - deal contract event subscribe request
func (s *ApiService) dealContractEventSubscription(tx *commonPb.Transaction, server apiPb.RpcNode_SubscribeServer) error {
...
//序列化一下数据
if err = proto.Unmarshal(tx.RequestPayload, &payload); err != nil {
...
}
// 检查一下数据完整性
if err = s.checkSubscribeContractEventPayload(&payload); err != nil {
...
}
s.log.Infof("Recv contractEventInfo subscribe request: [topic:%v]/[contractName:%v]",
payload.Topic, payload.ContractName)
// 一层一层找
return s.doSendContractEvent(tx, server, payload)
}
func (s *ApiService) doSendContractEvent(tx *commonPb.Transaction, server apiPb.RpcNode_SubscribeServer, payload commonPb.SubscribeContractEventPayload) error {
...
// 事件接收通道
eventCh := make(chan model.NewContractEvent)
chainId := tx.Header.ChainId
// 获取特定链的消息处理对象,一会儿详解
if eventSubscriber, err = s.chainMakerServer.GetEventSubscribe(chainId); err != nil {
...
}
// 将通道订阅的特定链的消息处理对象里,一会儿详解
sub := eventSubscriber.SubscribeContractEvent(eventCh)
defer sub.Unsubscribe()
for {
select {
// 监听真正的消息
case ev := <-eventCh:
contractEventInfoList := ev.ContractEventInfoList.ContractEvents
sendEventInfoList := &commonPb.ContractEventInfoList{}
for _, EventInfo := range contractEventInfoList {
if EventInfo.ContractName != payload.ContractName || EventInfo.Topic != payload.Topic {
continue
}
sendEventInfoList.ContractEvents = append(sendEventInfoList.ContractEvents, EventInfo)
}
if len(sendEventInfoList.ContractEvents) > 0 {
if result, err = s.getContractEventSubscribeResult(sendEventInfoList); err != nil {
s.log.Error(err.Error())
return status.Error(codes.Internal, err.Error())
}
// 将消息结果通过服务端RPC 发送给客户端
if err := server.Send(result); err != nil {
err = fmt.Errorf("send block info by realtime failed, %s", err)
s.log.Error(err.Error())
return status.Error(codes.Internal, err.Error())
}
}
case <-server.Context().Done():
return nil
case <-s.ctx.Done():
return nil
}
}
}
服务端提供的RPC 订阅方法就这些,现在讲一下特定链的消息处理对象eventSubscriber
######## module/subscriber/subscriber.go
package subscriber
import (
...
feed "github.com/ethereum/go-ethereum/event"
)
// 消息处理对象
// EventSubscriber - new EventSubscriber struct
type EventSubscriber struct {
// 这里用到以太坊的事件功能,feed.Feed
blockFeed feed.Feed
contractEventFeed feed.Feed
}
// 接收msgbus总线发送消息
// OnMessage - deal msgbus.BlockInfo message
func (s *EventSubscriber) OnMessage(msg *msgbus.Message) {
// 根据类型判断发送消息,交易消息 和 区块消息 其实是都是NewBlockEvent类型
if blockInfo, ok := msg.Payload.(*commonPb.BlockInfo); ok {
go s.blockFeed.Send(model.NewBlockEvent{BlockInfo: blockInfo})
}
// 这里是发送合约事件消息
if conEventInfoList, ok := msg.Payload.(*commonPb.ContractEventInfoList); ok {
go s.contractEventFeed.Send(model.NewContractEvent{ContractEventInfoList: conEventInfoList})
}
}
// 初始化的时候将消息对象注册到msgBus 消息总线
// NewSubscriber - new and register msgbus.BlockInfo object
func NewSubscriber(msgBus msgbus.MessageBus) *EventSubscriber {
subscriber := &EventSubscriber{}
msgBus.Register(msgbus.BlockInfo, subscriber)
msgBus.Register(msgbus.ContractEventInfo, subscriber)
return subscriber
}
// SubscribeBlockEvent - subscribe block event
func (s *EventSubscriber) SubscribeBlockEvent(ch chan<- model.NewBlockEvent) feed.Subscription {
return s.blockFeed.Subscribe(ch)
}
// SubscribeContractEvent - subscribe contract event
func (s *EventSubscriber) SubscribeContractEvent(ch chan<- model.NewContractEvent) feed.Subscription {
return s.contractEventFeed.Subscribe(ch)
}
先来出个图 简绍一下MsgBus
MsgBus 其实就是一个消息广播,从架构图得知ChainMaker其他模块交互都是异步的,都是通过MsgBus来进行数据互传,各订阅都是通过
OnMessage
方法来接收MsgBus消息,如下
#### common/msgbus/message_bus.go
func (b *messageBusImpl) notify(m *Message, isSafe bool) {
if m.Topic <= 0 {
return
}
// fetch the subscribers for topic
subs, _ := b.topicMap.Load(m.Topic)
if subs == nil {
return
}
s := subs.([]Subscriber)
length := len(s)
// notify each subscriber one by one
for i := 0; i < length; i++ {
s := s[i].(Subscriber)
if isSafe {
// 发送同步消息
s.OnMessage(m) // notify in order
} else {
// 发送异步消息
go s.OnMessage(m)
}
}
}
由此可以找到第一个问题答案
事件订阅流程是如下
流程了解后,那么对于第二个问题SubscribeContractEvent 触发时能否表示已经上链 我们只要找下 发送消息的地方就行
直接找到最后上链的方法如下
##### module/core/common/committer.go
//CommitBlock the action that all consensus types do when a block is committed
func (cb *CommitBlock) CommitBlock(
block *commonpb.Block,
rwSetMap map[string]*commonpb.TxRWSet,
conEventMap map[string][]*commonpb.ContractEvent) (dbLasts, snapshotLasts, confLasts, otherLasts, pubEvent int64, err error) {
// record block
rwSet := RearrangeRWSet(block, rwSetMap)
// record contract event 获取合约事件的数据
events := rearrangeContractEvent(block, conEventMap)
startDBTick := utils.CurrentTimeMillisSeconds()
// 上链了
if err = cb.store.PutBlock(block, rwSet); err != nil {
...
}
dbLasts = utils.CurrentTimeMillisSeconds() - startDBTick
// clear snapshot 清理快照 因为已经落块了 快照其实就是还未落块的区块,在新增区块的时候 同时也会同步新增快照
startSnapshotTick := utils.CurrentTimeMillisSeconds()
if err = cb.snapshotManager.NotifyBlockCommitted(block); err != nil {
...
}
snapshotLasts = utils.CurrentTimeMillisSeconds() - startSnapshotTick
// notify chainConf to update config when config block committed
startConfTick := utils.CurrentTimeMillisSeconds()
// 更新账本缓存为最新的提交块
cb.ledgerCache.SetLastCommittedBlock(block)
//如果是配置块则广播给所有监听配置的事件
if err = NotifyChainConf(block, cb.chainConf); err != nil {
return 0, 0, 0, 0, 0, err
}
confLasts = utils.CurrentTimeMillisSeconds() - startConfTick
// publish contract event 这里就是合约触发的事件
var startPublishContractEventTick int64
if len(events) > 0 {
startPublishContractEventTick = utils.CurrentTimeMillisSeconds()
cb.log.Infof("start publish contractEventsInfo: block[%d] ,time[%d]", block.Header.BlockHeight, startPublishContractEventTick)
var eventsInfo []*commonpb.ContractEventInfo
// 构造合约事件需要传递的数据
for _, t := range events {
eventInfo := &commonpb.ContractEventInfo{
BlockHeight: block.Header.BlockHeight,
ChainId: block.Header.GetChainId(),
Topic: t.Topic,
TxId: t.TxId,
ContractName: t.ContractName,
ContractVersion: t.ContractVersion,
EventData: t.EventData,
}
eventsInfo = append(eventsInfo, eventInfo)
}
// 看这里就是通过消息总线去广播合约事件
cb.msgBus.Publish(msgbus.ContractEventInfo, &commonpb.ContractEventInfoList{ContractEvents: eventsInfo})
pubEvent = utils.CurrentTimeMillisSeconds() - startPublishContractEventTick
}
startOtherTick := utils.CurrentTimeMillisSeconds()
bi := &commonpb.BlockInfo{
Block: block,
RwsetList: rwSet,
}
// synchronize new block height to consensus and sync module
// 这里是就是广播的区块事件
cb.msgBus.PublishSafe(msgbus.BlockInfo, bi)
if err = cb.MonitorCommit(bi); err != nil {
return 0, 0, 0, 0, 0, err
}
otherLasts = utils.CurrentTimeMillisSeconds() - startOtherTick
return
}
由此可见 第二个问题答案
二:SubscribeTx,SubscribeBlock我们通过订阅交易来知道异步发送的交易是否上链,那么合约里的SubscribeContractEvent 触发时能否表示已经上链
答:SubscribeContractEvent,SubscribeTx,SubscribeBlock 都是在区块已经上链后才触发的
网友评论