美文网首页Docker容器
docker源码2-卷服务的初始化

docker源码2-卷服务的初始化

作者: wangwDavid | 来源:发表于2018-12-23 11:42 被阅读0次

    谢绝转载

    序言

    上篇我们粗略过了一下docker 命令的调用流程,可知命令的最后运行部分是在服务端的daemon中完成.这一篇主要看一下docker卷服务的初始化和相关卷plugin的加载.

    这部分代码我们需要带着两个疑问去看,第一就是plugin是怎么被加载的,第二就是创建卷的时候怎么与我们自己实现的plugin关联起来的.

    注意事项:
    1.本文共有四篇,每篇都有编号,编号类似1.2.1这种,其中1是文章编号,因为后面的调用关系需要去前面篇幅中找,所以我标注了这个方便寻找.

    2.我是按调用过程列出代码,如果当前函数有多个地方需要讲解,比如函数1.2中有两个地方需要讲解,那么要展开的地方便是1.2.1,1.2.2这样排列.

    3.链接:
    第一篇:https://www.jianshu.com/p/9900ec52f2c1 (命令的调用流程)
    第二篇:https://www.jianshu.com/p/db08b7d57721 (卷服务初始化)
    第三篇:https://www.jianshu.com/p/bbc73f5687a2 (plugin的管理)
    第四篇:https://www.jianshu.com/p/a92b1b11c8dd (卷相关命令的执行)

    Daemon中关于volume服务的初始化

    main函数是万物之源,我们继续从服务端的main函数入手,找到相关的代码(不知道怎么找的朋友请看上篇).我们要找的代码在NewDaemon函数中:

    2.1 NewDaemon函数

    NewDaemon是docker进程的初始化函数,找到NewDaemon中对卷服务的初始化代码:

    path function name line number
    components/engine/daemon/daemon.go NewDaemon 635
    func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.Store) (daemon *Daemon, err error) {
        setDefaultMtu(config)
    
        ...
    
        # 创建deamon
        d := &Daemon{
            configStore: config,
            PluginStore: pluginStore,
            startupDone: make(chan struct{}),
        }
        ...
        # d.pluginManager的初始化中加载了/var/lib/docker/plugins下的所有plugin
        # 此处为重点
        // Plugin system initialization should happen before restore. Do not change order.
        d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{
            Root:               filepath.Join(config.Root, "plugins"),
            ExecRoot:           getPluginExecRoot(config.Root),
            Store:              d.PluginStore,
            CreateExecutor:     createPluginExec,
            RegistryService:    registryService,
            LiveRestoreEnabled: config.LiveRestoreEnabled,
            LogPluginEvent:     d.LogPluginEvent, // todo: make private
            AuthzMiddleware:    config.AuthzMiddleware,
        })
    
        ...
        # d.volumes 具体执行对卷的管理
        # 此处为重点
        d.volumes, err = volumesservice.NewVolumeService(config.Root, d.PluginStore, rootIDs, d)
        if err != nil {
            return nil, err
        }
    
        ...
    }
    

    在上面NewDaemon函数中标注出来两个重要的环节,首先是pluginManager的初始化,其次是d.volumes的初始化.这块涉及到的接口比较多,我们先根据调用流程过一遍,最后再来总结.

    2.2 plugin的加载过程

    2.2.1 Store 结构体

    这里有一个很重要的参数pluginStore, 在执行NewDaemon的时候传入的pluginStore是一个空的Store结构体,主要作用就是保存plugin:

    path struct name line number
    components/engine/plugin/defs.go Store 12
    // Store manages the plugin inventory in memory and on-disk
    type Store struct {
        sync.RWMutex
        plugins  map[string]*v2.Plugin
        specOpts map[string][]SpecOpt
        /* handlers are necessary for transition path of legacy plugins
         * to the new model. Legacy plugins use Handle() for registering an
         * activation callback.*/
        handlers map[string][]func(string, *plugins.Client)
    }
    

    2.2.2 Manager结构体

    先看d.pluginManager的定义,它是一个结构体,定义如下,用来管理plugin:

    path struct name line number
    components/engine/plugin/manager.go Manager 70
    // Manager controls the plugin subsystem.
    type Manager struct {
        config    ManagerConfig
        mu        sync.RWMutex // protects cMap
        muGC      sync.RWMutex // protects blobstore deletions
        cMap      map[*v2.Plugin]*controller
        blobStore *basicBlobStore
        publisher *pubsub.Publisher
        executor  Executor
    }
    

    2.2.3 NewManager函数

    再来看Manager的新建函数:

    path function name line number
    components/engine/plugin/manager.go NewManager 102
    // NewManager returns a new plugin manager.
    func NewManager(config ManagerConfig) (*Manager, error) {
        ...
        manager := &Manager{
            config: config,
        }
        ...
        # 创建manager.executor
        # 此处为重点
        manager.executor, err = config.CreateExecutor(manager)
        if err != nil {
            return nil, err
        }
        ...
        manager.cMap = make(map[*v2.Plugin]*controller)
        # 此处为重点
        if err := manager.reload(); err != nil {
            return nil, errors.Wrap(err, "failed to restore plugins")
        }
    
        manager.publisher = pubsub.NewPublisher(0, 0)
        return manager, nil
    }
    

    首先,传入的唯一参数config中绑定了d.PluginStore,对应的是config.Store(这部分代码代码赋值在NewDaemon中完成,参考2.1章节的代码).这里重点讲一下manager.executor和manager.reload().

    2.2.4 初始化manager.executor

    • 2.2.4.1 Executor接口

    在上面的函数中,manager.executor是实现了Executor接口的结构体,Executor的主要作用是用来启动和停止plugin(创建对应的容器,此容器不同于正常的docker容器,只能用docker-runc命令查看):

    path interface name line number
    components/engine/plugin/manager.go Executor 38
    // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
    type Executor interface {
        Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
        IsRunning(id string) (bool, error)
        Restore(id string, stdout, stderr io.WriteCloser) (alive bool, err error)
        Signal(id string, signal int) error
    }
    

    而config.CreateExecutor在创建的时候传入值是以下匿名函数:

    • 2.2.4.2 createPluginExec对应的匿名函数
    path function name line number
    components/engine/daemon/daemon.go 811
    createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) {
            var pluginCli *containerd.Client
    
            // Windows is not currently using containerd, keep the
            // client as nil
            if config.ContainerdAddr != "" {
                pluginCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(pluginexec.PluginNamespace), containerd.WithDialOpts(gopts))
                if err != nil {
                    return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr)
                }
            }
            # getPluginExecRoot(config.Root) = "/var/lib/docker/plugin"
            return pluginexec.New(ctx, getPluginExecRoot(config.Root), pluginCli, m)
        }
    

    该函数调用pluginexec.New函数,返回一个Executor结构体,如下:

    • 2.2.4.3 New函数
    path function name line number
    components/engine/plugin/executor/containerd/containerd.go New 42
    func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) {
        e := &Executor{
            rootDir:     rootDir,
            exitHandler: exitHandler,
        }
    
        client, err := libcontainerd.NewClient(ctx, cli, rootDir, PluginNamespace, e)
        if err != nil {
            return nil, errors.Wrap(err, "error creating containerd exec client")
        }
        e.client = client
        return e, nil
    }
    

    这里创建的结构体Executor实现了上面的Executor接口(定义位置不同),然后初始化libcontainerd的客户端,这里就不深入看了,其实我也不懂太底层的:-).

    • 2.2.4.4 Executor结构体
    path struct name line number
    components/engine/plugin/executor/containerd/containerd.go Executor 57
    // Executor is the containerd client implementation of a plugin executor
    type Executor struct {
        rootDir     string
        client      Client
        exitHandler ExitHandler
    }
    

    看完manager.executor,让我们回到NewManager[2.2.3章节],继续执行manager.reload()方法

    2.2.5 manager.reload()加载plugin

    • 2.2.5.1 reload方法
    path struct name line number
    components/engine/plugin/manager.go reload 182
    func (pm *Manager) reload() error { // todo: restore
        # 从/var/lib/docker/plugins 路径下加载plugin
        dir, err := ioutil.ReadDir(pm.config.Root)
        ...
        plugins := make(map[string]*v2.Plugin)
        for _, v := range dir {
            if validFullID.MatchString(v.Name()) {
                # 从对应的plugin路径的json文件下加载plugin
                p, err := pm.loadPlugin(v.Name())
                if err != nil {
                    handleLoadError(err, v.Name())
                    continue
                }
                plugins[p.GetID()] = p
            }
        ...
        }
    
        pm.config.Store.SetAll(plugins)
        ...
        return nil
    }
    

    加载plugin调用的是pm.loadPlugin方法,这里不仔细看这个方法了,但是v2.Plugin这个结构体比较重要,要看一下:

    • 2.2.5.2 Plugin结构体
    path struct name line number
    components/engine/plugin/v2/plugin.go Plugin 19
    // Plugin represents an individual plugin.
    type Plugin struct {
        mu        sync.RWMutex
        PluginObj types.Plugin `json:"plugin"` // todo: embed struct
        pClient   *plugins.Client
        refCount  int
        Rootfs    string // TODO: make private
    
        Config   digest.Digest
        Blobsums []digest.Digest
    
        modifyRuntimeSpec func(*specs.Spec)
    
        SwarmServiceID string
        timeout        time.Duration
        addr           net.Addr
    }
    

    其中plugins.Client定义如下,是plugin的http客户端:

    • 2.2.5.3 Client结构体
    path struct name line number
    components/engine/pkg/plugins/client.go Client 82
    // Client represents a plugin client.
    type Client struct {
        http           *http.Client // http client to use
        requestFactory transport.RequestFactory
    }
    

    加载plugins后我们看到调用了SetAll方法,看一下SetAll的定义:

    • 2.2.5.4 SetAll方法
    path struct name line number
    components/engine/plugin/store.go SetAll 65
    // SetAll initialized plugins during daemon restore.
    func (ps *Store) SetAll(plugins map[string]*v2.Plugin) {
        ps.Lock()
        defer ps.Unlock()
    
        for _, p := range plugins {
            ps.setSpecOpts(p)
        }
        ps.plugins = plugins
    }
    

    上面的方法最后把plugins保存到config中的Store中,也就是之前绑定的d.PluginStore, 也就是说d.PluginStore.plugins = plugins

    2.3 卷服务的创建

    回到NewDaemon函数[2.1章节],继续看卷服务的创建,volumesservice也就是d.volumes是卷的相关方法的实际执行者,我们先看下它的初始化过程.

    d.volumes, err = volumesservice.NewVolumeService(config.Root, d.PluginStore, rootIDs, d)
    

    2.3.1 VolumesService结构体

    先看VolumesService定义:

    path struct name line number
    components/engine/volume/service/service.go VolumesService 30
    // VolumesService manages access to volumes
    type VolumesService struct {
        vs           *VolumeStore
        ds           ds
        pruneRunning int32
        eventLogger  volumeEventLogger
    }
    

    2.3.2 NewVolumeService方法

    NewVolumeService方法, 实参是上面的d.PluginStore, 虚参是一个PluginGetter接口.注意在NewVolumeService函数中d.PluginStore变成了pg即plugingetter.PluginGetter.

    path func name line number
    components/engine/volume/service/service.go NewVolumeService 38
    // NewVolumeService creates a new volume service
    func NewVolumeService(root string, pg plugingetter.PluginGetter, rootIDs idtools.Identity, logger volumeEventLogger) (*VolumesService, error) {
        # ds是一个接口,负责driver的存储
        ds := drivers.NewStore(pg)
        if err := setupDefaultDriver(ds, root, rootIDs); err != nil {
            return nil, err
        }
    
        # vs 用来存储卷的信息
        vs, err := NewStore(root, ds)
        if err != nil {
            return nil, err
        }
        return &VolumesService{vs: vs, ds: ds, eventLogger: logger}, nil
    }
    

    首先看一下PluginGetter接口

    • 2.3.2.1 PluginGetter接口
    path interface name line number
    components/engine/pkg/plugingetter/getter.go PluginGetter 47
    // PluginGetter is the interface implemented by Store
    type PluginGetter interface {
        Get(name, capability string, mode int) (CompatPlugin, error)
        GetAllByCap(capability string) ([]CompatPlugin, error)
        GetAllManagedPluginsByCap(capability string) []CompatPlugin
        Handle(capability string, callback func(string, *plugins.Client))
    }
    
    • 2.3.2.2 ds接口

    再看ds, ds接口只定义了一个方法GetDriverList

    path interface name line number
    components/engine/volume/service/service.go ds 21
    type ds interface {
        GetDriverList() []string
    }
    

    实现这个接口的是Store结构体,此Store非彼Store,不同于之前的保存Plugin的那个Store,这个store用来保存driver,一定要注意:

    • 2.3.2.3 Store结构体
    path struct name line number
    components/engine/volume/drivers/extpoint.go Store 45
    // Store is an in-memory store for volume drivers
    type Store struct {
        extensions   map[string]volume.Driver
        mu           sync.Mutex
        driverLock   *locker.Locker
        pluginGetter getter.PluginGetter
    }
    

    drivers.NewStore(pg)执行后d.pluginStore被绑定到ds的pluginGetter上,也就是ds.pluginGetter = d.pluginStore.而vs是一个管理卷的结构体,定义如下:

    • 2.3.2.4 VolumeStore结构体
    path struct name line number
    components/engine/volume/service/store.go VolumeStore 186
    // VolumeStore is a struct that stores the list of volumes available and keeps track of their usage counts
    type VolumeStore struct {
        // locks ensures that only one action is being performed on a particular volume at a time without locking the entire store
        // since actions on volumes can be quite slow, this ensures the store is free to handle requests for other volumes.
        locks   *locker.Locker
        drivers *drivers.Store
        // globalLock is used to protect access to mutable structures used by the store object
        globalLock sync.RWMutex
        // names stores the volume name -> volume relationship.
        // This is used for making lookups faster so we don't have to probe all drivers
        names map[string]volume.Volume
        // refs stores the volume name and the list of things referencing it
        refs map[string]map[string]struct{}
        // labels stores volume labels for each volume
        labels map[string]map[string]string
        // options stores volume options for each volume
        options map[string]map[string]string
        db      *bolt.DB
    }
    

    vs的drivers值为ds, vs.drivers.pluginGetter = d.pluginStore

    总结

    这部分代码就是知道在Daemon的初始化中如何加载plugin并初始化相关卷服务,在后续的代码中会调用这篇讲到的内容.

    相关文章

      网友评论

        本文标题:docker源码2-卷服务的初始化

        本文链接:https://www.haomeiwen.com/subject/jdagkqtx.html