geth的启动之整体及p2p服务的启动

作者: 古则 | 来源:发表于2018-04-16 10:40 被阅读36次

    1.入口

    geth命令的实现位于文件
    go-ethereum/cmd/geth/main.go中,
    调用关系如下:

    main() => app.Run(os.Args)
    

    其中app在main.go的全局变量中初始化

    //go-ethereum/cmd/geth/main.go
    app = utils.NewApp(gitCommit, "the go-ethereum command line interface")
    
    //go-ethereum/cmd/utils/flags.go
    // NewApp builds the cli application used by the go-ethereum commands. The
    // application is named after the running executable and carries the given
    // usage text. Its version is params.Version, suffixed with "-" and the first
    // eight characters of gitCommit when gitCommit has at least eight characters.
    func NewApp(gitCommit, usage string) *cli.App {
        version := params.Version
        if len(gitCommit) >= 8 {
            version += "-" + gitCommit[:8]
        }

        app := cli.NewApp()
        app.Name = filepath.Base(os.Args[0])
        app.Author = ""
        app.Email = ""
        app.Version = version
        app.Usage = usage
        return app
    }
    

    cli的实现采用的是urfave/cli库(v1),文档见
    https://godoc.org/gopkg.in/urfave/cli.v1 ,该库的用法待学习。

    2.init()

    
    // init configures the package-level CLI app: its default action, the list of
    // sub-commands (sorted by name), the command line flags, and the hooks that
    // run before and after the selected action.
    func init() {
        app.Action = geth
        app.HideVersion = true // the version is printed by a dedicated command instead
        app.Copyright = "Copyright 2013-2017 The go-ethereum Authors"

        app.Commands = []cli.Command{
            // Defined in chaincmd.go.
            initCommand,
            importCommand,
            exportCommand,
            copydbCommand,
            removedbCommand,
            dumpCommand,
            // Defined in monitorcmd.go.
            monitorCommand,
            // Defined in accountcmd.go.
            accountCommand,
            walletCommand,
            // Defined in consolecmd.go.
            consoleCommand,
            attachCommand,
            javascriptCommand,
            // Defined in misccmd.go.
            makecacheCommand,
            makedagCommand,
            versionCommand,
            bugCommand,
            licenseCommand,
            // Defined in config.go.
            dumpConfigCommand,
        }
        sort.Sort(cli.CommandsByName(app.Commands))

        // Append every flag group to the app, in this fixed order.
        flagGroups := [][]cli.Flag{nodeFlags, rpcFlags, consoleFlags, debug.Flags, whisperFlags}
        for _, group := range flagGroups {
            app.Flags = append(app.Flags, group...)
        }

        // Before hook: runs ahead of the selected action.
        app.Before = func(c *cli.Context) error {
            runtime.GOMAXPROCS(runtime.NumCPU())

            err := debug.Setup(c)
            if err != nil {
                return err
            }

            // Collect process metrics in the background every three seconds.
            go metrics.CollectProcessMetrics(3 * time.Second)

            utils.SetupNetwork(c)
            return nil
        }

        // After hook: runs once the action has finished.
        app.After = func(c *cli.Context) error {
            debug.Exit()
            console.Stdin.Close() // resets the terminal mode
            return nil
        }
    }
    

    3.geth(ctx *cli.Context)

    // geth is the default action, used when no subcommand is given. It builds a
    // full node from the command line arguments, starts it, and then blocks until
    // the node has shut down. It always returns nil.
    func geth(ctx *cli.Context) error {
        stack := makeFullNode(ctx)
        startNode(ctx, stack)
        stack.Wait()
        return nil
    }
    
    • makeFullNode:生成node节点(定义见node/node.go),并注册服务,见4-6
    • startNode:启动节点(所有服务的启动在node/node.go中完成),包括p2p、rpc、http、数据同步等各种服务,见7

    4.makeFullNode(ctx *cli.Context) *node.Node

    //cmd/geth/config.go:156
    // makeFullNode creates the node from the command line configuration and
    // registers the services it should run: the Ethereum service always, plus the
    // dashboard, Whisper and Ethereum Stats services when they are enabled.
    func makeFullNode(ctx *cli.Context) *node.Node {
        stack, cfg := makeConfigNode(ctx)

        // Register the services.
        utils.RegisterEthService(stack, &cfg.Eth)
        if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
            utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
        }

        // Whisper runs either when explicitly enabled through at least one whisper
        // flag, or implicitly in developer mode while the whisper flag is unset.
        explicitShh := enableWhisper(ctx)
        implicitShh := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
        if explicitShh || implicitShh {
            // Command line values override the configured Whisper settings.
            if name := utils.WhisperMaxMessageSizeFlag.Name; ctx.GlobalIsSet(name) {
                cfg.Shh.MaxMessageSize = uint32(ctx.Int(name))
            }
            if name := utils.WhisperMinPOWFlag.Name; ctx.GlobalIsSet(name) {
                cfg.Shh.MinimumAcceptedPOW = ctx.Float64(name)
            }
            utils.RegisterShhService(stack, &cfg.Shh)
        }

        // The Ethereum Stats daemon is only added when a URL was configured.
        if url := cfg.Ethstats.URL; url != "" {
            utils.RegisterEthStatsService(stack, url)
        }
        return stack
    }
    

    5.RegisterEthService(stack *node.Node, cfg *eth.Config)

    //cmd/utils/flags.go:1132
    // RegisterEthService adds an Ethereum client to the stack.
    //
    // In light sync mode the registered constructor builds a les light client.
    // Otherwise it builds a full eth node and, when cfg.LightServ is positive,
    // attaches a LES server to it. Any error from constructing the full node or
    // the LES server is returned from the constructor, so it surfaces when the
    // node starts instead of being dropped. A failure to register the constructor
    // itself is fatal.
    func RegisterEthService(stack *node.Node, cfg *eth.Config) {
        var err error
        if cfg.SyncMode == downloader.LightSync {
            err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
                return les.New(ctx, cfg)
            })
        } else {
            err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
                fullNode, err := eth.New(ctx, cfg)
                if err != nil {
                    // Return a literal nil so the interface value does not wrap
                    // a nil *eth.Ethereum pointer.
                    return nil, err
                }
                if cfg.LightServ > 0 {
                    // Do not discard this error: on failure ls would be a nil
                    // server that must not be attached to the full node.
                    ls, err := les.NewLesServer(fullNode, cfg)
                    if err != nil {
                        return nil, err
                    }
                    fullNode.AddLesServer(ls)
                }
                return fullNode, nil
            })
        }
        if err != nil {
            Fatalf("Failed to register the Ethereum service: %v", err)
        }
    }
    

    6.eth.New

    //eth/backend.go:104
    // New creates a new Ethereum object (including the
    // initialisation of the common Ethereum object).
    //
    // It rejects light sync mode and invalid sync modes, opens the "chaindata"
    // database, sets up the genesis block, and then builds the blockchain,
    // transaction pool, protocol manager, miner and API backend in that order.
    // It returns the first error encountered; on error the returned *Ethereum is
    // nil.
    func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
        // Light sync is served by les.LightEthereum, not by this type.
        if config.SyncMode == downloader.LightSync {
            return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
        }
        if !config.SyncMode.IsValid() {
            return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
        }
        chainDb, err := CreateDB(ctx, config, "chaindata")
        if err != nil {
            return nil, err
        }
        // NOTE(review): presumably kicks off a database upgrade and returns a
        // handle to stop it; the helper is not visible here — confirm.
        stopDbUpgrade := upgradeDeduplicateData(chainDb)
        chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
        // A *params.ConfigCompatError is tolerated at this point; it is handled
        // further down by rewinding the chain. Any other genesis error aborts.
        if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
            return nil, genesisErr
        }
        log.Info("Initialised chain configuration", "config", chainConfig)

        eth := &Ethereum{
            config:         config,
            chainDb:        chainDb,
            chainConfig:    chainConfig,
            eventMux:       ctx.EventMux,
            accountManager: ctx.AccountManager,
            engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
            shutdownChan:   make(chan bool),
            stopDbUpgrade:  stopDbUpgrade,
            networkId:      config.NetworkId,
            gasPrice:       config.GasPrice,
            etherbase:      config.Etherbase,
            bloomRequests:  make(chan chan *bloombits.Retrieval),
            bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
        }

        log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

        // Unless the check is skipped, refuse to run on a database whose stored
        // version is non-zero and differs from core.BlockChainVersion, then
        // record the current version in the database.
        if !config.SkipBcVersionCheck {
            bcVersion := core.GetBlockChainVersion(chainDb)
            if bcVersion != core.BlockChainVersion && bcVersion != 0 {
                return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
            }
            core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
        }
        var (
            vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
            cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
        )
        eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
        if err != nil {
            return nil, err
        }
        // Rewind the chain in case of an incompatible config upgrade.
        if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
            log.Warn("Rewinding chain to upgrade configuration", "err", compat)
            eth.blockchain.SetHead(compat.RewindTo)
            core.WriteChainConfig(chainDb, genesisHash, chainConfig)
        }
        eth.bloomIndexer.Start(eth.blockchain)

        // A configured transaction journal path is resolved through the service
        // context before the pool is created.
        if config.TxPool.Journal != "" {
            config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
        }
        eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

        if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
            return nil, err
        }
        eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
        eth.miner.SetExtra(makeExtraData(config.ExtraData))

        // The backend is created with its second field nil; the gas price oracle
        // takes the backend as an argument, so gpo is filled in afterwards.
        eth.ApiBackend = &EthApiBackend{eth, nil}
        gpoParams := config.GPO
        // Fall back to the configured gas price when the oracle has no default.
        if gpoParams.Default == nil {
            gpoParams.Default = config.GasPrice
        }
        eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams)

        return eth, nil
    }
    

    7.Node及startNode

    Node中包含了rpc、ipc、http、ws、p2p等各种服务,以及在5中所注册的服务

    //node/node.go
    // Node is a container on which services can be registered.
    type Node struct {
        eventmux *event.TypeMux // Event multiplexer used between the services of a stack
        config   *Config
        accman   *accounts.Manager

        ephemeralKeystore string         // if non-empty, the key directory that will be removed by Stop
        instanceDirLock   flock.Releaser // prevents concurrent use of instance directory

        serverConfig p2p.Config
        server       *p2p.Server // Currently running P2P networking layer
        // Registered services; registration is done in
        // func (n *Node) Register(constructor ServiceConstructor) error.
        serviceFuncs []ServiceConstructor     // Service constructors (in dependency order)
        services     map[reflect.Type]Service // Currently running services

        rpcAPIs       []rpc.API   // List of APIs currently provided by the node
        inprocHandler *rpc.Server // In-process RPC request handler to process the API requests

        ipcEndpoint string       // IPC endpoint to listen at (empty = IPC disabled)
        ipcListener net.Listener // IPC RPC listener socket to serve API requests
        ipcHandler  *rpc.Server  // IPC RPC request handler to process the API requests

        httpEndpoint  string       // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
        httpWhitelist []string     // HTTP RPC modules to allow through this endpoint
        httpListener  net.Listener // HTTP RPC listener socket to serve API requests
        httpHandler   *rpc.Server  // HTTP RPC request handler to process the API requests

        wsEndpoint string       // Websocket endpoint (interface + port) to listen at (empty = websocket disabled)
        wsListener net.Listener // Websocket RPC listener socket to serve API requests
        wsHandler  *rpc.Server  // Websocket RPC request handler to process the API requests

        stop chan struct{} // Channel to wait for termination notifications
        lock sync.RWMutex  // Held for writing by Start while it brings the node up

        log log.Logger
    }
    

    startNode最终会调用Node.Start(),其实现如下:

    // Start creates a live P2P node and starts running it.
    //
    // It holds the node lock for the whole call. It returns ErrNodeRunning when a
    // server is already set, and otherwise the first error hit while opening the
    // data directory, constructing the services, starting the P2P server,
    // starting a service, or starting the RPC interfaces. When a service or the
    // RPC layer fails to start, the services started so far and the P2P server
    // are stopped again before returning.
    func (n *Node) Start() error {
        n.lock.Lock()
        defer n.lock.Unlock()

        // Short circuit if the node's already running
        if n.server != nil {
            return ErrNodeRunning
        }
        if err := n.openDataDir(); err != nil {
            return err
        }

        // Initialize the p2p server. This creates the node key and
        // discovery databases.
        n.serverConfig = n.config.P2P
        n.serverConfig.PrivateKey = n.config.NodeKey()
        n.serverConfig.Name = n.config.NodeName()
        n.serverConfig.Logger = n.log
        // Values already present in the P2P config win; the node config only
        // supplies the ones that were left unset.
        if n.serverConfig.StaticNodes == nil {
            n.serverConfig.StaticNodes = n.config.StaticNodes()
        }
        if n.serverConfig.TrustedNodes == nil {
            n.serverConfig.TrustedNodes = n.config.TrustedNodes()
        }
        if n.serverConfig.NodeDatabase == "" {
            n.serverConfig.NodeDatabase = n.config.NodeDB()
        }
        running := &p2p.Server{Config: n.serverConfig}
        n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

        // Construct every registered service in registration order, handing each
        // constructor a context that exposes the services built before it.
        services := make(map[reflect.Type]Service)
        for _, constructor := range n.serviceFuncs {
            // Create a new context for the particular service
            ctx := &ServiceContext{
                config:         n.config,
                services:       make(map[reflect.Type]Service),
                EventMux:       n.eventmux,
                AccountManager: n.accman,
            }
            for kind, s := range services { // copy needed for threaded access
                ctx.services[kind] = s
            }
            // Construct and save the service
            service, err := constructor(ctx)
            if err != nil {
                return err
            }
            // Services are keyed by their concrete type, so two services of the
            // same type cannot be registered together.
            kind := reflect.TypeOf(service)
            if _, exists := services[kind]; exists {
                return &DuplicateServiceError{Kind: kind}
            }
            services[kind] = service
        }
        // Gather the protocols and start the freshly assembled P2P server
        for _, service := range services {
            running.Protocols = append(running.Protocols, service.Protocols()...)
        }
        if err := running.Start(); err != nil {
            return convertFileLockError(err)
        }
        // Start each of the services. This ranges over a map, so the services are
        // started in no particular order.
        started := []reflect.Type{}
        for kind, service := range services {
            // Start the next service, stopping all previous upon failure
            if err := service.Start(running); err != nil {
                for _, kind := range started {
                    services[kind].Stop()
                }
                running.Stop()

                return err
            }
            // Mark the service started for potential cleanup
            started = append(started, kind)
        }
        // Lastly start the configured RPC interfaces; on failure every service
        // and the P2P server are stopped again.
        if err := n.startRPC(services); err != nil {
            for _, service := range services {
                service.Stop()
            }
            running.Stop()
            return err
        }
        // Finish initializing the startup. The node fields are only assigned once
        // everything above has succeeded.
        n.services = services
        n.server = running
        n.stop = make(chan struct{})

        return nil
    }
    

    相关文章

      网友评论

        本文标题:geth的启动之整体及p2p服务的启动

        本文链接:https://www.haomeiwen.com/subject/lpfhkftx.html