美文网首页
Containerd Content 服务

Containerd Content 服务

作者: Xiao_Yang | 来源:发表于2019-10-07 11:11 被阅读0次

    概述

    本文主要针对 Containerd content 服务功能模块的相关代码分析,如下图 Containerd 官方架构图所示:

    containerd-content.png

    如同 Containerd 其它服务功能模块化机制,本文将从 Content 服务相关接口定义、GRPC 和 Service 插件化注册、Plugin 加载相关过程、最后到服务功能的底层实现逻辑进行逐步分析 。

    Content Store接口与方法定义

    Store 接口继承了各 content 内容管理相关的接口集合,后面每个接口将都有详细说明

    !FILENAME content/content.go:136

    type Store interface {
        Manager         // 信息查找、删除管理接口 
        Provider        // 读取接口 
      IngestManager   // 写管理接口(写状态、终止)
        Ingester        // 存写接口
    }
    

    Manager 提供了基础的 content 内容管理方法如内容元信息获取、更新、列表查找、删除

    !FILENAME content/content.go:75

    type Manager interface {
      // 返回内容存储数据库存放 content 元数据信息
        Info(ctx context.Context, dgst digest.Digest) (Info, error)
    
      // 更新 content 相关的可变信息项,如 labels.* 标签项更新
        Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)
    
      // 遍历内容存储数据库的所有项进行查找匹配指定的过滤条件项
        Walk(ctx context.Context, fn WalkFunc, filters ...string) error
    
        // 从内容存储数据库移除指定的 content
        Delete(ctx context.Context, dgst digest.Digest) error
    }
    

    Provider 提供了 content 的读取接口,返回一个内容读取器对象 ReaderAt

    !FILENAME content/content.go:35

    type Provider interface {
        // ocispec.Descriptor 描述符唯一需要指定 desc.Digest 内容的摘要散列值 
        ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error)
    }
    
    // 使用标准的 io 接口 io.Closer 、io.ReaderAt,扩展大小计算报告
    type ReaderAt interface {
        io.ReaderAt
        io.Closer
        Size() int64
    }
    

    IngestManager 写管理接口(存写状态获取、中止操作)

    !FILENAME content/content.go:98

    type IngestManager interface {
        // 查看指定 ref 引用 Ingest 操作的状态信息
        Status(ctx context.Context, ref string) (Status, error)
    
      // 列出所有活动的写操作与状态信息,可通过 filters 提供的正则表达式来过滤列出项 
        ListStatuses(ctx context.Context, filters ...string) ([]Status, error)
    
        // 取消操作
        Abort(ctx context.Context, ref string) error
    }
    

    Ingester 提供了 content 的存写接口,返回一个内容写入器对象 Writer

    !FILENAME content/content.go:44

    type Ingester interface {
        //Writer opts 需带指定 ref 来唯一标识活动
        Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
    }
    

    Content GRPC 注册与 Server 实现

    content GRPC 插件注册,插件 InitFn 最后 contentserver.New(cs.(content.Store)) 返回 api.ContentServer,而GRPC 所依赖的服务插件 "content-service" 实例化对象作为其唯一传参,类型则是前面所详述的 content.Store 接口。

    !FILENAME services/content/service.go:27

    func init() {
        plugin.Register(&plugin.Registration{
            Type: plugin.GRPCPlugin,
            ID:   "content",
            Requires: []plugin.Type{
                plugin.ServicePlugin,
            },
            InitFn: func(ic *plugin.InitContext) (interface{}, error) {
                plugins, err := ic.GetByType(plugin.ServicePlugin)  //获取所有服务插件
                if err != nil {
                    return nil, err
                }
                p, ok := plugins[services.ContentService]  // Key 为 "content-service" 服务插件
                if !ok {
                    return nil, errors.New("content store service not found")
                }
                cs, err := p.Instance()     // "content-service" 插件实例化对象
                if err != nil {
                    return nil, err
                }
          // 传参 cs.(content.Store) 
                return contentserver.New(cs.(content.Store)), nil
            },
        })
    }
    

    ContentServer is the server API for Content service.

    !FILENAME api/services/content/v1/content.pb.go:1230

    type ContentServer interface {
        Info(context.Context, *InfoRequest) (*InfoResponse, error)
        Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
        List(*ListContentRequest, Content_ListServer) error
        Delete(context.Context, *DeleteContentRequest) (*types.Empty, error)
        Read(*ReadContentRequest, Content_ReadServer) error
        Status(context.Context, *StatusRequest) (*StatusResponse, error)
        ListStatuses(context.Context, *ListStatusesRequest) (*ListStatusesResponse, error)
        Write(Content_WriteServer) error
        Abort(context.Context, *AbortRequest) (*types.Empty, error)
    }
    

    New returns the content GRPC server

    !FILENAME services/content/contentserver/contentserver.go:50

    func New(cs content.Store) api.ContentServer {
        return &service{store: cs}   // service
    }
    

    !FILENAME services/content/contentserver/contentserver.go:38

    type service struct {
        store content.Store          // content.Store 接口类型
    }
    

    上层Content Server 包装的 service 类实现了 api.ContentServer 接口,其主要功能是底层所注册的 "content-service" 插件的服务方法,如下读取 Read() 实现方法逻辑则主要调用了底层的 store.ReaderAt() 来实现 content 读取。其它剩余的方法(Info、Update、List、Delete、Status、ListStatuses、WriteAbort、Abort)实现也类似将不再一一展开。

    !FILENAME services/content/contentserver/contentserver.go:144

    func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
        if err := req.Digest.Validate(); err != nil {
            return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
        }
    
      // 调用底层服务方法 s.store.Info()
        oi, err := s.store.Info(session.Context(), req.Digest)
        if err != nil {
            return errdefs.ToGRPC(err)
        }
    
      // 调用底层服务方法 s.store.ReaderAt() 
        ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
        if err != nil {
            return errdefs.ToGRPC(err)
        }
      //...
    
        return errdefs.ToGRPC(err)
    }
    

    Content service 服务注册与实现

    注册

    !FILENAME services/content/store.go:37

    func init() {
        plugin.Register(&plugin.Registration{
            Type: plugin.ServicePlugin,            // 服务插件类型
            ID:   services.ContentService,         // ID 为"content-service"
            Requires: []plugin.Type{
                plugin.MetadataPlugin,               // 依赖元数据插件
            },
            InitFn: func(ic *plugin.InitContext) (interface{}, error) {
                m, err := ic.Get(plugin.MetadataPlugin)  //获取元数据库注册的插件初始化对象
                if err != nil {
                    return nil, err
                }
          // +创建 content.Store 实例对象,其输入的参数为重点关注 
          // +m.(metadata.DB).ContentStore() 为元数据库指定的内容存储对象(后面详述)
                s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events)
                return s, err
            },
        })
    }
    

    !FILENAME services/content/store.go:56

    func newContentStore(cs content.Store, publisher events.Publisher) (content.Store, error) {
        return &store{
            Store:     cs,              // 内容存储对象
            publisher: publisher,
        }, nil
    }
    

    store 类结构定义,实际上包装了 content.Store 增加事件的推送

    !FILENAME services/content/store.go:32

    type store struct {
        content.Store
        publisher events.Publisher
    }
    

    元数据库及内容存储库

    MetadataPlugin 元数据库在 containerd server 创建过程中对所有插件进行加载时被指定,同时指定了内容存储和snapshotter 实现类对象

    !FILENAME services/server/server.go:304

    func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {
        // load all plugins into containerd
        plugin.Register(
        //...
      plugin.Register(&plugin.Registration{
            Type: plugin.ContentPlugin,             // 内容插件类型
            ID:   "content",
            InitFn: func(ic *plugin.InitContext) (interface{}, error) {
                ic.Meta.Exports["root"] = ic.Root
                return local.NewStore(ic.Root)       // 插件初始化func,创建与返回内容存储实例化对象
            },
        })
        //...
        plugin.Register(&plugin.Registration{
            Type: plugin.MetadataPlugin,
            ID:   "bolt",
            Requires: []plugin.Type{
                plugin.ContentPlugin,
                plugin.SnapshotPlugin,
            },
            Config: &srvconfig.BoltConfig{
                ContentSharingPolicy: srvconfig.SharingPolicyShared,
            },
            InitFn: func(ic *plugin.InitContext) (interface{}, error) {
          //...
          
                path := filepath.Join(ic.Root, "meta.db")
                ic.Meta.Exports["path"] = path
    
          // 创建bolt DB,文件名为 "meta.db"
                db, err := bolt.Open(path, 0644, nil)
                if err != nil {
                    return nil, err
                }
    
                var dbopts []metadata.DBOpt
                if !shared {
                    dbopts = append(dbopts, metadata.WithPolicyIsolated)
                }
          // 创建元数据库对象,关注三个关键的输入参数:
          // db为boltdb对象
          // cs为内容存储对象
          // snapshotters为快照管理器对象
                mdb := metadata.NewDB(db, cs.(content.Store), snapshotters, dbopts...)
                if err := mdb.Init(ic.Context); err != nil {
                    return nil, err
                }
                return mdb, nil
            },
        })
        
    //...
    }
    

    NewStore 本地内容存储构建,实际调用 NewLabeledStore ,返回 content.Store 接口实现类对象 store{root, ls}

    !FILENAME content/local/store.go:74

    // NewStore returns a local content store
    func NewStore(root string) (content.Store, error) {
        return NewLabeledStore(root, nil)
    }
    
    func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
        if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
            return nil, err
        }
    
        return &store{
            root: root,
            ls:   ls,
        }, nil
    }
    

    下面我们将重点分析 content store 的实现类方法逻辑,本文主要分析 Writer 和 ReaderAt 两个方法,其它的方法可以查看源码。

    store.Writer() 返回一个配置好的内容 writer 对象可供内容数据的写入

    // `ref` 参数用于为写事务生命周期管理的唯一标识,必须指定 ref
    func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
        var wOpts content.WriterOpts
      // 加载 writer 配置选项
        for _, opt := range opts {
            if err := opt(&wOpts); err != nil {
                return nil, err
            }
        }
    
      // ref 配置选项检查不能为空
        if wOpts.Ref == "" {
            return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
        }
      
      //...
    
      // +实标调用 writer()方法(下面详述)
        w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
        if err != nil {
            unlock(wOpts.Ref)
            return nil, err
        }
    
        return w, nil // lock is now held by w.
    }
    

    !FILENAME content/local/store.go:511

    func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
    
        if expected != "" {
        // 通过摘要散列值生成 blob 文件对象路径
            p := s.blobPath(expected)
            if _, err := os.Stat(p); err == nil {
                return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
            }
        }
      
      // 基于 ref 定义生成数据处理的路径,返回三个路径:
      // path 为整个 ingest 的目录路径 $root/ingest/$digest(ref)/
      // refp  以 ref 为名文件路径 $root/ingest/$digest(ref)/ref
      // data 数据文件路径 $root/ingest/$digest(ref)/data
      
        path, refp, data := s.ingestPaths(ref)
    
        var (
            digester  = digest.Canonical.Digester()
            offset    int64
            startedAt time.Time
            updatedAt time.Time
        )
    
        foundValidIngest := false
      // 确保 ingest 目录被创建
        if err := os.Mkdir(path, 0755); err != nil {
            if !os.IsExist(err) {
                return nil, err
            }
        // 获取原 ref 及数据状态信息
            status, err := s.resumeStatus(ref, total, digester)
            if err == nil {
                foundValidIngest = true
                updatedAt = status.UpdatedAt
                startedAt = status.StartedAt
                total = status.Total
                offset = status.Offset
            } else {   
                logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
            }
        }
    
      // 如果不存在则创建相关文件
        if !foundValidIngest {
            startedAt = time.Now()
            updatedAt = startedAt
    
            // ref 文件写入内容为 ref 指定的字符串信息
            if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
                return nil, err
            }
    
        // 开始时间
            if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
                return nil, err
            }
        // 更新时间
            if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
                return nil, err
            }
    
        // 大小
            if total > 0 {
                if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
                    return nil, err
                }
            }
        }
      
      // 打开数据文件句柄
        fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
        if err != nil {
            return nil, errors.Wrap(err, "failed to open data file")
        }
     
      // 定位偏移位置
        if _, err := fp.Seek(offset, io.SeekStart); err != nil {
            return nil, errors.Wrap(err, "could not seek to current write offset")
        }
    
      // 最后返回一个 writer 对象
        return &writer{
            s:         s,
            fp:        fp,
            ref:       ref,
            path:      path,
            offset:    offset,
            total:     total,
            digester:  digester,
            startedAt: startedAt,
            updatedAt: updatedAt,
        }, nil
    }
    

    !FILENAME content/local/store.go:480

    func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
        path, _, data := s.ingestPaths(ref)   // 基于 ref 定义生成数据处理的路径
        status, err := s.status(path)         // 获取 ingest 目录的元状态信息
        if err != nil {
            return status, errors.Wrap(err, "failed reading status of resume write")
        }
      // ref 值与 ingest 目录下检验是否一致
        if ref != status.Ref {
            return status, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
        }
    
      // 大小检验
        if total > 0 && status.Total > 0 && total != status.Total {
            return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
        }
    
        // 打开 blob 数据文件句柄
        fp, err := os.Open(data)
        if err != nil {
            return status, err
        }
    
        p := bufPool.Get().(*[]byte)
        status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
        bufPool.Put(p)
        fp.Close()
        return status, err   //返回状态信息
    }
    

    store ReaderAt 返回 blob 的 io.ReaderAt ,其代码实现为标准的文件打开句柄加上文件大小。 ocispec.Descriptor OCI 标准格式描述符指定了需要读取的文件 blob 内容摘要散列值,通过在路径 $root/blobs/$digest 查找文件。

    !FILENAME content/local/store.go:125

    // ReaderAt returns an io.ReaderAt for the blob.
    func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
        p := s.blobPath(desc.Digest)     // 通过指定的摘要散列值生成 blob 文件对象路径
        fi, err := os.Stat(p)
        if err != nil {
            if !os.IsNotExist(err) {
                return nil, err
            }
            return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
        }
    
        fp, err := os.Open(p)           // 打开文件获取文件句柄
        if err != nil {
            if !os.IsNotExist(err) {
                return nil, err
            }
    
            return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
        }
    
        return sizeReaderAt{size: fi.Size(), fp: fp}, nil  
    }
    

    其它剩余的 content store 方法(Info、Update、List、Delete、Status、ListStatuses、WriteAbort、Abort)实现也类似,将不再一一展开。

    附录

    ctr content 命令

    Name:        "get",
    Usage:       "get the data for an object",
    ArgsUsage:   "[<digest>, ...]",
    Description: "display the image object",
    
    
    Name:        "ingest",
    Usage:       "accept content into the store",
    ArgsUsage:   "[flags] <key>",
    Description: "ingest objects into the local content store",
    
    
    Name:        "active",
    Usage:       "display active transfers",
    ArgsUsage:   "[flags] [<regexp>]",
    Description: "display the ongoing transfers",
    
    
    Name:        "list",
    Aliases:     []string{"ls"},
    Usage:       "list all blobs in the store",
    ArgsUsage:   "[flags]",
    Description: "list blobs in the content store",
    
    
    Name:        "label",
    Usage:       "add labels to content",
    ArgsUsage:   "<digest> [<label>=<value> ...]",
    Description: "labels blobs in the content store",
    
    
    Name:        "edit",
    Usage:       "edit a blob and return a new digest",
    ArgsUsage:   "[flags] <digest>",
    Description: "edit a blob and return a new digest",
    
    
    Name:      "delete",
    Aliases:   []string{"del", "remove", "rm"},
    Usage:     "permanently delete one or more blobs",
    ArgsUsage: "[<digest>, ...]",
    Description: `Delete one or more blobs permanently. Successfully deleted
    blobs are printed to stdout.`,
    
    
    Name:        "fetch-object",
    Usage:       "retrieve objects from a remote",
    ArgsUsage:   "[flags] <remote> <object> [<hint>, ...]",
    Description: `Fetch objects by identifier from a remote.`,
    Flags:       commands.RegistryFlags,
    
    
    Name:        "push-object",
    Usage:       "push an object to a remote",
    ArgsUsage:   "[flags] <remote> <object> <type>",
    Description: `Push objects by identifier to a remote.`,
    
    
    Name:      "fetch",
    Usage:     "fetch all content for an image into containerd",
    ArgsUsage: "[flags] <remote> <object>",
    Description: `Fetch an image into containerd.
    

    ~~ 本文 END ~~

    相关文章

      网友评论

          本文标题:Containerd Content 服务

          本文链接:https://www.haomeiwen.com/subject/bwnxpctx.html