美文网首页
kolla代码阅读

kolla代码阅读

作者: 桃之夭夭的简书 | 来源:发表于2020-05-07 18:19 被阅读0次

kolla项目地址:https://github.com/openstack/kolla

kolla项目做什么的

主要是用来制作openstack的docker镜像,因为支持了很多openstack组件,所以镜像很多,也很大(每个镜像几百M),要是每次都去从远程获取是件很耗时间的事,所以kolla项目诞生,意在你可以在本地build出你需要的镜像,还可以进行对镜像定制化;

kolla源码

1. 主要流程

kolla的主要实现代码在kolla/image/build.py中,网上有很多解析,抄了别人的函数流程,加了些自己的理解注释

cmd/build.py main()  ## 入口
  -> image/build.py build.run_build()
    -> common/config.py common_config.parse(...) ## 命令行,配置文件参数的定义配置注册
    -> kolla=KollaWorker()  ## build的任务对象,把命令行,配置文件的参数拿出来给对象。
      ->self.dc = docker.APIClient() ## 创建docker api client 用于调用docker 接口
    -> kolla.setup_working_dir() ## 新建个放dockerfile和构建的工作目录,把docker目录的文件拷贝过来
    -> kolla.find_dockerfiles() ## 检查下工作目录下的dokerfile.j2文件,放到self.docker_build_paths里了格式是(root, dirs, names )
    -> kolla.create_dockerfiles() ## 做镜像文件,里边的变量时kollaWorker类的__init__通过conf拿过来的
      -> jinja2.Environment() ## jinja2的使用流程 见jinja2 介绍
      -> jinja2.FileSystemLoader() 
      -> template.render() ## 所有的*j2模板重新渲染,后面写入新建dockerfie文件
    ->queue=kolla.build_queue() ## 创建多线程放到队列里来执行build操作
      -> self.build_image_list() ## 根据配置生成所有image列表,已经依赖关系
      -> self.find_parents() ## 整理出image的父image
      -> self.filter_images() ## 根据配置的regex和profile生成需要编译的iamge列表
      -> queue.put(BuildTask(self.conf, image, push_queue))
  ->for x in six.moves.range(conf.threads):   #在线程数内开启n个线程
      ->worker=WorkThread(conf, queue)
      ->worker.start()
         -> WorkThread().run()       
           -> task.run()
              -> self.builder()
                 -> make_an_archive ## 需要下载plugin和additions
                -> self.dc.build(*args)  ## 来自docker task, 调用 docker api 创建image,这里的build方法来自docker API client()的build方法
           -> 如果push:PushIntoQueueTask(参数,self.image,参数)
           -> queue.put(task.followups)  ##将镜像的子镜像放入队列
    # push_queue,如果需要push的话才会执行
    ->for x in six.moves.range(conf.push_threads)
       ->WorkerThread(conf, push_queue)
       ->worker.start()
           -> WorkThread().run()       
             -> task.run()
                 ->PushTask.run()
                     -> self.push_image(image)
                     -> self.dc.push  ## 调用 docker api push image到仓库
    ->kolla.summary() #返回结果汇总和处理

主要不是很明白queue是怎么处理带有依赖层级的镜像的,上边流程图queue以后比较详细,方便我理解;

2.从入口开始解析

run_build()

def run_build():
    ...
    kolla = KollaWorker(conf)       #注册kolla实例
    kolla.setup_working_dir()       #创建工作目录
    kolla.find_dockerfiles()        #查找dockerfile
    kolla.create_dockerfiles()      #创建dockerfile
    #template_only参数是只生成dockerfile,不做其他的build镜像操作
    if conf.template_only:
        LOG.info('Dockerfiles are generated in %s', kolla.working_dir)
        return
     ......
    ###重点解析下面这段代码的实现
    push_queue = six.moves.queue.Queue()
    queue = kolla.build_queue(push_queue)
    workers = []
    with join_many(workers):
        try:
            for x in six.moves.range(conf.threads):
                worker = WorkerThread(conf, queue)
                worker.setDaemon(True)
                worker.start()
                workers.append(worker)

            for x in six.moves.range(conf.push_threads):
                worker = WorkerThread(conf, push_queue)
                worker.setDaemon(True)
                worker.start()
                workers.append(worker)

            # sleep until queue is empty
            while queue.unfinished_tasks or push_queue.unfinished_tasks:
                time.sleep(3)

            # ensure all threads exited happily
            push_queue.put(WorkerThread.tombstone)
            queue.put(WorkerThread.tombstone)
        except KeyboardInterrupt:
            for w in workers:
                w.should_stop = True
            push_queue.put(WorkerThread.tombstone)
            queue.put(WorkerThread.tombstone)
            raise

join_many()解析

首先join_many() 代码如下,带上了上下文装饰器,yield字段以上是代表enter()方法,以下是exit()方法,函数主要做的就是exit()方法,主要处理线程人为使用ctrl+c退出,当ctrl+c两次,强制退出;

@contextlib.contextmanager    
def join_many(threads):
    try:
        yield
        for t in threads:
            t.join()
    except KeyboardInterrupt:
        try:
            LOG.info('Waiting for daemon threads exit. Push Ctrl + c again to'
                     ' force exit')
            for t in threads:
                if t.is_alive():
                    LOG.debug('Waiting thread %s to exit', t.name)
                    # NOTE(Jeffrey4l): Python Bug: When join without timeout,
                    # KeyboardInterrupt is never sent.
                    t.join(0xffff)
                LOG.debug('Thread %s exits', t.name)
        except KeyboardInterrupt:
            LOG.warning('Force exits')

build_queue()

queue 队列的定义
six包主要是用来兼容py2和py3的,six.moves里的函数是py3里对模块做了重新规划,改变了位置和名字,跟py2对不上了,所以就有了moves,
有了moves以后,这样我们的代码可以py2下也可以在py3下顺利的执行,不然写起来很丑陋
举例:
six.moves中的html_parser,对应了py2中的HTMLParser,py3中的html.parser
在这我们用的queue:
在six.moves.queue 对应py2里的Queue, py3里的queue
(在github的kolla master分支上,已经不再支持py2, 所以也没用six)
在这里我们初始化了一个push_queue, 然后又创建了一个queue,为啥

    push_queue = six.moves.queue.Queue()
    queue = kolla.build_queue(push_queue)

build_queue()函数返回一个queue,主要做的几件事:

  1. 列出要build的镜像列表,
  2. 判断镜像依赖关系对self.images重新整理,
  3. 使用配置文件带的正则或者profiles里的内容过滤镜像列表;
  4. 将那些没有父镜像的元素加入到队列
  5. queue中put的对象的内容是:BuildTask(self.conf, image, push_queue);
    def build_queue(self, push_queue):
        """Organizes Queue list.

        Return a list of Queues that have been organized into a hierarchy
        based on dependencies
        """
        self.build_image_list()
        self.find_parents()
        self.filter_images()

        queue = six.moves.queue.Queue()

        for image in self.images:
            if image.status in (STATUS_UNMATCHED, STATUS_SKIPPED):
                # Don't bother queuing up build tasks for things that
                # were not matched in the first place... (not worth the
                # effort to run them, if they won't be used anyway).
                continue
            if image.parent is None:
                queue.put(BuildTask(self.conf, image, push_queue))
                LOG.info('Added image %s to queue', image.name)

        return queue

下面开始从队列里取这些放进去的BuildTask放到线程里去build
先看这里:
定义conf.threads个线程,从queue里取出BuildTask任务放到后台

    with join_many(workers):
        try:
            for x in six.moves.range(conf.threads):  #conf.threads是开启的线程数量
                worker = WorkerThread(conf, queue) 
                worker.setDaemon(True)  #设置守护线程,主线程挂了,会把守护线程杀了
                worker.start()  #线程开始,一个线程只能调一次,执行内容在run方法里
                workers.append(worker)
               

WorkerThread(conf, queue).run()

我们去看下WorkerThread(conf, queue)里的run()方法

  1. 总之就是在self.conf.retries次数内开始执行task.run(), 这里的run()是BuildTask.run()
    2.执行完后将task.followups的任务追加到队列中(这里是我主要不理解要往下挖的地方)
    def run(self):
        while not self.should_stop:
            task = self.queue.get()
            if task is self.tombstone:
                # Ensure any other threads also get the tombstone.
                self.queue.put(task)
                break
            try:
                for attempt in six.moves.range(self.conf.retries + 1):
                    if self.should_stop:
                        break
                    LOG.info("Attempt number: %s to run task: %s ",
                             attempt + 1, task.name)
                    try:
                        task.run()
                        if task.success:
                            break
                    except Exception:
                        LOG.exception('Unhandled error when running %s',
                                      task.name)
                    # try again...
                    task.reset()
                if task.success and not self.should_stop:
                    for next_task in task.followups:
                        LOG.info('Added next task %s to queue',
                                 next_task.name)
                        self.queue.put(next_task)
            finally:
                self.queue.task_done()

BuildTask.run() -->self.builder(self.image)

直接看builder方法里有什么

  1. make_an_archives()
  2. build镜像之前的操作
  3. 使用self.dc.build(**)开始build镜像
    def builder(self, image):

        def make_an_archive(items, arcname, item_child_path=None):
            ....主要实现archives和plugins...

        ...一系列的条件处理...

        buildargs = self.update_buildargs()
        try:
            for stream in self.dc.build(path=image.path,
                                        tag=image.canonical_name,
                                        nocache=not self.conf.cache,
                                        rm=True,
                                        decode=True,
                                        network_mode=self.conf.network_mode,
                                        pull=pull,
                                        forcerm=self.forcerm,
                                        buildargs=buildargs):
                if 'stream' in stream:
                    for line in stream['stream'].split('\n'):
                        if line:
                            self.logger.info('%s', line)
                if 'errorDetail' in stream:
                    image.status = STATUS_ERROR
                    self.logger.error('Error\'d with the following message')
                    for line in stream['errorDetail']['message'].split('\n'):
                        if line:
                            self.logger.error('%s', line)
                    return

            if image.status != STATUS_ERROR and self.conf.squash:
                self.squash()
        except docker.errors.DockerException:
            ......
        else:
            image.status = STATUS_BUILT
            self.logger.info('Built')

看看WorkerThread里写往queue里添加BuildTask()的那段代码

1.顺便把task.run()的部分贴了出来

  1. 把task.followups加到队列中,这里的followups是BuildTask的一个属性
class WorkerThread(threading.Thread):
    def run(self):
                .....
               for attempt in six.moves.range(self.conf.retries + 1):
                    try:
                        task.run()
                        if task.success:
                            break
                    except Exception:
                       ......

                if task.success and not self.should_stop:
                    for next_task in task.followups:
                        LOG.info('Added next task %s to queue',
                                 next_task.name)
                        self.queue.put(next_task)

BuildTask().followups()

1.当配置文件里需要push镜像,self.conf.push为True,并且镜像build成功
放BuildTask(self.image)放进followups里
2.如果镜像有子镜像(self.image.children),并且镜像build成功,把镜像放进followups[]里
tips:
我看了下这里的followups.extend([BuildTask(..),])和 followups.append(BuildTask(..))没啥区别,??

class BuildTask(DockerTask):
     ......
    def run(self):
        self.builder(self.image)
        if self.image.status in (STATUS_BUILT, STATUS_SKIPPED):
            self.success = True

    @property
    def followups(self):
        followups = []
        if self.conf.push and self.success:
            followups.extend([
                # If we are supposed to push the image into a docker
                # repository, then make sure we do that...
                PushIntoQueueTask(
                    PushTask(self.conf, self.image),
                    self.push_queue),
            ])
        if self.image.children and self.success:
            for image in self.image.children:
                if image.status == STATUS_UNMATCHED:
                    continue
                followups.append(BuildTask(self.conf, image, self.push_queue))
        return followups

1.能不能有个pdb出的结果,验证下;

镜像的依赖关系:

{
  "base": [
    {
      "ceph-base": [
        "ceph-mds",
        "ceph-mgr",
        "ceph-mon",
        "ceph-nfs",
        "ceph-osd",
        "ceph-rgw",
        "cephfs-fuse"
      ]
    },
    "certmonger",
    "chrony",
    "crane",
    "cron",
    "dnsmasq",
    "elasticsearch",
    "etcd",
    "fluentd",
    "grafana",
    "haproxy",
    "helm-repository",
    "influxdb",
    "iscsid",
    "kafka",
    "keepalived",
    "kibana",
    "kolla-toolbox",
        {
          "nova-base": [
            "nova-api",
            "nova-compute-ironic",
            "nova-compute",
            "nova-conductor",
            "nova-consoleauth",
            "nova-novncproxy",
            "nova-placement-api",
            "nova-scheduler",
            "nova-serialproxy",
            "nova-spicehtml5proxy",
            "nova-ssh"
          ]
        },
        {
          "novajoin-base": [
            "novajoin-notifier",
            "novajoin-server"
          ]
        },
        {
          "octavia-base": [
            "octavia-api",
            "octavia-health-manager",
            "octavia-housekeeping",
            "octavia-worker"
          ]
        },
....省略...
}

相关文章

网友评论

      本文标题:kolla代码阅读

      本文链接:https://www.haomeiwen.com/subject/amzawhtx.html