如何给验证者分配任务
信标链客户端在创建的时候会注册一个服务(registerValidatorService),此函数会创建一个ValidatorService(NewValidatorService),此Service启动(Start)后,会启动 一个run协程,这个协程主要做以下工作:
- 初始化validator 数据
- 等待validator 激活
- 等待下一个slot
- 给validator分配工作
- 确定自己当前slot的工作角色(proposer,Attester,Aggregator,unknown)
- 根据自己的角色,执行相应的任务
以下就是代码
func run(ctx context.Context, v iface.Validator) {
cleanup := v.Done
defer cleanup()
if err := v.WaitForWalletInitialization(ctx); err != nil {
// log.Fatalf will prevent defer from being called
cleanup()
log.Fatalf("Wallet is not ready: %v", err)
}
if featureconfig.Get().SlasherProtection {
if err := v.SlasherReady(ctx); err != nil {
log.Fatalf("Slasher is not ready: %v", err)
}
}
ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()
var headSlot types.Slot
firstTime := true
for {
if !firstTime {
if ctx.Err() != nil {
log.Info("Context canceled, stopping validator")
return // Exit if context is canceled.
}
<-ticker.C
} else {
firstTime = false
}
err := v.WaitForChainStart(ctx)
if isConnectionError(err) {
log.Warnf("Could not determine if beacon chain started: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not determine if beacon chain started: %v", err)
}
err = v.WaitForSync(ctx)
if isConnectionError(err) {
log.Warnf("Could not determine if beacon chain started: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
err = v.WaitForActivation(ctx, nil /* accountsChangedChan */)
if isConnectionError(err) {
log.Warnf("Could not wait for validator activation: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
headSlot, err = v.CanonicalHeadSlot(ctx)
if isConnectionError(err) {
log.Warnf("Could not get current canonical head slot: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not get current canonical head slot: %v", err)
}
break
}
connectionErrorChannel := make(chan error, 1)
go v.ReceiveBlocks(ctx, connectionErrorChannel)
if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
}
accountsChangedChan := make(chan [][48]byte, 1)
sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan)
for {
slotCtx, cancel := context.WithCancel(ctx)
ctx, span := trace.StartSpan(ctx, "validator.processSlot")
select {
case <-ctx.Done():
log.Info("Context canceled, stopping validator")
span.End()
cancel()
sub.Unsubscribe()
close(accountsChangedChan)
return // Exit if context is canceled.
case blocksError := <-connectionErrorChannel:
if blocksError != nil {
log.WithError(blocksError).Warn("block stream interrupted")
go v.ReceiveBlocks(ctx, connectionErrorChannel)
continue
}
case newKeys := <-accountsChangedChan:
anyActive, err := v.HandleKeyReload(ctx, newKeys)
if err != nil {
log.WithError(err).Error("Could not properly handle reloaded keys")
}
if !anyActive {
log.Info("No active keys found. Waiting for activation...")
err := v.WaitForActivation(ctx, accountsChangedChan)
if err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
}
case slot := <-v.NextSlot():
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
remoteKm, ok := v.GetKeymanager().(remote.RemoteKeymanager)
if ok {
_, err := remoteKm.ReloadPublicKeys(ctx)
if err != nil {
log.WithError(err).Error(msgCouldNotFetchKeys)
}
}
allExited, err := v.AllValidatorsAreExited(ctx)
if err != nil {
log.WithError(err).Error("Could not check if validators are exited")
}
if allExited {
log.Info("All validators are exited, no more work to perform...")
continue
}
deadline := v.SlotDeadline(slot)
slotCtx, cancel = context.WithDeadline(ctx, deadline)
log := log.WithField("slot", slot)
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")
// Keep trying to update assignments if they are nil or if we are past an
// epoch transition in the beacon node's state.
if err := v.UpdateDuties(ctx, slot); err != nil {
handleAssignmentError(err, slot)
cancel()
span.End()
continue
}
// Start fetching domain data for the next epoch.
if helpers.IsEpochEnd(slot) {
go v.UpdateDomainDataCaches(ctx, slot+1)
}
var wg sync.WaitGroup
allRoles, err := v.RolesAt(ctx, slot)
if err != nil {
log.WithError(err).Error("Could not get validator roles")
span.End()
continue
}
for pubKey, roles := range allRoles {
wg.Add(len(roles))
for _, role := range roles {
go func(role iface.ValidatorRole, pubKey [48]byte) {
defer wg.Done()
switch role {
case iface.RoleAttester:
v.SubmitAttestation(slotCtx, slot, pubKey)
case iface.RoleProposer:
v.ProposeBlock(slotCtx, slot, pubKey)
case iface.RoleAggregator:
v.SubmitAggregateAndProof(slotCtx, slot, pubKey)
case iface.RoleUnknown:
log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).Trace("No active roles, doing nothing")
default:
log.Warnf("Unhandled role %v", role)
}
}(role, pubKey)
}
}
// Wait for all processes to complete, then report span complete.
go func() {
wg.Wait()
// Log this client performance in the previous epoch
v.LogAttestationsSubmitted()
if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
log.WithError(err).Error("Could not report validator's rewards/penalties")
}
if err := v.LogNextDutyTimeLeft(slot); err != nil {
log.WithError(err).Error("Could not report next count down")
}
span.End()
}()
}
}
}
通过上面代码可以看到在 NextSlot中,也就是每更新一次slot,就会重新分配验证者的工作,使用的方法是UpdateDuties。然后根据自己的角色,再执行相应的职责:
- RoleAttester进行区块投票
- RoleProposer 生产区块
- RoleAggregator 收集聚合证明并提交
- RoleUnknown 暂时没职责
如何对链头投票
信标链的区块号用的是slot,表示当前最新区块由两个值表示,一个是slot,一个是区 块 hash. 两者结合起来,表示一个链在某个位置的状态。投票就是对这两个值进行投票。
信标链中区块hash用blockroot表示
这些投票都会打包进区块里,所以就有了聚合者(Aggregator),将这些投票合在一起打包进区块.
注意: 这个投票是对当前链的状态进行投票,表示认可当前的链,proposer才能在当前状态上继续出块,然后,proposer再出块的时候,将这些投票再打包进新出的块中。
如何防止分叉
-
既然大家会对当前区块状态投票,所以如果有分叉的时候,当然验证者会选择一个链头做当前状态进行投票,相当于,选择了一个分叉表示当前状态。和POW会选择最长链做为链头一样,POS会选择权重最高的链做为链头。
-
另外,防止验证者控制一个分片链,每一个slot,都会重新分配任务,并混洗。
-
就是惩罚了。如果长时间确定不了,就会 至少1/3的验证者都会受到惩罚
信标链如何选择分叉
beacon-chain/blockahin/head.go 这个文件实现了当前链的链头服务, updateHead方法就是每个slot,或每收到一个区块的时候,进行分叉的选择。
// Determined the head from the fork choice service and saves its new data
// (head root, head block, and head state) to the local service cache.
func (s *Service) updateHead(ctx context.Context, balances []uint64) error {
ctx, span := trace.StartSpan(ctx, "blockChain.updateHead")
defer span.End()
// To get the proper head update, a node first checks its best justified
// can become justified. This is designed to prevent bounce attack and
// ensure head gets its best justified info.
if s.bestJustifiedCheckpt.Epoch > s.justifiedCheckpt.Epoch {
s.justifiedCheckpt = s.bestJustifiedCheckpt
if err := s.cacheJustifiedStateBalances(ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root)); err != nil {
return err
}
}
// Get head from the fork choice service.
f := s.finalizedCheckpt
j := s.justifiedCheckpt
// To get head before the first justified epoch, the fork choice will start with genesis root
// instead of zero hashes.
headStartRoot := bytesutil.ToBytes32(j.Root)
if headStartRoot == params.BeaconConfig().ZeroHash {
headStartRoot = s.genesisRoot
}
// In order to process head, fork choice store requires justified info.
// If the fork choice store is missing justified block info, a node should
// re-initiate fork choice store using the latest justified info.
// This recovers a fatal condition and should not happen in run time.
if !s.cfg.ForkChoiceStore.HasNode(headStartRoot) {
jb, err := s.cfg.BeaconDB.Block(ctx, headStartRoot)
if err != nil {
return err
}
s.cfg.ForkChoiceStore = protoarray.New(j.Epoch, f.Epoch, bytesutil.ToBytes32(f.Root))
if err := s.insertBlockToForkChoiceStore(ctx, jb.Block, headStartRoot, f, j); err != nil {
return err
}
}
headRoot, err := s.cfg.ForkChoiceStore.Head(ctx, j.Epoch, headStartRoot, balances, f.Epoch)
if err != nil {
return err
}
// Save head to the local service cache.
return s.saveHead(ctx, headRoot)
节点间如何判断一个slot是否有效
func (s *Service) expired(slot types.Slot) bool {
expirationSlot := slot + params.BeaconConfig().SlotsPerEpoch
expirationTime := s.genesisTime + uint64(expirationSlot.Mul(params.BeaconConfig().SecondsPerSlot))
currentTime := uint64(timeutils.Now().Unix())
return currentTime >= expirationTime
}
计算一个slot是否有效,是通过创建区块的时间戳加上一个eopch的时间,再加上当前 slot的时间,做为最大时间,看是否超过了此值,来判断这个slot是否有效。
Slot转换时间戳
// SlotToTime takes the given slot and genesis time to determine the start time of the slot.
func SlotToTime(genesisTimeSec uint64, slot types.Slot) (time.Time, error) {
timeSinceGenesis, err := slot.SafeMul(params.BeaconConfig().SecondsPerSlot)
if err != nil {
return time.Unix(0, 0), fmt.Errorf("slot (%d) is in the far distant future: %w", slot, err)
}
sTime, err := timeSinceGenesis.SafeAdd(genesisTimeSec)
if err != nil {
return time.Unix(0, 0), fmt.Errorf("slot (%d) is in the far distant future: %w", slot, err)
}
return time.Unix(int64(sTime), 0), nil
}
区块结构
type BeaconBlock struct {
Slot github_com_prysmaticlabs_eth2_types.Slot
ProposerIndex github_com_prysmaticlabs_eth2_types.ValidatorIndex
ParentRoot []byte
StateRoot []byte
Body *BeaconBlockBody
XXX_NoUnkeyedLiteral struct{}
XXX_unrecognized []byte
XXX_sizecache int32
}
type BeaconBlockBody struct {
RandaoReveal []byte
Eth1Data *Eth1Data
Graffiti []byte
ProposerSlashings []*ProposerSlashing
AttesterSlashings []*AttesterSlashing
Attestations []*Attestation
Deposits []*Deposit
VoluntaryExits []*SignedVoluntaryExit
XXX_NoUnkeyedLiteral struct{}
XXX_unrecognized []byte
XXX_sizecache int32
}
节点收到区块后做了什么
var processingPipeline = []processFunc{
b.ProcessBlockHeader,
b.ProcessRandao,
processEth1DataFunc,
VerifyOperationLengths,
processProposerSlashingFunc,
processAttesterSlashingFunc,
b.ProcessAttestations,
processDepositsFunc,
processExitFunc,
}
func ProcessBlock(
ctx context.Context,
state iface.BeaconState,
signed *ethpb.SignedBeaconBlock,
) (iface.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "core.state.ProcessBlock")
defer span.End()
var err error
if err = helpers.VerifyNilBeaconBlock(signed); err != nil {
return nil, err
}
for _, p := range processingPipeline {
state, err = p(ctx, state, signed)
if err != nil {
return nil, errors.Wrap(err, "Could not process block")
}
}
return state, nil
}
根据上面的代码可以看到,收到区块后,依次处理
- ProcessBlockHeader 验证区块头和proposer的签名
- ProcessRandao 通过 rando验证proposer的正确性,并生成新的 rando mix
- processEth1DataFunc 统计ETH1的投票
- VerifyOperationLengths 验证相应验证者的操作数据的长度是否有效
- processProposerSlashingFunc 处理对proposer的罚没
- processAttesterSlashingFunc 处理对投票者的罚没 根据Casper FFG slashing条件来验证处理
- b.ProcessAttestations 根据当前状态处理 对当前状态的投票和验签
- processDepositsFunc 处理质押金,并验证是否可以成为验证者
- processExitFunc 处理验证者的退出
通过上面可以看出。信标链的区块没有相应的交易数据。具体的交易数据在相应的分片链实现。
网友评论