美文网首页
以太坊源码(1)——geth cli 启动流程

以太坊源码(1)——geth cli 启动流程

作者: Jarvist | 来源:发表于2019-04-15 20:12 被阅读0次

    ethereum 协议

    1、以太坊启动

    (1)控制台程序geth

    通过app.Run()启动程序,run之前需要init。

    
    func init() {
        // Initialize the CLI app and start Geth
        app.Action = geth
        app.HideVersion = true // we have a command to print the version
        app.Copyright = "Copyright 2013-2019 The go-ethereum Authors"
        app.Commands = []cli.Command{
            // See chaincmd.go:
            initCommand,
            importCommand,
            exportCommand,
            importPreimagesCommand,
            exportPreimagesCommand,
            copydbCommand,
            removedbCommand,
            dumpCommand,
            // See accountcmd.go:
            accountCommand,
            walletCommand,
            // See consolecmd.go:
            consoleCommand,
            attachCommand,
            javascriptCommand,
            // See misccmd.go:
            makecacheCommand,
            makedagCommand,
            versionCommand,
            licenseCommand,
            // See config.go
            dumpConfigCommand,
        }
        sort.Sort(cli.CommandsByName(app.Commands))
    
        app.Flags = append(app.Flags, nodeFlags...)
        app.Flags = append(app.Flags, rpcFlags...)
        app.Flags = append(app.Flags, consoleFlags...)
        app.Flags = append(app.Flags, debug.Flags...)
        app.Flags = append(app.Flags, whisperFlags...)
        app.Flags = append(app.Flags, metricsFlags...)
    
        app.Before = func(ctx *cli.Context) error {
            logdir := ""
            if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
                logdir = (&node.Config{DataDir: utils.MakeDataDir(ctx)}).ResolvePath("logs")
            }
            if err := debug.Setup(ctx, logdir); err != nil {
                return err
            }
            // Cap the cache allowance and tune the garbage collector
            var mem gosigar.Mem
            if err := mem.Get(); err == nil {
                allowance := int(mem.Total / 1024 / 1024 / 3)
                if cache := ctx.GlobalInt(utils.CacheFlag.Name); cache > allowance {
                    log.Warn("Sanitizing cache to Go's GC limits", "provided", cache, "updated", allowance)
                    ctx.GlobalSet(utils.CacheFlag.Name, strconv.Itoa(allowance))
                }
            }
            // Ensure Go's GC ignores the database cache for trigger percentage
            cache := ctx.GlobalInt(utils.CacheFlag.Name)
            gogc := math.Max(20, math.Min(100, 100/(float64(cache)/1024)))
    
            log.Debug("Sanitizing Go's GC trigger", "percent", int(gogc))
            godebug.SetGCPercent(int(gogc))
    
            // Start metrics export if enabled
            utils.SetupMetrics(ctx)
    
            // Start system runtime metrics collection
            go metrics.CollectProcessMetrics(3 * time.Second)
    
            return nil
        }
    
        app.After = func(ctx *cli.Context) error {
            debug.Exit()
            console.Stdin.Close() // Resets terminal mode.
            return nil
        }
    }
    

    app.Action=geth 指定了app的默认Action为geth函数

    func geth(ctx *cli.Context) error {
        if args := ctx.Args(); len(args) > 0 {
            return fmt.Errorf("invalid command: %q", args[0])
        }
        node := makeFullNode(ctx)
        defer node.Close()
        startNode(ctx, node)
        node.Wait()
        return nil
    }
    

    geth函数通过makefullNode()和startNode()函数c创建一个node节点以及开启Node的网络服务。

    app.Command指定了geth cli的子命令,通过匹配命令行参数调用不同的函数。

    回到上面所说的app.Run(),在geth/main.go中,main函数调用app.Run()运行geth

    func main() {
        if err := app.Run(os.Args); err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
    }
    

    Ctrl+leftclick跟一步,来到cli.v1/app.go。

    func (a *App) Run(arguments []string) (err error) {
        a.Setup()
    
        // handle the completion flag separately from the flagset since
        // completion could be attempted after a flag, but before its value was put
        // on the command line. this causes the flagset to interpret the completion
        // flag name as the value of the flag before it which is undesirable
        // note that we can only do this because the shell autocomplete function
        // always appends the completion flag at the end of the command
        shellComplete, arguments := checkShellCompleteFlag(a, arguments)
    
        // parse flags
        set, err := flagSet(a.Name, a.Flags)
        if err != nil {
            return err
        }
    
        set.SetOutput(ioutil.Discard)
        err = set.Parse(arguments[1:])
        nerr := normalizeFlags(a.Flags, set)
        context := NewContext(a, set, nil)
        if nerr != nil {
            fmt.Fprintln(a.Writer, nerr)
            ShowAppHelp(context)
            return nerr
        }
        context.shellComplete = shellComplete
    
        if checkCompletions(context) {
            return nil
        }
    
        if err != nil {
            if a.OnUsageError != nil {
                err := a.OnUsageError(context, err, false)
                HandleExitCoder(err)
                return err
            }
            fmt.Fprintf(a.Writer, "%s %s\n\n", "Incorrect Usage.", err.Error())
            ShowAppHelp(context)
            return err
        }
    
        if !a.HideHelp && checkHelp(context) {
            ShowAppHelp(context)
            return nil
        }
    
        if !a.HideVersion && checkVersion(context) {
            ShowVersion(context)
            return nil
        }
    
        if a.After != nil {
            defer func() {
                if afterErr := a.After(context); afterErr != nil {
                    if err != nil {
                        err = NewMultiError(err, afterErr)
                    } else {
                        err = afterErr
                    }
                }
            }()
        }
    
        if a.Before != nil {
            beforeErr := a.Before(context)
            if beforeErr != nil {
                ShowAppHelp(context)
                HandleExitCoder(beforeErr)
                err = beforeErr
                return err
            }
        }
    
        args := context.Args()
        if args.Present() {
            name := args.First()
            c := a.Command(name)
            if c != nil {
                return c.Run(context)
            }
        }
    
        if a.Action == nil {
            a.Action = helpCommand.Action
        }
    
        // Run default Action
        err = HandleAction(a.Action, context)
    
        HandleExitCoder(err)
        return err
    }
    

    首先调用a.Setup(),setup是为RunAPP所需要的数据做预处理确保数据都准备完毕,包括Author,Email,Commandsd等。这里就不贴出来了。

    然后Run中还对Shell命令进行了处理,因为有可能在flag到之前就将命令放入Context进行处理?(没太看懂),然后还是一堆处理包括是否隐藏help,处理完命令前后的返回的函数等等,然后从Context中获取args

    args := context.Args()
        if args.Present() {
            name := args.First()
            c := a.Command(name)
            if c != nil {
                return c.Run(context)
            }
        }
    

    如果!=nil,就在Command.Run

    
    // Run invokes the command given the context, parses ctx.Args() to generate command-specific flags
    func (c Command) Run(ctx *Context) (err error) {
        if len(c.Subcommands) > 0 {
            return c.startApp(ctx)
        }
    
        if !c.HideHelp && (HelpFlag != BoolFlag{}) {
            // append help to flags
            c.Flags = append(
                c.Flags,
                HelpFlag,
            )
        }
    
        set, err := flagSet(c.Name, c.Flags)
        if err != nil {
            return err
        }
        set.SetOutput(ioutil.Discard)
    
        if c.SkipFlagParsing {
            err = set.Parse(append([]string{"--"}, ctx.Args().Tail()...))
        } else if !c.SkipArgReorder {
            firstFlagIndex := -1
            terminatorIndex := -1
            for index, arg := range ctx.Args() {
                if arg == "--" {
                    terminatorIndex = index
                    break
                } else if arg == "-" {
                    // Do nothing. A dash alone is not really a flag.
                    continue
                } else if strings.HasPrefix(arg, "-") && firstFlagIndex == -1 {
                    firstFlagIndex = index
                }
            }
    
            if firstFlagIndex > -1 {
                args := ctx.Args()
                regularArgs := make([]string, len(args[1:firstFlagIndex]))
                copy(regularArgs, args[1:firstFlagIndex])
    
                var flagArgs []string
                if terminatorIndex > -1 {
                    flagArgs = args[firstFlagIndex:terminatorIndex]
                    regularArgs = append(regularArgs, args[terminatorIndex:]...)
                } else {
                    flagArgs = args[firstFlagIndex:]
                }
    
                err = set.Parse(append(flagArgs, regularArgs...))
            } else {
                err = set.Parse(ctx.Args().Tail())
            }
        } else {
            err = set.Parse(ctx.Args().Tail())
        }
    
        nerr := normalizeFlags(c.Flags, set)
        if nerr != nil {
            fmt.Fprintln(ctx.App.Writer, nerr)
            fmt.Fprintln(ctx.App.Writer)
            ShowCommandHelp(ctx, c.Name)
            return nerr
        }
    
        context := NewContext(ctx.App, set, ctx)
        context.Command = c
        if checkCommandCompletions(context, c.Name) {
            return nil
        }
    
        if err != nil {
            if c.OnUsageError != nil {
                err := c.OnUsageError(context, err, false)
                HandleExitCoder(err)
                return err
            }
            fmt.Fprintln(context.App.Writer, "Incorrect Usage:", err.Error())
            fmt.Fprintln(context.App.Writer)
            ShowCommandHelp(context, c.Name)
            return err
        }
    
        if checkCommandHelp(context, c.Name) {
            return nil
        }
    
        if c.After != nil {
            defer func() {
                afterErr := c.After(context)
                if afterErr != nil {
                    HandleExitCoder(err)
                    if err != nil {
                        err = NewMultiError(err, afterErr)
                    } else {
                        err = afterErr
                    }
                }
            }()
        }
    
        if c.Before != nil {
            err = c.Before(context)
            if err != nil {
                ShowCommandHelp(context, c.Name)
                HandleExitCoder(err)
                return err
            }
        }
    
        if c.Action == nil {
            c.Action = helpSubcommand.Action
        }
    
        err = HandleAction(c.Action, context)
    
        if err != nil {
            HandleExitCoder(err)
        }
        return err
    }
    

    如果Context的子命令为空,就newApp,否则对传入的args进行解析,放入全局Context中,调用HandleAction处理命令。

    func HandleAction(action interface{}, context *Context) (err error) {
        if a, ok := action.(ActionFunc); ok {
            return a(context)
        } else if a, ok := action.(func(*Context) error); ok {
            return a(context)
        } else if a, ok := action.(func(*Context)); ok { // deprecated function signature
            a(context)
            return nil
        } else {
            return errInvalidActionType
        }
    }
    

    如果是ActionFunc就调用第一个if,初始化时候传入的Context不是ActionFunc所以启动第二个if,第三个if舍弃返回值。

    至此,CLI run起来了。Eth开始创建节点。

    (2)看MakeFullNode()。

    
    func makeFullNode(ctx *cli.Context) *node.Node {
        stack, cfg := makeConfigNode(ctx)
        if ctx.GlobalIsSet(utils.ConstantinopleOverrideFlag.Name) {
            cfg.Eth.ConstantinopleOverride = new(big.Int).SetUint64(ctx.GlobalUint64(utils.ConstantinopleOverrideFlag.Name))
        }
        utils.RegisterEthService(stack, &cfg.Eth)
    
        if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
            utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
        }
        // Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
        shhEnabled := enableWhisper(ctx)
        shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
        if shhEnabled || shhAutoEnabled {
            if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
                cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
            }
            if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
                cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
            }
            if ctx.GlobalIsSet(utils.WhisperRestrictConnectionBetweenLightClientsFlag.Name) {
                cfg.Shh.RestrictConnectionBetweenLightClients = true
            }
            utils.RegisterShhService(stack, &cfg.Shh)
        }
    
        // Configure GraphQL if required
        if ctx.GlobalIsSet(utils.GraphQLEnabledFlag.Name) {
            if err := graphql.RegisterGraphQLService(stack, cfg.Node.GraphQLEndpoint(), cfg.Node.GraphQLCors, cfg.Node.GraphQLVirtualHosts, cfg.Node.HTTPTimeouts); err != nil {
                utils.Fatalf("Failed to register the Ethereum service: %v", err)
            }
        }
    
        // Add the Ethereum Stats daemon if requested.
        if cfg.Ethstats.URL != "" {
            utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
        }
        return stack
    }
    

    这个函数会创建Node,然后register Eth Service,然后还有shh服务。先看第一个makeConfigNode(),

    
    func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
        // Load defaults.
        cfg := gethConfig{
            Eth:       eth.DefaultConfig,
            Shh:       whisper.DefaultConfig,
            Node:      defaultNodeConfig(),
            Dashboard: dashboard.DefaultConfig,
        }
    
        // Load config file.
        if file := ctx.GlobalString(configFileFlag.Name); file != "" {
            if err := loadConfig(file, &cfg); err != nil {
                utils.Fatalf("%v", err)
            }
        }
    
        // Apply flags.
        utils.SetULC(ctx, &cfg.Eth)
        utils.SetNodeConfig(ctx, &cfg.Node)
        stack, err := node.New(&cfg.Node)
        if err != nil {
            utils.Fatalf("Failed to create the protocol stack: %v", err)
        }
        utils.SetEthConfig(ctx, stack, &cfg.Eth)
        if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) {
            cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
        }
    
        utils.SetShhConfig(ctx, stack, &cfg.Shh)
        utils.SetDashboardConfig(ctx, &cfg.Dashboard)
    
        return stack, cfg
    }
    

    首先初始化gethConfig,然后用node.New()创建Node。

    
    // New creates a new P2P node, ready for protocol registration.
    func New(conf *Config) (*Node, error) {
        // Copy config and resolve the datadir so future changes to the current
        // working directory don't affect the node.
        confCopy := *conf
        conf = &confCopy
        if conf.DataDir != "" {
            absdatadir, err := filepath.Abs(conf.DataDir)
            if err != nil {
                return nil, err
            }
            conf.DataDir = absdatadir
        }
        // Ensure that the instance name doesn't cause weird conflicts with
        // other files in the data directory.
        if strings.ContainsAny(conf.Name, `/\`) {
            return nil, errors.New(`Config.Name must not contain '/' or '\'`)
        }
        if conf.Name == datadirDefaultKeyStore {
            return nil, errors.New(`Config.Name cannot be "` + datadirDefaultKeyStore + `"`)
        }
        if strings.HasSuffix(conf.Name, ".ipc") {
            return nil, errors.New(`Config.Name cannot end in ".ipc"`)
        }
        // Ensure that the AccountManager method works before the node has started.
        // We rely on this in cmd/geth.
        am, ephemeralKeystore, err := makeAccountManager(conf)
        if err != nil {
            return nil, err
        }
        if conf.Logger == nil {
            conf.Logger = log.New()
        }
        // Note: any interaction with Config that would create/touch files
        // in the data directory or instance directory is delayed until Start.
        return &Node{
            accman:            am,
            ephemeralKeystore: ephemeralKeystore,
            config:            conf,
            serviceFuncs:      []ServiceConstructor{},
            ipcEndpoint:       conf.IPCEndpoint(),
            httpEndpoint:      conf.HTTPEndpoint(),
            wsEndpoint:        conf.WSEndpoint(),
            eventmux:          new(event.TypeMux),
            log:               conf.Logger,
        }, nil
    }
    

    主要是配置数据文件保存路径,验证config文件是否合法。然后调用makeAccountManger,返回Node对象。点makeAccountManger看看。

    
    func makeAccountManager(conf *Config) (*accounts.Manager, string, error) {
        scryptN, scryptP, keydir, err := conf.AccountConfig()
        var ephemeral string
        if keydir == "" {
            // There is no datadir.
            keydir, err = ioutil.TempDir("", "go-ethereum-keystore")
            ephemeral = keydir
        }
    
        if err != nil {
            return nil, "", err
        }
        if err := os.MkdirAll(keydir, 0700); err != nil {
            return nil, "", err
        }
        // Assemble the account manager and supported backends
        var backends []accounts.Backend
        if len(conf.ExternalSigner) > 0 {
            log.Info("Using external signer", "url", conf.ExternalSigner)
            if extapi, err := external.NewExternalBackend(conf.ExternalSigner); err == nil {
                backends = append(backends, extapi)
            } else {
                return nil, "", fmt.Errorf("error connecting to external signer: %v", err)
            }
        }
        if len(backends) == 0 {
            // For now, we're using EITHER external signer OR local signers.
            // If/when we implement some form of lockfile for USB and keystore wallets,
            // we can have both, but it's very confusing for the user to see the same
            // accounts in both externally and locally, plus very racey.
            backends = append(backends, keystore.NewKeyStore(keydir, scryptN, scryptP))
            if !conf.NoUSB {
                // Start a USB hub for Ledger hardware wallets
                if ledgerhub, err := usbwallet.NewLedgerHub(); err != nil {
                    log.Warn(fmt.Sprintf("Failed to start Ledger hub, disabling: %v", err))
                } else {
                    backends = append(backends, ledgerhub)
                }
                // Start a USB hub for Trezor hardware wallets
                if trezorhub, err := usbwallet.NewTrezorHub(); err != nil {
                    log.Warn(fmt.Sprintf("Failed to start Trezor hub, disabling: %v", err))
                } else {
                    backends = append(backends, trezorhub)
                }
            }
            // Start a smart card hub
            if schub, err := scwallet.NewHub(scwallet.Scheme, keydir); err != nil {
                log.Warn(fmt.Sprintf("Failed to start smart card hub, disabling: %v", err))
            } else {
                backends = append(backends, schub)
            }
        }
    
        return accounts.NewManager(&accounts.Config{InsecureUnlockAllowed: conf.InsecureUnlockAllowed}, backends...), ephemeral, nil
    }
    

    上面这个函数主要是对keystore的存储路径进行了配置,然后调用NewKeyStore()。

    func NewKeyStore(keydir string, scryptN, scryptP int) *KeyStore {
        keydir, _ = filepath.Abs(keydir)
        ks := &KeyStore{storage: &keyStorePassphrase{keydir, scryptN, scryptP, false}}
        ks.init(keydir)
        return ks
    }
    

    调用了ks.init,点进去看看。

    func (ks *KeyStore) init(keydir string) {
        // Lock the mutex since the account cache might call back with events
        ks.mu.Lock()
        defer ks.mu.Unlock()
    
        // Initialize the set of unlocked keys and the account cache
        ks.unlocked = make(map[common.Address]*unlocked)
        ks.cache, ks.changes = newAccountCache(keydir)
    
        // TODO: In order for this finalizer to work, there must be no references
        // to ks. addressCache doesn't keep a reference but unlocked keys do,
        // so the finalizer will not trigger until all timed unlocks have expired.
        runtime.SetFinalizer(ks, func(m *KeyStore) {
            m.cache.close()
        })
        // Create the initial list of wallets from the cache
        accs := ks.cache.accounts()
        ks.wallets = make([]accounts.Wallet, len(accs))
        for i := 0; i < len(accs); i++ {
            ks.wallets[i] = &keystoreWallet{account: accs[i], keystore: ks}
        }
    }
    

    多线程lock,调用ks.cache.accounts()读取账户,然后创建wallets,账户存储在keystore中。

    看ks.cache.accounts()。

    func (ac *accountCache) accounts() []accounts.Account {
        ac.maybeReload()
        ac.mu.Lock()
        defer ac.mu.Unlock()
        cpy := make([]accounts.Account, len(ac.all))
        copy(cpy, ac.all)
        return cpy
    }
    

    调用ac.maybeReload()加载,防止修改copy一份。
    看maybeReload()是怎么加载的。

    func (ac *accountCache) maybeReload() {
        ac.mu.Lock()
    
        if ac.watcher.running {
            ac.mu.Unlock()
            return // A watcher is running and will keep the cache up-to-date.
        }
        if ac.throttle == nil {
            ac.throttle = time.NewTimer(0)
        } else {
            select {
            case <-ac.throttle.C:
            default:
                ac.mu.Unlock()
                return // The cache was reloaded recently.
            }
        }
        // No watcher running, start it.
        ac.watcher.start()
        ac.throttle.Reset(minReloadInterval)
        ac.mu.Unlock()
        ac.scanAccounts()
    }
    

    用goroutine启动一个watcher监听变化,然后调用scanAccounts().

    
    // scanAccounts checks if any changes have occurred on the filesystem, and
    // updates the account cache accordingly
    func (ac *accountCache) scanAccounts() error {
        // Scan the entire folder metadata for file changes
        creates, deletes, updates, err := ac.fileC.scan(ac.keydir)
        if err != nil {
            log.Debug("Failed to reload keystore contents", "err", err)
            return err
        }
        if creates.Cardinality() == 0 && deletes.Cardinality() == 0 && updates.Cardinality() == 0 {
            return nil
        }
        // Create a helper method to scan the contents of the key files
        var (
            buf = new(bufio.Reader)
            key struct {
                Address string `json:"address"`
            }
        )
        readAccount := func(path string) *accounts.Account {
            fd, err := os.Open(path)
            if err != nil {
                log.Trace("Failed to open keystore file", "path", path, "err", err)
                return nil
            }
            defer fd.Close()
            buf.Reset(fd)
            // Parse the address.
            key.Address = ""
            err = json.NewDecoder(buf).Decode(&key)
            addr := common.HexToAddress(key.Address)
            switch {
            case err != nil:
                log.Debug("Failed to decode keystore key", "path", path, "err", err)
            case (addr == common.Address{}):
                log.Debug("Failed to decode keystore key", "path", path, "err", "missing or zero address")
            default:
                return &accounts.Account{
                    Address: addr,
                    URL:     accounts.URL{Scheme: KeyStoreScheme, Path: path},
                }
            }
            return nil
        }
        // Process all the file diffs
        start := time.Now()
    
        for _, p := range creates.ToSlice() {
            if a := readAccount(p.(string)); a != nil {
                ac.add(*a)
            }
        }
        for _, p := range deletes.ToSlice() {
            ac.deleteByFile(p.(string))
        }
        for _, p := range updates.ToSlice() {
            path := p.(string)
            ac.deleteByFile(path)
            if a := readAccount(path); a != nil {
                ac.add(*a)
            }
        }
        end := time.Now()
    
        select {
        case ac.notify <- struct{}{}:
        default:
        }
        log.Trace("Handled keystore changes", "time", end.Sub(start))
        return nil
    }
    

    直接从文件中加载账户信息。该方法首先获取该目录下的文件,然后读取文件内容,JSON解析其address字段。
    Node创建完毕。

    (3)看startNode()。

    回到geth/main.go 。

    // startNode boots up the system node and all registered protocols, after which
    // it unlocks any requested accounts, and starts the RPC/IPC interfaces and the
    // miner.
    func startNode(ctx *cli.Context, stack *node.Node) {
        debug.Memsize.Add("node", stack)
    
        // Start up the node itself
        utils.StartNode(stack)
    
        // Unlock any account specifically requested
        unlockAccounts(ctx, stack)
    
        // Register wallet event handlers to open and auto-derive wallets
        events := make(chan accounts.WalletEvent, 16)
        stack.AccountManager().Subscribe(events)
    
        go func() {
            // Create a chain state reader for self-derivation
            rpcClient, err := stack.Attach()
            if err != nil {
                utils.Fatalf("Failed to attach to self: %v", err)
            }
            stateReader := ethclient.NewClient(rpcClient)
    
            // Open any wallets already attached
            for _, wallet := range stack.AccountManager().Wallets() {
                if err := wallet.Open(""); err != nil {
                    log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
                }
            }
            // Listen for wallet event till termination
            for event := range events {
                switch event.Kind {
                case accounts.WalletArrived:
                    if err := event.Wallet.Open(""); err != nil {
                        log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
                    }
                case accounts.WalletOpened:
                    status, _ := event.Wallet.Status()
                    log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)
    
                    derivationPath := accounts.DefaultBaseDerivationPath
                    if event.Wallet.URL().Scheme == "ledger" {
                        derivationPath = accounts.DefaultLedgerBaseDerivationPath
                    }
                    event.Wallet.SelfDerive(derivationPath, stateReader)
    
                case accounts.WalletDropped:
                    log.Info("Old wallet dropped", "url", event.Wallet.URL())
                    event.Wallet.Close()
                }
            }
        }()
    
        // Spawn a standalone goroutine for status synchronization monitoring,
        // close the node when synchronization is complete if user required.
        if ctx.GlobalBool(utils.ExitWhenSyncedFlag.Name) {
            go func() {
                sub := stack.EventMux().Subscribe(downloader.DoneEvent{})
                defer sub.Unsubscribe()
                for {
                    event := <-sub.Chan()
                    if event == nil {
                        continue
                    }
                    done, ok := event.Data.(downloader.DoneEvent)
                    if !ok {
                        continue
                    }
                    if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute {
                        log.Info("Synchronisation completed", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(),
                            "age", common.PrettyAge(timestamp))
                        stack.Stop()
                    }
    
                }
            }()
        }
    
        // Start auxiliary services if enabled
        if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
            // Mining only makes sense if a full Ethereum node is running
            if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
                utils.Fatalf("Light clients do not support mining")
            }
            var ethereum *eth.Ethereum
            if err := stack.Service(&ethereum); err != nil {
                utils.Fatalf("Ethereum service not running: %v", err)
            }
            // Set the gas price to the limits from the CLI and start mining
            gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
            if ctx.IsSet(utils.MinerGasPriceFlag.Name) {
                gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
            }
            ethereum.TxPool().SetGasPrice(gasprice)
    
            threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
            if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
                threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
            }
            if err := ethereum.StartMining(threads); err != nil {
                utils.Fatalf("Failed to start mining: %v", err)
            }
        }
    }
    

    函数首先调用了utils.StartNode(stack),追踪一下。

    
    func StartNode(stack *node.Node) {
        if err := stack.Start(); err != nil {
            Fatalf("Error starting protocol stack: %v", err)
        }
        go func() {
            sigc := make(chan os.Signal, 1)
            signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
            defer signal.Stop(sigc)
            <-sigc
            log.Info("Got interrupt, shutting down...")
            go stack.Stop()
            for i := 10; i > 0; i-- {
                <-sigc
                if i > 1 {
                    log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
                }
            }
            debug.Exit() // ensure trace and CPU profile data is flushed.
            debug.LoudPanic("boom")
        }()
    }
    

    调用stack.Start(),再追踪。

    
    // Start create a live P2P node and starts running it.
    func (n *Node) Start() error {
        n.lock.Lock()
        defer n.lock.Unlock()
    
        // Short circuit if the node's already running
        if n.server != nil {
            return ErrNodeRunning
        }
        if err := n.openDataDir(); err != nil {
            return err
        }
    
        // Initialize the p2p server. This creates the node key and
        // discovery databases.
        n.serverConfig = n.config.P2P
        n.serverConfig.PrivateKey = n.config.NodeKey()
        n.serverConfig.Name = n.config.NodeName()
        n.serverConfig.Logger = n.log
        if n.serverConfig.StaticNodes == nil {
            n.serverConfig.StaticNodes = n.config.StaticNodes()
        }
        if n.serverConfig.TrustedNodes == nil {
            n.serverConfig.TrustedNodes = n.config.TrustedNodes()
        }
        if n.serverConfig.NodeDatabase == "" {
            n.serverConfig.NodeDatabase = n.config.NodeDB()
        }
        running := &p2p.Server{Config: n.serverConfig}
        n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
    
        // Otherwise copy and specialize the P2P configuration
        services := make(map[reflect.Type]Service)
        for _, constructor := range n.serviceFuncs {
            // Create a new context for the particular service
            ctx := &ServiceContext{
                config:         n.config,
                services:       make(map[reflect.Type]Service),
                EventMux:       n.eventmux,
                AccountManager: n.accman,
            }
            for kind, s := range services { // copy needed for threaded access
                ctx.services[kind] = s
            }
            // Construct and save the service
            service, err := constructor(ctx)
            if err != nil {
                return err
            }
            kind := reflect.TypeOf(service)
            if _, exists := services[kind]; exists {
                return &DuplicateServiceError{Kind: kind}
            }
            services[kind] = service
        }
        // Gather the protocols and start the freshly assembled P2P server
        for _, service := range services {
            running.Protocols = append(running.Protocols, service.Protocols()...)
        }
        if err := running.Start(); err != nil {
            return convertFileLockError(err)
        }
        // Start each of the services
        var started []reflect.Type
        for kind, service := range services {
            // Start the next service, stopping all previous upon failure
            if err := service.Start(running); err != nil {
                for _, kind := range started {
                    services[kind].Stop()
                }
                running.Stop()
    
                return err
            }
            // Mark the service started for potential cleanup
            started = append(started, kind)
        }
        // Lastly start the configured RPC interfaces
        if err := n.startRPC(services); err != nil {
            for _, service := range services {
                service.Stop()
            }
            running.Stop()
            return err
        }
        // Finish initializing the startup
        n.services = services
        n.server = running
        n.stop = make(chan struct{})
    
        return nil
    }
    

    该方法首先打开dataDir目录,然后初始化serverConfig,包括node key,发现节点的存储database等,创建一个p2p.server类变量叫running,为service开启空间,创建需要的service,然后将所有Service的protocol收集到running中运行,调用running.Start()。然后,调用n.startRPC(services)开启RPC。
    先看running.Start()。

    
    // Start starts running the server.
    // Servers can not be re-used after stopping.
    func (srv *Server) Start() (err error) {
        srv.lock.Lock()
        defer srv.lock.Unlock()
        if srv.running {
            return errors.New("server already running")
        }
        srv.running = true
        srv.log = srv.Config.Logger
        if srv.log == nil {
            srv.log = log.New()
        }
        if srv.NoDial && srv.ListenAddr == "" {
            srv.log.Warn("P2P server will be useless, neither dialing nor listening")
        }
    
        // static fields
        if srv.PrivateKey == nil {
            return errors.New("Server.PrivateKey must be set to a non-nil key")
        }
        if srv.newTransport == nil {
            srv.newTransport = newRLPX
        }
        if srv.Dialer == nil {
            srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
        }
        srv.quit = make(chan struct{})
        srv.addpeer = make(chan *conn)
        srv.delpeer = make(chan peerDrop)
        srv.posthandshake = make(chan *conn)
        srv.addstatic = make(chan *enode.Node)
        srv.removestatic = make(chan *enode.Node)
        srv.addtrusted = make(chan *enode.Node)
        srv.removetrusted = make(chan *enode.Node)
        srv.peerOp = make(chan peerOpFunc)
        srv.peerOpDone = make(chan struct{})
    
        if err := srv.setupLocalNode(); err != nil {
            return err
        }
        if srv.ListenAddr != "" {
            if err := srv.setupListening(); err != nil {
                return err
            }
        }
        if err := srv.setupDiscovery(); err != nil {
            return err
        }
    
        dynPeers := srv.maxDialedConns()
        dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
        srv.loopWG.Add(1)
        go srv.run(dialer)
        return nil
    }
    

    Start开启p2pserver,srv.newTransport = newRLPX,加密链路使用了RLPX协议。srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}},使用TCPDial,负责在两个node之间建立连接。
    后面紧接着是srv的状态。
    setupLocalNode、setupListening、setupDiscovery,初始化本地节点、开启监听以及节点发现。

    相关文章

      网友评论

          本文标题:以太坊源码(1)——geth cli 启动流程

          本文链接:https://www.haomeiwen.com/subject/oywpwqtx.html