美文网首页
信标链POS的思考

信标链POS的思考

作者: ttblack | 来源:发表于2021-05-12 14:31 被阅读0次

如何给验证者分配任务

信标链客户端在创建的时候会注册一个服务(registerValidatorService),此函数会创建一个ValidatorService(NewValidatorService),此Service启动(Start)后,会启动 一个run协程,这个协程主要做以下工作:

  1. 初始化validator 数据
  2. 等待validator 激活
  3. 等待下一个slot
  4. 给validator分配工作
  5. 确定自己当前slot的工作角色(proposer,Attester,Aggregator,unknown)
  6. 根据自己的角色,执行相应的任务

以下就是代码

func run(ctx context.Context, v iface.Validator) {
    cleanup := v.Done
    defer cleanup()
    if err := v.WaitForWalletInitialization(ctx); err != nil {
        // log.Fatalf will prevent defer from being called
        cleanup()
        log.Fatalf("Wallet is not ready: %v", err)
    }
    if featureconfig.Get().SlasherProtection {
        if err := v.SlasherReady(ctx); err != nil {
            log.Fatalf("Slasher is not ready: %v", err)
        }
    }
    ticker := time.NewTicker(backOffPeriod)
    defer ticker.Stop()

    var headSlot types.Slot
    firstTime := true
    for {
        if !firstTime {
            if ctx.Err() != nil {
                log.Info("Context canceled, stopping validator")
                return // Exit if context is canceled.
            }
            <-ticker.C
        } else {
            firstTime = false
        }
        err := v.WaitForChainStart(ctx)
        if isConnectionError(err) {
            log.Warnf("Could not determine if beacon chain started: %v", err)
            continue
        }
        if err != nil {
            log.Fatalf("Could not determine if beacon chain started: %v", err)
        }
        err = v.WaitForSync(ctx)
        if isConnectionError(err) {
            log.Warnf("Could not determine if beacon chain started: %v", err)
            continue
        }
        if err != nil {
            log.Fatalf("Could not determine if beacon node synced: %v", err)
        }
        err = v.WaitForActivation(ctx, nil /* accountsChangedChan */)
        if isConnectionError(err) {
            log.Warnf("Could not wait for validator activation: %v", err)
            continue
        }
        if err != nil {
            log.Fatalf("Could not wait for validator activation: %v", err)
        }
        headSlot, err = v.CanonicalHeadSlot(ctx)
        if isConnectionError(err) {
            log.Warnf("Could not get current canonical head slot: %v", err)
            continue
        }
        if err != nil {
            log.Fatalf("Could not get current canonical head slot: %v", err)
        }
        break
    }

    connectionErrorChannel := make(chan error, 1)
    go v.ReceiveBlocks(ctx, connectionErrorChannel)
    if err := v.UpdateDuties(ctx, headSlot); err != nil {
        handleAssignmentError(err, headSlot)
    }

    accountsChangedChan := make(chan [][48]byte, 1)
    sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan)
    for {
        slotCtx, cancel := context.WithCancel(ctx)
        ctx, span := trace.StartSpan(ctx, "validator.processSlot")

        select {
        case <-ctx.Done():
            log.Info("Context canceled, stopping validator")
            span.End()
            cancel()
            sub.Unsubscribe()
            close(accountsChangedChan)
            return // Exit if context is canceled.
        case blocksError := <-connectionErrorChannel:
            if blocksError != nil {
                log.WithError(blocksError).Warn("block stream interrupted")
                go v.ReceiveBlocks(ctx, connectionErrorChannel)
                continue
            }
        case newKeys := <-accountsChangedChan:
            anyActive, err := v.HandleKeyReload(ctx, newKeys)
            if err != nil {
                log.WithError(err).Error("Could not properly handle reloaded keys")
            }
            if !anyActive {
                log.Info("No active keys found. Waiting for activation...")
                err := v.WaitForActivation(ctx, accountsChangedChan)
                if err != nil {
                    log.Fatalf("Could not wait for validator activation: %v", err)
                }
            }
        case slot := <-v.NextSlot():
            span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))

            remoteKm, ok := v.GetKeymanager().(remote.RemoteKeymanager)
            if ok {
                _, err := remoteKm.ReloadPublicKeys(ctx)
                if err != nil {
                    log.WithError(err).Error(msgCouldNotFetchKeys)
                }
            }

            allExited, err := v.AllValidatorsAreExited(ctx)
            if err != nil {
                log.WithError(err).Error("Could not check if validators are exited")
            }
            if allExited {
                log.Info("All validators are exited, no more work to perform...")
                continue
            }

            deadline := v.SlotDeadline(slot)
            slotCtx, cancel = context.WithDeadline(ctx, deadline)
            log := log.WithField("slot", slot)
            log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")

            // Keep trying to update assignments if they are nil or if we are past an
            // epoch transition in the beacon node's state.
            if err := v.UpdateDuties(ctx, slot); err != nil {
                handleAssignmentError(err, slot)
                cancel()
                span.End()
                continue
            }

            // Start fetching domain data for the next epoch.
            if helpers.IsEpochEnd(slot) {
                go v.UpdateDomainDataCaches(ctx, slot+1)
            }

            var wg sync.WaitGroup

            allRoles, err := v.RolesAt(ctx, slot)
            if err != nil {
                log.WithError(err).Error("Could not get validator roles")
                span.End()
                continue
            }
            for pubKey, roles := range allRoles {
                wg.Add(len(roles))
                for _, role := range roles {
                    go func(role iface.ValidatorRole, pubKey [48]byte) {
                        defer wg.Done()
                        switch role {
                        case iface.RoleAttester:
                            v.SubmitAttestation(slotCtx, slot, pubKey)
                        case iface.RoleProposer:
                            v.ProposeBlock(slotCtx, slot, pubKey)
                        case iface.RoleAggregator:
                            v.SubmitAggregateAndProof(slotCtx, slot, pubKey)
                        case iface.RoleUnknown:
                            log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).Trace("No active roles, doing nothing")
                        default:
                            log.Warnf("Unhandled role %v", role)
                        }
                    }(role, pubKey)
                }
            }
            // Wait for all processes to complete, then report span complete.

            go func() {
                wg.Wait()
                // Log this client performance in the previous epoch
                v.LogAttestationsSubmitted()
                if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
                    log.WithError(err).Error("Could not report validator's rewards/penalties")
                }
                if err := v.LogNextDutyTimeLeft(slot); err != nil {
                    log.WithError(err).Error("Could not report next count down")
                }
                span.End()
            }()
        }
    }
    }

通过上面代码可以看到在 NextSlot中,也就是每更新一次slot,就会重新分配验证者的工作,使用的方法是UpdateDuties。然后根据自己的角色,再执行相应的职责:

  • RoleAttester进行区块投票
  • RoleProposer 生产区块
  • RoleAggregator 收集聚合证明并提交
  • RoleUnknown 暂时没职责

如何对链头投票

信标链的区块号用的是slot,表示当前最新区块由两个值表示,一个是slot,一个是区 块 hash. 两者结合起来,表示一个链在某个位置的状态。投票就是对这两个值进行投票。

信标链中区块hash用blockroot表示

这些投票都会打包进区块里,所以就有了聚合者(Aggregator),将这些投票合在一起打包进区块.

注意: 这个投票是对当前链的状态进行投票,表示认可当前的链,proposer才能在当前状态上继续出块,然后,proposer再出块的时候,将这些投票再打包进新出的块中。

如何防止分叉

  1. 既然大家会对当前区块状态投票,所以如果有分叉的时候,当然验证者会选择一个链头做当前状态进行投票,相当于,选择了一个分叉表示当前状态。和POW会选择最长链做为链头一样,POS会选择权重最高的链做为链头。

  2. 另外,防止验证者控制一个分片链,每一个slot,都会重新分配任务,并混洗。

  3. 就是惩罚了。如果长时间确定不了,就会 至少1/3的验证者都会受到惩罚

信标链如何选择分叉

beacon-chain/blockahin/head.go 这个文件实现了当前链的链头服务, updateHead方法就是每个slot,或每收到一个区块的时候,进行分叉的选择。

// Determined the head from the fork choice service and saves its new data
// (head root, head block, and head state) to the local service cache.
func (s *Service) updateHead(ctx context.Context, balances []uint64) error {
    ctx, span := trace.StartSpan(ctx, "blockChain.updateHead")
    defer span.End()

    // To get the proper head update, a node first checks its best justified
    // can become justified. This is designed to prevent bounce attack and
    // ensure head gets its best justified info.
    if s.bestJustifiedCheckpt.Epoch > s.justifiedCheckpt.Epoch {
        s.justifiedCheckpt = s.bestJustifiedCheckpt
        if err := s.cacheJustifiedStateBalances(ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root)); err != nil {
            return err
        }
    }

    // Get head from the fork choice service.
    f := s.finalizedCheckpt
    j := s.justifiedCheckpt
    // To get head before the first justified epoch, the fork choice will start with genesis root
    // instead of zero hashes.
    headStartRoot := bytesutil.ToBytes32(j.Root)
    if headStartRoot == params.BeaconConfig().ZeroHash {
        headStartRoot = s.genesisRoot
    }

    // In order to process head, fork choice store requires justified info.
    // If the fork choice store is missing justified block info, a node should
    // re-initiate fork choice store using the latest justified info.
    // This recovers a fatal condition and should not happen in run time.
    if !s.cfg.ForkChoiceStore.HasNode(headStartRoot) {
        jb, err := s.cfg.BeaconDB.Block(ctx, headStartRoot)
        if err != nil {
            return err
        }
        s.cfg.ForkChoiceStore = protoarray.New(j.Epoch, f.Epoch, bytesutil.ToBytes32(f.Root))
        if err := s.insertBlockToForkChoiceStore(ctx, jb.Block, headStartRoot, f, j); err != nil {
            return err
        }
    }

    headRoot, err := s.cfg.ForkChoiceStore.Head(ctx, j.Epoch, headStartRoot, balances, f.Epoch)
    if err != nil {
        return err
    }

    // Save head to the local service cache.
    return s.saveHead(ctx, headRoot)

节点间如何判断一个slot是否有效

func (s *Service) expired(slot types.Slot) bool {
    expirationSlot := slot + params.BeaconConfig().SlotsPerEpoch
    expirationTime := s.genesisTime + uint64(expirationSlot.Mul(params.BeaconConfig().SecondsPerSlot))
    currentTime := uint64(timeutils.Now().Unix())
    return currentTime >= expirationTime
}

计算一个slot是否有效,是通过创建区块的时间戳加上一个eopch的时间,再加上当前 slot的时间,做为最大时间,看是否超过了此值,来判断这个slot是否有效。

Slot转换时间戳

// SlotToTime takes the given slot and genesis time to determine the start time of the slot.
func SlotToTime(genesisTimeSec uint64, slot types.Slot) (time.Time, error) {
    timeSinceGenesis, err := slot.SafeMul(params.BeaconConfig().SecondsPerSlot)
    if err != nil {
        return time.Unix(0, 0), fmt.Errorf("slot (%d) is in the far distant future: %w", slot, err)
    }
    sTime, err := timeSinceGenesis.SafeAdd(genesisTimeSec)
    if err != nil {
        return time.Unix(0, 0), fmt.Errorf("slot (%d) is in the far distant future: %w", slot, err)
    }
    return time.Unix(int64(sTime), 0), nil
}

区块结构

type BeaconBlock struct {
    Slot                 github_com_prysmaticlabs_eth2_types.Slot           
    ProposerIndex        github_com_prysmaticlabs_eth2_types.ValidatorIndex 
    ParentRoot           []byte                                             
    StateRoot            []byte                                             
    Body                 *BeaconBlockBody                                  
    XXX_NoUnkeyedLiteral struct{}                                        
    XXX_unrecognized     []byte                                            
    XXX_sizecache        int32                                           
}

type BeaconBlockBody struct {
    RandaoReveal         []byte                 
    Eth1Data             *Eth1Data             
    Graffiti             []byte             
    ProposerSlashings    []*ProposerSlashing    
    AttesterSlashings    []*AttesterSlashing   
    Attestations         []*Attestation  
    Deposits             []*Deposit          
    VoluntaryExits       []*SignedVoluntaryExit 
    XXX_NoUnkeyedLiteral struct{}           
    XXX_unrecognized     []byte         
    XXX_sizecache        int32                
}

节点收到区块后做了什么

var processingPipeline = []processFunc{
    b.ProcessBlockHeader,
    b.ProcessRandao,
    processEth1DataFunc,
    VerifyOperationLengths,
    processProposerSlashingFunc,
    processAttesterSlashingFunc,
    b.ProcessAttestations,
    processDepositsFunc,
    processExitFunc,
}

func ProcessBlock(
    ctx context.Context,
    state iface.BeaconState,
    signed *ethpb.SignedBeaconBlock,
) (iface.BeaconState, error) {
    ctx, span := trace.StartSpan(ctx, "core.state.ProcessBlock")
    defer span.End()

    var err error
    if err = helpers.VerifyNilBeaconBlock(signed); err != nil {
        return nil, err
    }

    for _, p := range processingPipeline {
        state, err = p(ctx, state, signed)
        if err != nil {
            return nil, errors.Wrap(err, "Could not process block")
        }
    }

    return state, nil
}

根据上面的代码可以看到,收到区块后,依次处理

  • ProcessBlockHeader 验证区块头和proposer的签名
  • ProcessRandao 通过 rando验证proposer的正确性,并生成新的 rando mix
  • processEth1DataFunc 统计ETH1的投票
  • VerifyOperationLengths 验证相应验证者的操作数据的长度是否有效
  • processProposerSlashingFunc 处理对proposer的罚没
  • processAttesterSlashingFunc 处理对投票者的罚没 根据Casper FFG slashing条件来验证处理
  • b.ProcessAttestations 根据当前状态处理 对当前状态的投票和验签
  • processDepositsFunc 处理质押金,并验证是否可以成为验证者
  • processExitFunc 处理验证者的退出

通过上面可以看出。信标链的区块没有相应的交易数据。具体的交易数据在相应的分片链实现。

相关文章

网友评论

      本文标题:信标链POS的思考

      本文链接:https://www.haomeiwen.com/subject/xosadltx.html