概述
本文主要为从代码层面分析 Containerd diff 服务模块的实现逻辑,如下图 containerd 架构图所示:
containerd-diff.pngContainerd diff 服务模块,实现了 diff 和 apply 两个主要的服务管理功能 :
-
Diff 服务计算所提供的上层/下层 mount 目录的差异,遵从 OCI 规范 Changesets (变化集)打包 tar diff 镜像层存储。
-
Apply 服务将所指定描述器( ocispec.Descriptor )的参照内容应用至指定的挂载目录。通常情况下,描述器指向一个 tar 格式的文件系统差异,此差异文件系统 tar 将被应用于挂载点的顶层。
简单点描述为 " Diff 功能实现 diff layer 生成 , Apply 功能实现 diff layer 挂载 "。
Containerd 被设计为一个类似微服务的架构, 针对各个组件都提供了 gRPC 接口来进行访问。Containerd 各组件以插件机制来组织管理,本文 diff 的服务将从插件注册开始展开。
Diff 服务注册链关系
上层 GRPC Plugin 注册 diff , 获取已注册 Diff 服务插件列表,返回 Diff RPC 服务实现类 service{} 对象
!FILENAME services/diff/service.go:29
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "diff",
Requires: []plugin.Type{
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
plugins, err := ic.GetByType(plugin.ServicePlugin) // 获取所有服务插件
if err != nil {
return nil, err
}
p, ok := plugins[services.DiffService] // 获取 diff 服务插件
if !ok {
return nil, errors.New("diff service not found")
}
i, err := p.Instance() // 返回 diff 服务插件 initFn 的实例化对象
if err != nil {
return nil, err
}
return &service{local: i.(diffapi.DiffClient)}, nil
},
})
}
Service Plugin 服务插件 diff 服务注册,返回 Local{} 服务实现类对象
!FILENAME services/diff/local.go:49
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.DiffService,
Requires: []plugin.Type{
plugin.DiffPlugin,
},
Config: defaultDifferConfig, // unix 默认 Order: []string{"walking"}
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
differs, err := ic.GetByType(plugin.DiffPlugin)
if err != nil {
return nil, err
}
orderedNames := ic.Config.(*config).Order
ordered := make([]differ, len(orderedNames))
for i, n := range orderedNames {
differp, ok := differs[n]
if !ok {
return nil, errors.Errorf("needed differ not loaded: %s", n)
}
d, err := differp.Instance()
if err != nil {
return nil, errors.Wrapf(err, "could not load required differ due plugin init error: %s", n)
}
ordered[i], ok = d.(differ)
if !ok {
return nil, errors.Errorf("differ does not implement Comparer and Applier interface: %s", n)
}
}
return &local{ // 返回服务实现类 local{} 实例,即上层 RPC 注册时 p.Instance()
differs: ordered, // 指定了 differs 对象列表,底层实现 differ 接口方法 (walking)
}, nil
},
})
}
底层实现类插件 Diff Plugin 注册 "walking" , 返回实例 diffPlugin{} 实现类对象
!FILENAME diff/walking/plugin/plugin.go:28
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.DiffPlugin,
ID: "walking",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
// 创建 metadata.DB 类型内容存储库,作为后面插件传入参数,用于保存 diff layer 元数据项。
cs := md.(*metadata.DB).ContentStore()
return diffPlugin{
Comparer: walking.NewWalkingDiff(cs), // 差异比对器操作对象
Applier: apply.NewFileSystemApplier(cs), // 文件系统应用器操作对象
}, nil
},
})
}
Diff gRPC Service 和 local diff 实现
前面 diff 服务插件注册时返回为 local{} 类对象,与此同时 diff 服务提供了下面两个方法 Apply() 、Diff()。而 service.local 属性为 diffapi.DiffClient 类型,代表diff client GRPC 请求来实现功能调用。而 diff 服务插件的注册时返回的实现了 diff 服务的类对象则是 local{} ,所有 s.local.Diff 的调用即是 local 类的 Diff 方法。此处应该注意 s.local 的 local 是一个 client 接口,而实现类也是一个同名的 local 类。
!FILENAME services/diff/service.go:65
func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi.ApplyResponse, error) {
return s.local.Apply(ctx, er) // RPC 请求 "/containerd.services.diff.v1.Diff/Apply"
}
func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.DiffResponse, error) {
return s.local.Diff(ctx, dr) // RPC 请求 "/containerd.services.diff.v1.Diff/Diff"
}
local.Apply
!FILENAME services/diff/local.go:94
func (l *local) Apply(ctx context.Context, er *diffapi.ApplyRequest, _ ...grpc.CallOption) (*diffapi.ApplyResponse, error) {
var (
ocidesc ocispec.Descriptor
err error
desc = toDescriptor(er.Diff)
mounts = toMounts(er.Mounts)
)
var opts []diff.ApplyOpt
if er.Payloads != nil {
opts = append(opts, diff.WithPayloads(er.Payloads))
}
// differ 应用调用
for _, differ := range l.differs {
ocidesc, err = differ.Apply(ctx, desc, mounts, opts...)
if !errdefs.IsNotImplemented(err) {
break
}
}
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// grpc响应,应用返回的 ocidesc.Descriptor
return &diffapi.ApplyResponse{
Applied: fromDescriptor(ocidesc),
}, nil
}
local.Diff
!FILENAME services/diff/local.go:124
func (l *local) Diff(ctx context.Context, dr *diffapi.DiffRequest, _ ...grpc.CallOption) (*diffapi.DiffResponse, error) {
var (
ocidesc ocispec.Descriptor
err error
aMounts = toMounts(dr.Left)
bMounts = toMounts(dr.Right)
)
// 参数配置
var opts []diff.Opt
if dr.MediaType != "" {
opts = append(opts, diff.WithMediaType(dr.MediaType))
}
if dr.Ref != "" {
opts = append(opts, diff.WithReference(dr.Ref))
}
if dr.Labels != nil {
opts = append(opts, diff.WithLabels(dr.Labels))
}
// differ 差异比较调用
for _, d := range l.differs {
ocidesc, err = d.Compare(ctx, aMounts, bMounts, opts...)
if !errdefs.IsNotImplemented(err) {
break
}
}
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// grpc响应,应用返回的 ocidesc.Descriptor
return &diffapi.DiffResponse{
Diff: fromDescriptor(ocidesc),
}, nil
}
我们可以看到上面的 gRPC 实现服务层 Local 调用 Apply 和 Diff , 最终会调用底层 diffPlugin 类实例化的两个属性值对象: walking.walkingDiff 和 apply.fsApplier ,下面将一一分解分析。
Comparer 接口和 walking 实现
Diff 的差异比对器的相关接口与结构定义
!FILENAME diff/diff.go:46
// Comparer allows creation of filesystem diffs between mounts
// Comparer 允许创建两个挂载点的文件系统差异
type Comparer interface {
// Compare 计算两个挂载点的不同,返回计算差异 diff 的描述符。
// 参数opts (可选项):
// ref 参照ID -- 可被用于定位所创建的 diff 的实际内容
// media 类型 -- 用于决定所创建实际内容的格式
Compare(ctx context.Context, lower, upper []mount.Mount, opts ...Opt) (ocispec.Descriptor, error)
}
!FILENAME diff/diff.go:28
// Opt 用于配置 diff 操作,通过 func 的方式来操作 diff config
type Opt func(*Config) error
// Config is used to hold parameters needed for a diff operation
type Config struct {
// 生成的 diff 层的类型如:“application/vnd.oci.image.layer.v1.tar+gzip”
MediaType string
// 内容的上传参照
// 默认为随机字符串
Reference string
// 对生成的内容应用标签
Labels map[string]string
}
Walking.NewWalkingDiff(cs) 返回 walkingDiff ,此类对象实现了 diff.Comparer 接口。walkingDiff store 属性为内容存储 content.Store 。
!FILENAME diff/walking/differ.go:52
func NewWalkingDiff(store content.Store) diff.Comparer {
return &walkingDiff{
store: store, // 内容存储
}
}
// 类结构定义
type walkingDiff struct {
store content.Store
}
Compare() 基于指定的两个挂载目录创建 diff layer,并将其 diff 内容上传到内容存储库
walkingDiff Compare 方法实现:
- upper 上层为变化层挂载目录,lower 下层为基线层挂载目录,上/下进行差异比对;
- 按照 OCI changesets layer 规范打包计算差异结果集 diff tar 并存储至content store(注意两块存储: content 和 content matedata );
- 通过对目录路径对比、文件属性对比、文件内容字节对比来分析与计算变化类型;
- 变化类型主要包含:add 、modify 、delete 、unmodified。
!FILENAME diff/walking/differ.go:60
func (s *walkingDiff) Compare(ctx context.Context, lower, upper []mount.Mount, opts ...diff.Opt) (d ocispec.Descriptor, err error) {
// 根据 opts 设置 diff 操作 Config
var config diff.Config
for _, opt := range opts {
if err := opt(&config); err != nil {
return emptyDesc, err
}
}
// 设置默认媒体类型
if config.MediaType == "" {
config.MediaType = ocispec.MediaTypeImageLayerGzip //压缩镜像层类型
}
var isCompressed bool
switch config.MediaType { // 类型判断与 isCompressed 压缩标识
case ocispec.MediaTypeImageLayer: // 非压缩镜像层类型
case ocispec.MediaTypeImageLayerGzip:
isCompressed = true
default:
return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", config.MediaType)
}
var ocidesc ocispec.Descriptor
// 临时目录挂载上 和 下镜像层
if err := mount.WithTempMount(ctx, lower, func(lowerRoot string) error {
return mount.WithTempMount(ctx, upper, func(upperRoot string) error {
var newReference bool
if config.Reference == "" {
newReference = true
config.Reference = uniqueRef() // 生成 ref 唯一值
}
// 获取 store.Writer
cw, err := s.store.Writer(ctx,
content.WithRef(config.Reference),
content.WithDescriptor(ocispec.Descriptor{
MediaType: config.MediaType, // most contentstore implementations just ignore this
}))
if err != nil {
return errors.Wrap(err, "failed to open writer")
}
//...
if !newReference {
if err = cw.Truncate(0); err != nil {
return err
}
}
// 压缩类型镜像层处理写入 diff 内容数据
if isCompressed {
dgstr := digest.SHA256.Digester() // SHA256 算法摘要串生成器
var compressed io.WriteCloser
// 压缩 Gzip
compressed, err = compression.CompressStream(cw, compression.Gzip)
if err != nil {
return errors.Wrap(err, "failed to get compressed stream")
}
// +计算差异并写入 diff 内容数据 tar
err = archive.WriteDiff(ctx, io.MultiWriter(compressed, dgstr.Hash()), lowerRoot, upperRoot)
compressed.Close()
if err != nil {
return errors.Wrap(err, "failed to write compressed diff")
}
if config.Labels == nil {
config.Labels = map[string]string{}
} // 压缩标签,存入摘要字串
config.Labels[uncompressed] = dgstr.Digest().String()
} else {
// 非压缩类型 tar 镜像层处理写入 diff 内容数据
if err = archive.WriteDiff(ctx, cw, lowerRoot, upperRoot); err != nil {
return errors.Wrap(err, "failed to write diff")
}
}
// commit 标签选项的设置
var commitopts []content.Opt
if config.Labels != nil {
commitopts = append(commitopts, content.WithLabels(config.Labels))
}
// 生成内容摘要值,并接交至内容元数据存储
dgst := cw.Digest()
if err := cw.Commit(ctx, 0, dgst, commitopts...); err != nil {
if !errdefs.IsAlreadyExists(err) {
return errors.Wrap(err, "failed to commit")
}
}
// 获取 dgst 对应的内容存储 info 信息
info, err := s.store.Info(ctx, dgst)
if err != nil {
return errors.Wrap(err, "failed to get info from content store")
}
if info.Labels == nil {
info.Labels = make(map[string]string)
}
// 内容存储 dgst 存在但无压缩标签时,设置 info 压缩标签
if _, ok := info.Labels[uncompressed]; !ok {
info.Labels[uncompressed] = config.Labels[uncompressed]
if _, err := s.store.Update(ctx, info, "labels."+uncompressed); err != nil {
return errors.Wrap(err, "error setting uncompressed label")
}
}
// 返回 diff 内容数据的描述器 Descriptor
ocidesc = ocispec.Descriptor{
MediaType: config.MediaType,
Size: info.Size,
Digest: info.Digest,
}
return nil
})
}); err != nil {
return emptyDesc, err
}
return ocidesc, nil
}
WriteDiff 计算所提供的两个目录(a/b)的差异并写入 tar 包数据流。fs.Changes() 遍历计算差异,回调函数HandleChange为 ChangeSets 差异集 tar 打包处理逻辑, 其所生成的 tar 使用 OCI 标准规范的的文件标记用于对删除的文件表示。删除的文件基于 AUFS whiteouts 规范以 ".wh." 作为前缀扩充命名文件。详细规范可以参考官方 image-spec 或 image-spec 中文
!FILENAME archive/tar.go:72
func WriteDiff(ctx context.Context, w io.Writer, a, b string) error {
cw := newChangeWriter(w, b) // +创建变化 writer
err := fs.Changes(ctx, a, b, cw.HandleChange)
if err != nil {
return errors.Wrap(err, "failed to create diff tar stream")
}
return cw.Close()
}
!FILENAME archive/tar.go:451
func newChangeWriter(w io.Writer, source string) *changeWriter {
return &changeWriter{
tw: tar.NewWriter(w), // 创建 Writer 写数据
source: source, // 变化目录
whiteoutT: time.Now(),
inodeSrc: map[uint64]string{},
inodeRefs: map[uint64][]string{},
addedDirs: map[string]struct{}{},
}
}
Changes 计算调用给定的变化计算函数 func 为每个变化类型进行处理。'a' 是指基线目录和 'b' 是更改的目录。变化回调按路径名排顺序调用和应用。正因此顺序,下面是情况为正确的:
-
删除的目录树只为根目录创建一个更改目录已删除项。其余的更改是隐含的。
-
一个目录修改为文件将不会有删除子路径项的条目,通过父目录删除条目来暗指这些条目删除。
隐藏目录不会被特殊处理,每个文件从基本目录中删除将显示为删除。
将对具有时间戳的文件进行文件内容比较可能存在被截断情况。如果正在比较的任意一个文件存在有一个零值纳秒值,将比较每个字节差异。如果两个文件具有相同的秒值但不同纳秒值如果其中一个值为零,则文件将如果内容相同,则视为未更改。这种行为是为了说明打包处理期间对时间戳截断的引起。
!FILENAME vendor/github.com/containerd/continuity/fs/diff.go:101
func Changes(ctx context.Context, a, b string, changeFn ChangeFunc) error {
if a == "" {
// 如果 'a' 基线目录为空,则直接以增加类型变化处理所有文件
logrus.Debugf("Using single walk diff for %s", b)
return addDirChanges(ctx, changeFn, b)
} else if diffOptions := detectDirDiff(b, a); diffOptions != nil {
// 有配置 diffOptions,当前版本 detectDirDiff() 直接返回 nil ,可忽略此逻辑(TODO项)
logrus.Debugf("Using single walk diff for %s from %s", diffOptions.diffDir, a)
return diffDirChanges(ctx, changeFn, a, diffOptions)
}
// 使用上/下双目录同时遍历与计算 diff
logrus.Debugf("Using double walk diff for %s from %s", b, a)
return doubleWalkDiff(ctx, changeFn, a, b)
}
HandleChange 处理逻辑 func ,作为 Changes 计算的输入参数,为每个改变类型进行处理。变化类型主要包含:add 、modify 、delete 、unmodified(未更改)、空。
针对删除类型处理方式为 whiteOut ,则对删除的文件或目录创建前缀 ".wh."+原始名的 whiteOut 文件。其它类型将内容进行 tar 打包。
!FILENAME archive/tar.go:462
func (cw *changeWriter) HandleChange(k fs.ChangeKind, p string, f os.FileInfo, err error) error {
if err != nil {
return err
}
if k == fs.ChangeKindDelete {
// 变化类型为"删除"
whiteOutDir := filepath.Dir(p)
whiteOutBase := filepath.Base(p)
// whiteOut 前缀 ".wh." + 原始文件名
whiteOut := filepath.Join(whiteOutDir, whiteoutPrefix+whiteOutBase)
hdr := &tar.Header{
Typeflag: tar.TypeReg,
Name: whiteOut[1:],
Size: 0,
ModTime: cw.whiteoutT,
AccessTime: cw.whiteoutT,
ChangeTime: cw.whiteoutT,
}
// whiteOut 父级目录不为 root 目录则作为修改变化类型处理
if err := cw.includeParents(hdr); err != nil {
return err
}
// tar 写入 whiteout Header
if err := cw.tw.WriteHeader(hdr); err != nil {
return errors.Wrap(err, "failed to write whiteout header")
}
} else {
// 其它变化类型
var (
link string
err error
source = filepath.Join(cw.source, p)
)
switch {
case f.Mode()&os.ModeSocket != 0: // 忽略 sockets 文件
return nil
case f.Mode()&os.ModeSymlink != 0: // 链接文件则获取链接的目标位置
if link, err = os.Readlink(source); err != nil {
return err
}
}
// 创建一个部分填充的头,如果f为链接,将链接记录为链接目标。如果f为目录,则在名称后附加斜杠。
hdr, err := tar.FileInfoHeader(f, link)
if err != nil {
return err
}
// 设置 header 权限与模式位
hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode)))
name := p
if strings.HasPrefix(name, string(filepath.Separator)) {
name, err = filepath.Rel(string(filepath.Separator), name)
if err != nil {
return errors.Wrap(err, "failed to make path relative")
}
}
// 返回系统支持的 Name
name, err = tarName(name)
if err != nil {
return errors.Wrap(err, "cannot canonicalize path")
}
// suffix with '/' for directories
if f.IsDir() && !strings.HasSuffix(name, "/") {
name += "/"
}
// 设置 header Name
hdr.Name = name
if err := setHeaderForSpecialDevice(hdr, name, f); err != nil {
return errors.Wrap(err, "failed to set device headers")
}
// 链接处理
var additionalLinks []string
inode, isHardlink := fs.GetLinkInfo(f)
if isHardlink {
// 硬链接
// If the inode has a source, always link to it
if source, ok := cw.inodeSrc[inode]; ok {
hdr.Typeflag = tar.TypeLink
hdr.Linkname = source
hdr.Size = 0
} else {
if k == fs.ChangeKindUnmodified {
cw.inodeRefs[inode] = append(cw.inodeRefs[inode], name)
return nil
}
cw.inodeSrc[inode] = name
additionalLinks = cw.inodeRefs[inode]
delete(cw.inodeRefs, inode)
}
} else if k == fs.ChangeKindUnmodified {
// 无改变类型,直接返回
return nil
}
// header 设置 security.capability
if capability, err := getxattr(source, "security.capability"); err != nil {
return errors.Wrap(err, "failed to get capabilities xattr")
} else if capability != nil {
if hdr.PAXRecords == nil {
hdr.PAXRecords = map[string]string{}
}
hdr.PAXRecords[paxSchilyXattr+"security.capability"] = string(capability)
}
if err := cw.includeParents(hdr); err != nil {
return err
}
// tar 写入文件头
if err := cw.tw.WriteHeader(hdr); err != nil {
return errors.Wrap(err, "failed to write file header")
}
if hdr.Typeflag == tar.TypeReg && hdr.Size > 0 {
file, err := open(source) // 打开文件
if err != nil {
return errors.Wrapf(err, "failed to open path: %v", source)
}
defer file.Close()
// tar 写入文件内容数据
n, err := copyBuffered(context.TODO(), cw.tw, file)
if err != nil {
return errors.Wrap(err, "failed to copy")
}
if n != hdr.Size {
return errors.New("short write copying file")
}
}
// 写入 tar header 附加链信息
if additionalLinks != nil {
source = hdr.Name
for _, extra := range additionalLinks {
hdr.Name = extra
hdr.Typeflag = tar.TypeLink
hdr.Linkname = source
hdr.Size = 0
if err := cw.includeParents(hdr); err != nil {
return err
}
if err := cw.tw.WriteHeader(hdr); err != nil {
return errors.Wrap(err, "failed to write file header")
}
}
}
}
return nil
}
addDirChanges 直接以 " add " 增加类型变化处理所有文件
!FILENAME vendor/github.com/containerd/continuity/fs/diff.go:114
func addDirChanges(ctx context.Context, changeFn ChangeFunc, root string) error {
return filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}
// Rebase path
path, err = filepath.Rel(root, path)
if err != nil {
return err
}
path = filepath.Join(string(os.PathSeparator), path)
// Skip root
if path == string(os.PathSeparator) {
return nil
}
return changeFn(ChangeKindAdd, path, f, nil) // 增加变化类型处理
})
}
doubleWalkDiff 遍历两个 A 、B 目录进行对比分析差异,并调用 changeFn 处理
!FILENAME vendor/github.com/containerd/continuity/fs/diff.go:234
// doubleWalkDiff walks both directories to create a diff
func doubleWalkDiff(ctx context.Context, changeFn ChangeFunc, a, b string) (err error) {
g, ctx := errgroup.WithContext(ctx)
var (
c1 = make(chan *currentPath)
c2 = make(chan *currentPath)
f1, f2 *currentPath
rmdir string
)
g.Go(func() error {
defer close(c1)
return pathWalk(ctx, a, c1) // 遍历 A 目录,写入 chan C1
})
g.Go(func() error {
defer close(c2)
return pathWalk(ctx, b, c2) // 遍历 B 目录,写入 chan C2
})
g.Go(func() error {
for c1 != nil || c2 != nil { //循环比对
if f1 == nil && c1 != nil {
f1, err = nextPath(ctx, c1)
if err != nil {
return err
}
if f1 == nil {
c1 = nil
}
}
if f2 == nil && c2 != nil {
f2, err = nextPath(ctx, c2)
if err != nil {
return err
}
if f2 == nil {
c2 = nil
}
}
if f1 == nil && f2 == nil {
continue
}
var f os.FileInfo
k, p := pathChange(f1, f2) // +计算路径并返回变化类型
switch k {
case ChangeKindAdd: // 增加变化类型
if rmdir != "" {
rmdir = ""
}
f = f2.f
f2 = nil
case ChangeKindDelete:// 删除变化类型
if rmdir != "" && strings.HasPrefix(f1.path, rmdir) {
f1 = nil
continue
} else if f1.f.IsDir() { // 如果是目录则赋值 rmdir
rmdir = f1.path + string(os.PathSeparator)
} else if rmdir != "" {
rmdir = ""
}
f1 = nil
case ChangeKindModify: // 修改变化类型
same, err := sameFile(f1, f2) // 计算两者文件是否有修改
if err != nil {
return err
}
if f1.f.IsDir() && !f2.f.IsDir() { // 如果文件修改为目录,删除目录
rmdir = f1.path + string(os.PathSeparator)
} else if rmdir != "" {
rmdir = ""
}
f = f2.f
f1 = nil
f2 = nil
if same { // 一致则为 Unmodified 未修改变化类型
if !isLinked(f) {
continue
}
k = ChangeKindUnmodified
}
}
// 调用 changeFn 处理变化
if err := changeFn(k, p, f, nil); err != nil {
return err
}
}
return nil
})
return g.Wait()
}
pathChange() 计算路径是否改变
vendor/github.com/containerd/continuity/fs/path.go:39
func pathChange(lower, upper *currentPath) (ChangeKind, string) {
// lower 不存在, upper 存在,则为 ADD 增加类型变化
if lower == nil {
if upper == nil {
panic("cannot compare nil paths")
}
return ChangeKindAdd, upper.path
}
// 上面反之则为 Delete 删除类型变化
if upper == nil {
return ChangeKindDelete, lower.path
}
// 都存在的两者目录的路径长度大小比较或路径字符大小逐一比较
switch i := directoryCompare(lower.path, upper.path); {
case i < 0:
// upper 不存在,lower 存在
return ChangeKindDelete, lower.path
case i > 0:
// upper 存在,lower 不存在
return ChangeKindAdd, upper.path
default:
// 默认
return ChangeKindModify, upper.path
}
}
func directoryCompare(a, b string) int {
l := len(a)
if len(b) < l {
l = len(b)
}
// 路径字符逐一比较
for i := 0; i < l; i++ {
c1, c2 := a[i], b[i]
if c1 == filepath.Separator {
c1 = byte(0)
}
if c2 == filepath.Separator {
c2 = byte(0)
}
if c1 < c2 {
return -1
}
if c1 > c2 {
return +1
}
}
// 路径字符长度比较
if len(a) < len(b) {
return -1
}
if len(a) > len(b) {
return +1
}
return 0
}
sameFile 计算文件是否被修改。
!FILENAME vendor/github.com/containerd/continuity/fs/path.go:91
func sameFile(f1, f2 *currentPath) (bool, error) {
// fileStat 比较是否一致
if os.SameFile(f1.f, f2.f) {
return true, nil
}
// 系统 Stat 是否一致
equalStat, err := compareSysStat(f1.f.Sys(), f2.f.Sys())
if err != nil || !equalStat {
return equalStat, err
}
// security.capability 是否一致
if eq, err := compareCapabilities(f1.fullPath, f2.fullPath); err != nil || !eq {
return eq, err
}
// 如果不是目录检测文件大小、修改时间、和内容
if !f1.f.IsDir() {
if f1.f.Size() != f2.f.Size() { // 文件大小
return false, nil
}
t1 := f1.f.ModTime()
t2 := f2.f.ModTime()
if t1.Unix() != t2.Unix() { // 修改时间比对
return false, nil
}
// 纳秒值存在截断情况,进一步检测两者内容差异
if t1.Nanosecond() == 0 && t2.Nanosecond() == 0 {
var eq bool
// 链接类型比对链接目录是否一致
if (f1.f.Mode() & os.ModeSymlink) == os.ModeSymlink {
eq, err = compareSymlinkTarget(f1.fullPath, f2.fullPath)
} else if f1.f.Size() > 0 {
// 比对文件内容,两个文件内容每个字节进行对比是否存在差异
eq, err = compareFileContent(f1.fullPath, f2.fullPath)
}
if err != nil || !eq {
return eq, err
}
} else if t1.Nanosecond() != t2.Nanosecond() {
return false, nil
}
}
return true, nil
}
Applier 接口和 fsApplier 应用实现
Apply 将所指定描述器( ocispec.Descriptor )的参照内容应用至指定的挂载目录。 例如:通常情况下,描述器是一个 tar 格式的文件系统差异,此差异文件系统 tar 将被应用于挂载点的顶层。
OCI image-spec 规范对 Changesets (变化集)应用描述:
- 媒体类型为 "application/vnd.oci.image.layer.v1.tar" 的变化集 layer 被应用不仅仅是解包 tar 文件
- 变化集 layer 应用需要重点考虑 whiteout 文件处理
- 在无 whiteout 文件的Changesets (变化集) 如同正常 tar 文件解包
应用 Changesets (变化集) 实体,如果目标路径文件已存在,则:
- 移除文件路径
- 基于变化集实体项内容与属性,重建文件路径
Applier 接口定义如下:
!FILENAME diff/diff.go:65
type Applier interface {
Apply(ctx context.Context, desc ocispec.Descriptor, mount []mount.Mount, opts ...ApplyOpt) (ocispec.Descriptor, error)
}
fsApplier 是 Applier 接口的实现类,定义与对象构造如下,注意唯一参数 content.Provider 内容读取器
!FILENAME diff/apply/apply.go:37
func NewFileSystemApplier(cs content.Provider) diff.Applier {
return &fsApplier{
store: cs, // 内容读取器
}
}
type fsApplier struct {
store content.Provider
}
fsApplier.Apply 实现 Applier 接口的 Apply 方法。Apply 将所指定摘要值关联的内容应用至指定的挂载目录,打包文件将被解包或解压缩释放。
!FILENAME diff/apply/apply.go:52
func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (d ocispec.Descriptor, err error) {
t1 := time.Now()
//...
// apply 操作 opts 配置
var config diff.ApplyConfig
for _, o := range opts {
if err := o(ctx, desc, &config); err != nil {
return emptyDesc, errors.Wrap(err, "failed to apply config opt")
}
}
// 基于指定的 desc 描述符读取 content 内容
ra, err := s.store.ReaderAt(ctx, desc)
if err != nil {
return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
}
defer ra.Close()
var processors []diff.StreamProcessor
// 创建内容处理器
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
processors = append(processors, processor)
for {
// 获取 config 指定的类型内容处理器
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType)
}
processors = append(processors, processor)
if processor.MediaType() == ocispec.MediaTypeImageLayer {
break
}
}
defer processor.Close()
// 创建摘要生成器
digester := digest.Canonical.Digester()
// 从 processor 读写入至 digester.Hash() ,返回 reader;
// readCounter 包含读取器对象和读取大小统计
rc := &readCounter{
r: io.TeeReader(processor, digester.Hash()),
}
// +将内容应用至挂载目录上
if err := apply(ctx, mounts, rc); err != nil {
return emptyDesc, err
}
// Read any trailing data
if _, err := io.Copy(ioutil.Discard, rc); err != nil {
return emptyDesc, err
}
for _, p := range processors {
if ep, ok := p.(interface {
Err() error
}); ok {
if err := ep.Err(); err != nil {
return emptyDesc, err
}
}
}
// 返回镜像层的 Descriptor
return ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Size: rc.c,
Digest: digester.Digest(),
}, nil
}
此处理关注于 linux 系统版本和 aufs 挂载文件系统方式 apply 实现逻辑。此处 aufs 联合文件系统基础知识可以参考
!FILENAME diff/apply/apply_linux.go:32
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error {
switch {
// overlay 文件系统; mounts 长度为1
case len(mounts) == 1 && mounts[0].Type == "overlay":
path, parents, err := getOverlayPath(mounts[0].Options)
if err != nil {
if errdefs.IsInvalidArgument(err) {
break
}
return err
}
opts := []archive.ApplyOpt{
archive.WithConvertWhiteout(archive.OverlayConvertWhiteout),
}
if len(parents) > 0 {
opts = append(opts, archive.WithParents(parents))
}
_, err = archive.Apply(ctx, path, r, opts...)
return err
// aufs 文件系统;mounts 长度为1
case len(mounts) == 1 && mounts[0].Type == "aufs":
// 返回 mounts 的(上层 path )读写层和(下层 parents )只读层
path, parents, err := getAufsPath(mounts[0].Options)
if err != nil {
if errdefs.IsInvalidArgument(err) {
break
}
return err
}
// 转化 whiteout 文件配置处理 func
opts := []archive.ApplyOpt{
archive.WithConvertWhiteout(archive.AufsConvertWhiteout),
}
if len(parents) > 0 {
opts = append(opts, archive.WithParents(parents))
}
// +diff tar 数据流应用至一个目录
_, err = archive.Apply(ctx, path, r, opts...)
return err
}
// +other
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
})
}
应用 OCI 规范的 diff tar 数据流, OCI 规范变化集应用详情可参考applying-changesets
!FILENAME archive/tar.go:101
// Apply applies a tar stream of an OCI style diff tar.
func Apply(ctx context.Context, root string, r io.Reader, opts ...ApplyOpt) (int64, error) {
root = filepath.Clean(root)
// 应用选项配置
var options ApplyOptions
for _, opt := range opts {
if err := opt(&options); err != nil {
return 0, errors.Wrap(err, "failed to apply option")
}
}
if options.Filter == nil {
options.Filter = all
}
if options.applyFunc == nil {
options.applyFunc = applyNaive // 应用操作 func
}
// +调用 applyFunc 为 applyNaive()
return options.applyFunc(ctx, root, tar.NewReader(r), options)
}
applyNaive 为原生 diff Apply 实现处理逻辑,将 OCI 规范 diff tar 数据流应用至一个目录上。 主要对 whiteout 文件进行删除处理 ,对其它文件进行 tar 解包并更新属性。
!FILENAME archive/tar.go:123
func applyNaive(ctx context.Context, root string, tr *tar.Reader, options ApplyOptions) (size int64, err error) {
var (
dirs []*tar.Header
// Used for handling opaque directory markers which
// may occur out of order
unpackedPaths = make(map[string]struct{})
convertWhiteout = options.ConvertWhiteout
)
// 当为nil时, 定义 Whiteout 转化 func 实现
if convertWhiteout == nil {
// convertWhiteout func , 通过删除目标文件作为 whiteouts 处理
convertWhiteout = func(hdr *tar.Header, path string) (bool, error) {
base := filepath.Base(path)
dir := filepath.Dir(path)
// 匹配 ".wh..wh..opq" ,处理隐式 whitout 目录
if base == whiteoutOpaqueDir {
_, err := os.Lstat(dir)
if err != nil {
return false, err
}
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if os.IsNotExist(err) {
err = nil // parent was deleted
}
return err
}
if path == dir {
return nil
}
if _, exists := unpackedPaths[path]; !exists {
err := os.RemoveAll(path) // 删除操作
return err
}
return nil
})
return false, err
}
// 匹配前缀 ' .wh.* '
if strings.HasPrefix(base, whiteoutPrefix) {
originalBase := base[len(whiteoutPrefix):]
originalPath := filepath.Join(dir, originalBase)
return false, os.RemoveAll(originalPath) // 删除操作
}
return true, nil
}
}
// 迭代包内( tar.Reader )所有文件进行处理
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
hdr, err := tr.Next() //下一个文件
//...
// Split 根目录的名称并解析符号链接。
ppath, base := filepath.Split(hdr.Name)
ppath, err = fs.RootPath(root, ppath)
if err != nil {
return 0, errors.Wrap(err, "failed to get root path")
}
// 在连接到父路径之前连接到 root,以确保在添加到父路径之前已经基于根解析了相对链接。
path := filepath.Join(ppath, filepath.Join("/", base))
if path == root {
log.G(ctx).Debugf("file %q ignored: resolved to root", hdr.Name)
continue
}
// 如果文件不在根目录下,请确保父目录存在或已创建。
if ppath != root {
parentPath := ppath
if base == "" {
parentPath = filepath.Dir(path)
}
if err := mkparent(ctx, parentPath, root, options.Parents); err != nil {
return 0, err
}
}
// 原生 whiteout 转化处理 func 实现是直接移除目标文件
if err := validateWhiteout(path); err != nil {
return 0, err
}
writeFile, err := convertWhiteout(hdr, path)
//...
// 内容读取 reader
srcData := io.Reader(tr)
srcHdr := hdr
// +创建 tar 文件
if err := createTarFile(ctx, path, root, srcHdr, srcData); err != nil {
return 0, err
}
// 必须在最后处理目录 mtime,以避免在其中进一步创建文件来修改目录 mtime
if hdr.Typeflag == tar.TypeDir {
dirs = append(dirs, hdr)
}
unpackedPaths[path] = struct{}{}
}
for _, hdr := range dirs {
path, err := fs.RootPath(root, hdr.Name)
if err != nil {
return 0, err
}
if err := chtimes(path, boundTime(latestTime(hdr.AccessTime, hdr.ModTime)), boundTime(hdr.ModTime)); err != nil {
return 0, err
}
}
return size, nil
}
createTarFile 创建与写入 tar 的内容文件及目录, 此方法为正常 tar 解包文件处理逻辑
!FILENAME archive/tar.go:288
func createTarFile(ctx context.Context, path, extractDir string, hdr *tar.Header, reader io.Reader) error {
hdrInfo := hdr.FileInfo()
// 根据文件类型创建和写入内容
switch hdr.Typeflag {
case tar.TypeDir: // Dir
// Create directory unless it exists as a directory already.
// In that case we just want to merge the two
if fi, err := os.Lstat(path); !(err == nil && fi.IsDir()) {
if err := mkdir(path, hdrInfo.Mode()); err != nil {
return err
}
}
case tar.TypeReg, tar.TypeRegA: // Geg
file, err := openFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, hdrInfo.Mode())
if err != nil {
return err
}
_, err = copyBuffered(ctx, file, reader)
if err1 := file.Close(); err == nil {
err = err1
}
if err != nil {
return err
}
case tar.TypeBlock, tar.TypeChar: // FIFO
// Handle this is an OS-specific way
if err := handleTarTypeBlockCharFifo(hdr, path); err != nil {
return err
}
case tar.TypeFifo: // FIFO
// Handle this is an OS-specific way
if err := handleTarTypeBlockCharFifo(hdr, path); err != nil {
return err
}
case tar.TypeLink: // Hard Link
targetPath, err := hardlinkRootPath(extractDir, hdr.Linkname)
if err != nil {
return err
}
if err := os.Link(targetPath, path); err != nil {
return err
}
case tar.TypeSymlink: // Symlink
if err := os.Symlink(hdr.Linkname, path); err != nil {
return err
}
case tar.TypeXGlobalHeader:
log.G(ctx).Debug("PAX Global Extended Headers found and ignored")
return nil
default:
return errors.Errorf("unhandled tar header type %d\n", hdr.Typeflag)
}
//...
// xattr 属性设置
for key, value := range hdr.PAXRecords {
if strings.HasPrefix(key, paxSchilyXattr) {
key = key[len(paxSchilyXattr):]
if err := setxattr(path, key, value); err != nil {
if errors.Cause(err) == syscall.ENOTSUP {
log.G(ctx).WithError(err).Warnf("ignored xattr %s in archive", key)
continue
}
return err
}
}
}
// change owner 设置
if err := handleLChmod(hdr, path, hdrInfo); err != nil {
return err
}
// 创建时间
return chtimes(path, boundTime(latestTime(hdr.AccessTime, hdr.ModTime)), boundTime(hdr.ModTime))
}
再来看一下 mount.WithTempMount() 的实现,将所有挂载目录列表挂载到临时目录,然后将此临时目录作为传参给回调func f处理,我们可以从上面的代码了解到回调 func 内的处理也就是调用 archive.Apply() 处理,此apply 上面已详细的分析了代码逻辑。此处我们关注如何 mount 所有挂载目录的过程逻辑。
!FILENAME mount/temp.go:33
func WithTempMount(ctx context.Context, mounts []Mount, f func(root string) error) (err error) {
// 生成临时目录"/containerd-mount"
root, uerr := ioutil.TempDir(tempMountLocation, "containerd-mount")
if uerr != nil {
return errors.Wrapf(uerr, "failed to create temp dir")
}
//...
// 挂载所有到上面创建的临时目录
if uerr = All(mounts, root); uerr != nil {
return errors.Wrapf(uerr, "failed to mount %s", root)
}
// 回调func f ,传参为挂载后的目录
return errors.Wrapf(f(root), "mount callback failed on %s", root)
}
All() 遍历挂载所有 mount 到指定的目标目录
!FILENAME mount/mount.go:33
func All(mounts []Mount, target string) error {
for _, m := range mounts {
if err := m.Mount(target); err != nil { // +遍历挂载
return err
}
}
return nil
}
mount.Mount() 为 linux OS 底层对 mount 的调用实现 ,调用 unix.Mount() 来完成挂载操作。
!FILENAME mount/mount_linux.go:38
// Mount to the provided target path
func (m *Mount) Mount(target string) error {
var (
chdir string
options = m.Options
)
// avoid hitting one page limit of mount argument buffer
//
// NOTE: 512 is a buffer during pagesize check.
if m.Type == "overlay" && optionsSize(options) >= pagesize-512 {
chdir, options = compactLowerdirOption(options)
}
flags, data := parseMountOptions(options)
if len(data) > pagesize {
return errors.Errorf("mount options is too long")
}
// propagation types.
const ptypes = unix.MS_SHARED | unix.MS_PRIVATE | unix.MS_SLAVE | unix.MS_UNBINDABLE
// Ensure propagation type change flags aren't included in other calls.
oflags := flags &^ ptypes
// In the case of remounting with changed data (data != ""), need to call mount (moby/moby#34077).
if flags&unix.MS_REMOUNT == 0 || data != "" {
// Initial call applying all non-propagation flags for mount
// or remount with changed data
if err := mountAt(chdir, m.Source, target, m.Type, uintptr(oflags), data); err != nil {
return err
}
}
if flags&ptypes != 0 {
// Change the propagation type.
const pflags = ptypes | unix.MS_REC | unix.MS_SILENT
if err := unix.Mount("", target, "", uintptr(flags&pflags), ""); err != nil {
return err
}
}
const broflags = unix.MS_BIND | unix.MS_RDONLY
if oflags&broflags == broflags {
// Remount the bind to apply read only.
return unix.Mount("", target, "", uintptr(oflags|unix.MS_REMOUNT), "")
}
return nil
}
前面已解析了 diff 服务( diff / apply )的底层实现代码逻辑,我们再来看看上层的对 diff 应用实例的分析,加深对 diff 的应用场景的理解。下面给出了对 contained cli 工具 ctr 命令 snapshot diff 的代码分析。
Snapshot diff 应用
ctr 工具的 snapshot diff 命令实现逻辑
!FILENAME cmd/ctr/commands/snapshots/snapshots.go:112
Action: func(context *cli.Context) error {
var (
idA = context.Args().First()
idB = context.Args().Get(1)
)
if idA == "" {
return errors.New("snapshot id must be provided")
}
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
ctx, done, err := client.WithLease(ctx)
if err != nil {
return err
}
defer done(ctx)
var desc ocispec.Descriptor
labels := commands.LabelArgs(context.StringSlice("label"))
snapshotter := client.SnapshotService(context.GlobalString("snapshotter"))
fmt.Println(context.String("media-type"))
if context.Bool("keep") {
labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
}
opts := []diff.Opt{
diff.WithMediaType(context.String("media-type")),
diff.WithReference(context.String("ref")),
diff.WithLabels(labels),
}
if idB == "" {
// 未指定参数 idB 则 CreateDiff 以“父”为比较对象求两者差异
desc, err = rootfs.CreateDiff(ctx, idA, snapshotter, client.DiffService(), opts...)
if err != nil {
return err
}
} else { // 指定参数 idB 则 Compare
// 挂载快照
desc, err = withMounts(ctx, idA, snapshotter, func(a []mount.Mount) (ocispec.Descriptor, error) {
return withMounts(ctx, idB, snapshotter, func(b []mount.Mount) (ocispec.Descriptor, error) {
// DiffService Compare 调用进行比较 A/B 差异,返回 Descriptor
return client.DiffService().Compare(ctx, a, b, opts...)
})
})
if err != nil {
return err
}
}
// 从内容存储库读取 desc 内容
ra, err := client.ContentStore().ReaderAt(ctx, desc)
if err != nil {
return err
}
// 标准输出内容
_, err = io.Copy(os.Stdout, content.NewReader(ra))
return err
},
}
rootfs.Creatediff() 从给定的快照的“父”与自身两者比较创建layer diff(镜像层差异)。
提供内容引用以跟踪内容创建的进度,提供的快照程序和挂载差异用于差异的计算,最后将返回layer diff(镜像层差异)的描述符。
!FILENAME rootfs/diff.go:32
func CreateDiff(ctx context.Context, snapshotID string, sn snapshots.Snapshotter, d diff.Comparer, opts ...diff.Opt) (ocispec.Descriptor, error) {
info, err := sn.Stat(ctx, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err
}
lowerKey := fmt.Sprintf("%s-parent-view", info.Parent) // 指定的“父” key 引用
lower, err := sn.View(ctx, lowerKey, info.Parent) // snapshotter.View
if err != nil {
return ocispec.Descriptor{}, err
}
defer sn.Remove(ctx, lowerKey)
var upper []mount.Mount
if info.Kind == snapshots.KindActive {
upper, err = sn.Mounts(ctx, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err
}
} else {
upperKey := fmt.Sprintf("%s-view", snapshotID) // 指定的 snapshot 引用
upper, err = sn.View(ctx, upperKey, snapshotID)
if err != nil {
return ocispec.Descriptor{}, err
}
defer sn.Remove(ctx, upperKey)
}
return d.Compare(ctx, lower, upper, opts...) // 比较两者差异并返回 layer diff Descriptor
}
~~ 本文 END ~~
网友评论