tensorflow on kubernetes in practice: distributed deep learning

Author: 夜尽天明时 | Published 2018-05-19 13:42

    Foreword

    • Attitude determines altitude! Make excellence a habit!
    • There is nothing in this world that one round of overtime cannot solve; if there is, work two! (- 茂强)

    Why tensorflow on kubernetes?

    In my view, the biggest advantages are:

    • Tenant isolation: different users are guaranteed not to interfere with one another
    • Resource scheduling, including GPUs: resources are used effectively
    • Scalability: the cluster is easy to scale out horizontally
    • Flexibility: resource allocation and management are both flexible
      and so on

    Setting up the kubernetes cluster

    This article uses the kubeadm installation method, which automatically installs etcd and the other required components.
    First, let us introduce kubernetes by going over a few concepts (the following content all comes from the official Chinese documentation; copyright belongs to its owners).

    • Basic concepts


      Kubernetes features
      What kubernetes can do
      Kubernetes components
      The overall kubernetes architecture
      The master node
      The node
      What a Pod is
      What labels are
      What annotations are
      What an RC (ReplicationController) is
      What a Service is
      What a directory is

      The following are common kubernetes commands; refer to the official documentation to learn them further.

    • Common commands
    • Kubernetes architecture diagram


      (Architecture diagram; copyright belongs to the original author.)
    • Cluster installation
      First, prepare the environment package. Here I borrowed a package from a technical blog (copyright belongs to the original creator):
      https://pan.baidu.com/s/1FfO2saDPkXH7wO_5XTNqpw
    • Prepare the environment
      centos7
      Three machines: master, node1, node2
    • Unpack the resource package

    tar -xjvf k8s_images.tar.bz2

    • Install docker

    yum install docker-ce-selinux-17.03.2.ce-1.el7.centos.noarch.rpm
    yum install docker-ce-17.03.2.ce-1.el7.centos.x86_64.rpm

    • Verify the installation

    yum list | grep docker

    • Start docker

    systemctl start docker && systemctl enable docker

    • Check whether docker started successfully

    ifconfig
    Check that the docker0 bridge network appears in the output.

    • Switch the docker registry mirror to Aliyun

    sudo tee /etc/docker/daemon.json <<-'EOF'
    {
      "registry-mirrors": ["https://u9ea3fz9.mirror.aliyuncs.com"]
    }
    EOF

    • Restart docker

    sudo systemctl daemon-reload
    sudo systemctl restart docker

    • Disable the firewall and SELinux

    systemctl stop firewalld && systemctl disable firewalld
    setenforce 0

    • Configure the bridge netfilter kernel parameters

    echo "
    net.bridge.bridge-nf-call-ip6tables = 1
    net.bridge.bridge-nf-call-iptables = 1
    " >> /etc/sysctl.conf
    sysctl -p

    • Disable swap

    swapoff -a

    (To keep swap disabled across reboots, also comment out the swap entry in /etc/fstab.)

    • Load the images

    docker load < ./docker_images/etcd-amd64_v3.1.10.tar
    docker load < ./docker_images/flannel:v0.9.1-amd64.tar
    docker load < ./docker_images/k8s-dns-dnsmasq-nannyamd64_v1.14.7.tar
    docker load < ./docker_images/k8s-dns-kube-dns-amd64_1.14.7.tar
    docker load < ./docker_images/k8s-dns-sidecar-amd64_1.14.7.tar
    docker load < ./docker_images/kube-apiserver-amd64_v1.9.0.tar
    docker load < ./docker_images/kube-controller-manager-amd64_v1.9.0.tar
    docker load < ./docker_images/kube-scheduler-amd64_v1.9.0.tar
    docker load < ./docker_images/kube-proxy-amd64_v1.9.0.tar
    docker load < ./docker_images/pause-amd64_3.0.tar
    docker load < ./kubernetes-dashboard_v1.8.1.tar

    • Install kubelet, kubeadm, and kubectl

    rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
    rpm -ivh kubernetes-cni-0.6.0-0.x86_64.rpm kubelet-1.9.0-0.x86_64.rpm kubectl-1.9.0-0.x86_64.rpm
    rpm -ivh kubeadm-1.9.0-0.x86_64.rpm

    • Start kubelet on the master node

    systemctl enable kubelet && sudo systemctl start kubelet

    • Check whether kubelet's default cgroup driver matches docker's: docker defaults to cgroupfs while kubelet here defaults to systemd, and the two must be identical

    vim /etc/systemd/system/kubelet.service.d/10-kubeadm.conf

    Change the cgroup-driver value from systemd to cgroupfs.
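    Equivalently, a one-line edit (a sketch assuming the flag appears as --cgroup-driver=systemd in the default kubeadm drop-in file):

    sed -i 's/--cgroup-driver=systemd/--cgroup-driver=cgroupfs/' /etc/systemd/system/kubelet.service.d/10-kubeadm.conf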

    • Restart kubelet

    systemctl daemon-reload && systemctl restart kubelet

    All of the steps above must be performed on every node; complete them everywhere before moving on to what follows.

    • Initialize the master

    kubeadm init --kubernetes-version=v1.9.0 --pod-network-cidr=10.244.0.0/16

    This initializes the master node.

    Note: be sure to write down the join command printed here:

    kubeadm join --token 26c210.acef208514aaf37f 10.255.164.31:6443 --discovery-token-ca-cert-hash sha256:87ee8b74e3b937f82b5174fa64bd140071cbf9087f41f5b4bec38c22332e6137

    You will need this command later whenever you want to add a node to the cluster and scale out (record it in a safe place).
    Also note that the output contains another hint:


    (Hint from the kubeadm output; screenshot omitted.)

    This is an authorization step; you need to run the following, or the kubectl command may not work:

    mkdir -p $HOME/.kube
    sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
    sudo chown $(id -u):$(id -g) $HOME/.kube/config

    • Create the network
      Locate the kube-flannel.yml file and run:

    kubectl create -f kube-flannel.yml

    Then run:

    kubectl get nodes

    If the master node's status shows Ready, the step succeeded.

    At the same time, configure the environment variable:

    echo "export KUBECONFIG=/etc/kubernetes/admin.conf" >> ~/.bash_profile
    source ~/.bash_profile

    • Join the other nodes
      From here on, the commands run on the node machines; repeat them on every node you have.

    kubeadm join --token 99f58e.60c1ad95c0ac7dcd 10.255.164.31:6443 --discovery-token-ca-cert-hash sha256:7be50b18a3697bad6a0477db525a95e4db011f9f1f89384882b53eb85968eab5

    Use the join command you were told to save when initializing your own master, not the command above.
    After joining, check on the master whether the nodes were added successfully:

    kubectl get nodes

    This lists all the nodes that have joined.

    Next, let us list all of the namespaces (the screenshot is omitted; see the command below).
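    The command behind that listing is the standard one (the exact output varies per cluster):

    kubectl get namespaces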

    If every node's status is Ready, the kubernetes cluster has been built successfully.

    • Setting up kubernetes-dashboard
      Here we use the latest official yaml file:

    wget https://raw.githubusercontent.com/kubernetes/dashboard/master/src/deploy/recommended/kubernetes-dashboard.yaml

    Find the Service section shown in the figure below and modify it as shown.


    (dashboard Service modification; screenshot omitted)

    Here nodePort is the port used to reach the dashboard, and type: NodePort is one form of port exposure: the port is exposed at the proxy layer on every node, so the outside world can reach the service through that nodePort.
    You also need to change the image version the file depends on: the kubernetes-dashboard version loaded into docker earlier (v1.8.1) may differ from the version referenced in this file, so adjust the tag accordingly.
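    For reference, a sketch of what the modified Service section looks like (the port and targetPort values follow the upstream v1.8 manifest, and nodePort 32666 matches the URL used below; treat this as an approximation of the screenshot, not the file verbatim):

    kind: Service
    apiVersion: v1
    metadata:
      labels:
        k8s-app: kubernetes-dashboard
      name: kubernetes-dashboard
      namespace: kube-system
    spec:
      type: NodePort
      ports:
      - port: 443
        targetPort: 8443
        nodePort: 32666
      selector:
        k8s-app: kubernetes-dashboard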


    (dashboard file image-version modification; screenshot omitted.)
    Create the dashboard:

    kubectl create -f kubernetes-dashboard.yaml

    Then visit https://master_ip:32666
    Replace master_ip above with your physical machine's hostname or IP.
    In a browser you will reach:

    (kubernetes-dashboard login page; screenshot omitted.)

    Here we use token-based login.
    Next, let us fetch a token; run:

    kubectl get secret -n kube-system

    (List of access tokens; screenshot omitted.)

    Using the controller token shown in the red box, run:

    kubectl describe secret/namespace-controller-token-bw9jn -n kube-system

    Do not run this verbatim: replace namespace-controller-token-bw9jn above with your own secret's name, which differs in every cluster.
    This yields the token.
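    If you prefer not to copy the secret name by hand, a one-liner along these lines also works (a sketch using standard kubectl plus grep/awk):

    kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep namespace-controller-token | awk '{print $1}')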


    (Token output; screenshot omitted.)

    Copy the token above into the browser and you can log in.


    (Dashboard contents; screenshot omitted.)

    With that, our dashboard has been created.
    The kubernetes cluster setup is now complete. For a production environment, please build an HA cluster; the official Chinese documentation covers this, so it is not repeated here.
    Next, let us build the tensorflow on kubernetes environment step by step.

    tensorflow on kubernetes architecture

    First, let us look at the architecture of the whole platform.


    (tensorflow on kubernetes architecture diagram.)

    Let us walk through it. This is the architecture of distributed tensorflow in practice. There are two parameter servers, several workers, and a shuffle-and-sampling service. The shuffle service shuffles the samples according to their labels and then serves batch sampling (with or without replacement; sampling is a science of its own, see the book "Sampling Techniques" for details). Each batch sample request is triggered by a worker: once a worker receives the sampled IDs, it fetches that batchSize of sample rows from the distributed database built on kubernetes and runs a training step. Because distributed tensorflow supports asynchronous gradient descent, every batch is trained against the latest parameters; updating those parameters is the job of the two parameter servers. The model (i.e. the parameters) is stored on NFS, so the parameter servers and workers can share parameters. Finally, note that all of the training data lives in a distributed database (choose the database to suit your scenario).
    Why do we need a shuffle-and-sampling service at all? When the data volume is large, shuffling and sampling over the full sample rows would waste a great deal of resources, so a dedicated service extracts only the (id, label) pairs for shuffling and sampling. If even the (id, label) data is very large, consider using spark to shuffle and sample in a distributed fashion; spark 2.3 already supports kubernetes scheduling natively.
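    As a minimal sketch of this topology (the addresses below are placeholders; in practice they are the pod IPs we obtain later), the cluster description in tensorflow looks like this:

    import tensorflow as tf

    # Two parameter servers and two workers, matching the diagram above.
    cluster = tf.train.ClusterSpec({
        "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
        "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
    })
    # Each pod then starts a server for its own task, e.g. for worker 0:
    # server = tf.train.Server(cluster, job_name="worker", task_index=0)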

    Setting up the NFS service

    • What is NFS (from Baidu Baike)
      NFS (Network File System) is one of the file systems supported by FreeBSD; it lets computers on a network share resources over TCP/IP. With NFS, a local NFS client can transparently read and write files located on a remote NFS server, just as if they were local files.
    • Setting up the NFS service

    yum install nfs-utils rpcbind -y
    mkdir -p /data/nfs
    vim /etc/exports
    Add the following line:
    /data/nfs 192.168.86.0/24(rw,no_root_squash,no_all_squash,sync)
    Start the services:
    /bin/systemctl start rpcbind.service
    /bin/systemctl start nfs.service

    With that, the NFS service is up.
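    A quick way to verify the export from a client machine (a sketch assuming nfs-utils is installed on the client; replace <nfs-server-ip> with your server's address):

    showmount -e <nfs-server-ip>
    mount -t nfs <nfs-server-ip>:/data/nfs /mnt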

    Building the tensorflow docker image

    First we prepare the Dockerfile:

    FROM ubuntu:16.04
    MAINTAINER yahengsong yahengsong@foxmail.com
    RUN apt-get update \
    && apt-get install -y wget \
    && apt-get install -y lrzsz \
    && apt-get install -y unzip \
    && apt-get install -y zip \
    && apt-get install -y vim \
    && apt-get install -y gcc \
    && apt-get install -y g++ \
    && apt-get install -y automake \
    && apt-get install -y autoconf \
    && apt-get install -y libtool \
    && apt-get install -y make \
    && apt-get install -y openssl \
    && apt-get install -y libssl-dev \
    && apt-get install -y ruby \
    && apt-get install -y zlib1g \
    && apt-get install -y zlib1g-dev \
    && apt-get install -y bzip2 \
    && apt-get install -y libncurses5-dev \
    && apt-get install -y sqlite sqlite3 \
    && apt-get install -y libgdbm-dev \
    && apt-get install -y libpcap-dev \
    && apt-get install -y xz-utils
    RUN wget https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tar.xz \
    && tar -xvf Python-3.6.0.tar.xz \
    && cd Python-3.6.0 \
    && mkdir -p /usr/local/python3 \
    && ./configure --prefix=/usr/local/python3 \
    && make \
    && make install \
    && rm -rf Python-3.6.0* \
    && ln -s /usr/local/python3/bin/python3 /usr/bin/python3 \
    && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
    RUN pip install --upgrade pip \
    && pip --no-cache-dir install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-1.7.0-cp36-cp36m-linux_x86_64.whl
    # TensorBoard
    EXPOSE 6006
    # IPython
    EXPOSE 8888
    WORKDIR /root

    We can build an image from this Dockerfile on Aliyun.
    With that we have our own environment (note: this version does not install jupyter).


    (Aliyun image registry; screenshot omitted.)

    For details, see how to build your own image on Aliyun.
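    If you build locally instead, the steps are roughly as follows (after docker login to the registry; the namespace and tag are placeholders for your own Aliyun registry namespace):

    docker build -t registry.cn-hangzhou.aliyuncs.com/<your-namespace>/tensorflow:1.7.0-py36 .
    docker push registry.cn-hangzhou.aliyuncs.com/<your-namespace>/tensorflow:1.7.0-py36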

    tensorflow on kubernetes in practice

    The following shows how to deploy a tensorflow environment on the kubernetes cluster.
    We start by deploying the single-machine CPU version.
    Let us look at the tensorflow.yaml file:

     apiVersion: extensions/v1beta1
     kind: Deployment
     metadata: 
       name: tensorflow
     spec:
       replicas: 1
       template:
         metadata:
           labels:
             k8s-app: tensorflow
         spec:
           containers:
           - name: tensorflow
             image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
             ports:
             - containerPort: 8888
             resources:
               limits:
                 cpu: 4
                 memory: 2Gi
               requests:
                 cpu: 2
                 memory: 1Gi
    ---
    apiVersion: v1
    kind: Service
    metadata:
        name: jupyter-service
    spec:
      type: NodePort
      ports:
      - port: 80
        targetPort: 8888
        nodePort: 32001
        name: tensorflow
      selector:
        k8s-app: tensorflow
    

    Here we rely on Aliyun's tensorflow docker image registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3 (this version ships with jupyter). We request 2 CPUs and 1Gi of memory from the kubernetes cluster, and the cluster caps this Deployment at 4 CPUs and 2Gi of memory. Note that the port this environment exposes to the outside is 32001.
    With this file in hand, create the environment:

    kubectl create -f tensorflow.yaml

    Now you can inspect the environment with:

    kubectl get pods

    We can now reach this environment's jupyter at http://master_ip:32001/

    (jupyter; screenshot omitted.)

    At this point we can fetch the jupyter login token as follows (the screenshot is omitted; see the command below).
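    One way to retrieve it (a sketch assuming a single replica; the k8s-app=tensorflow label comes from the yaml above) is to read the pod's startup log, which prints the token:

    kubectl logs $(kubectl get pods -l k8s-app=tensorflow -o name)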

    Then you can log in with the token.


    (jupyter after login; screenshot omitted.)

    Then you can happily start coding.

    Distributed tensorflow on kubernetes

    Following the distributed deep-learning architecture introduced earlier,
    we first create a parameter service with tf-ps.yaml:

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata: 
      name: tensorflow-ps
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            name: tensorflow-ps
            role: ps
        spec:
          containers:
          - name: ps
            image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
            ports:
            - containerPort: 2222
            resources:
              limits:
                cpu: 4
                memory: 2Gi
              requests:
                cpu: 2
                memory: 1Gi
            volumeMounts:
            - mountPath: /datanfs
              readOnly: false
              name: nfs
          volumes:
          - name: nfs
            nfs:
              server: your-nfs-server-address  # replace with your NFS server's address
              path: "/data/nfs"   
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: tensorflow-ps-service
      labels:
        name: tensorflow-ps
        role: service
    spec:
      ports:
      - port: 2222
        targetPort: 2222
      selector:
        name: tensorflow-ps
    

    Run:

    kubectl create -f tf-ps.yaml

    The parameter-server pod will then be created.
    Next we create two worker nodes with tf-worker.yaml:

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: tensorflow-worker
    spec:
      replicas: 2
      template:
        metadata:
          labels:
            name: tensorflow-worker
            role: worker
        spec:
          containers:
          - name: worker
            image: registry.cn-hangzhou.aliyuncs.com/denverdino/tensorflow:1.6.0-py3
            ports:
            - containerPort: 2222
            resources:
              limits:
                cpu: 4
                memory: 2Gi
              requests:
                cpu: 2
                memory: 1Gi
            volumeMounts:
            - mountPath: /datanfs
              readOnly: false
              name: nfs
          volumes:
          - name: nfs
            nfs:
              server: your-nfs-server-address  # replace with your NFS server's address
              path: "/data/nfs"   
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: tensorflow-wk-service
      labels:
        name: tensorflow-worker
    spec:
      ports:
      - port: 2222
        targetPort: 2222
      selector:
        name: tensorflow-worker
    

    Run:

    kubectl create -f tf-worker.yaml

    The two worker pods will then be created.


    (Pod listing; screenshot omitted.)
    • Training
      Check each pod's IP; the IPs are needed to build the cluster training commands (screenshots of the parameter-server and worker pods are omitted; see the command below).
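    The pod IPs can be listed with standard kubectl (the -o wide output includes an IP column):

    kubectl get pods -o wide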

    Then enter each pod's environment:

    kubectl exec -ti tensorflow-ps-77b8d7bc89-87qgp bash
    kubectl exec -ti tensorflow-worker-b7cc4dd66-94ntr bash
    kubectl exec -ti tensorflow-worker-b7cc4dd66-mzqhb bash

    Create the following code (before doing so, prepare the MNIST dataset in CSV format and place it in the NFS server's /data/nfs directory):

    from __future__ import print_function
    
    import math
    
    import tensorflow as tf
    
    import collections
    
    import sys,os, time
    
    import numpy as np
    
    # TensorFlow cluster description: ps_hosts lists the parameter-server hosts, worker_hosts lists the worker hosts
    tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs")
    tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs")
    
    # TensorFlow server/model flags: job name, task index, hidden-layer width, MNIST data directory, and training batch size (default: 100 images per batch)
    tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
    tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
    tf.app.flags.DEFINE_integer("hidden_units", 100, "Number of units in the hidden layer of the NN")
    tf.app.flags.DEFINE_string("data_dir", "/datanfs", "Directory for storing mnist data")
    tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
    FLAGS = tf.app.flags.FLAGS
    # Each image is 28*28 pixels
    IMAGE_PIXELS = 28
    
    class DataSet(object):
        def __init__(self,
                     images,
                     labels,
                     reshape=True):
            """Construct a DataSet.
            one_hot arg is used only if fake_data is true.  `dtype` can be either
            `uint8` to leave the input as `[0, 255]`, or `float32` to rescale into
            `[0, 1]`.
            """
    
            self._num_examples = images.shape[0]
    
            # Convert shape from [num examples, rows, columns, depth]
            # to [num examples, rows*columns] (assuming depth == 1)
            images = images.astype(np.float32)
            images = np.multiply(images, 1.0 / 255.0)
            self._images = images
            self._labels = labels
            self._epochs_completed = 0
            self._index_in_epoch = 0
    
        @property
        def images(self):
            return self._images
    
        @property
        def labels(self):
            return self._labels
    
        @property
        def num_examples(self):
            return self._num_examples
    
        @property
        def epochs_completed(self):
            return self._epochs_completed
    
        def next_batch(self, batch_size, fake_data=False, shuffle=True):
            """Return the next `batch_size` examples from this data set."""
            start = self._index_in_epoch
            # Shuffle for the first epoch
            if self._epochs_completed == 0 and start == 0 and shuffle:
                perm0 = np.arange(self._num_examples)
                np.random.shuffle(perm0)
                self._images = self.images[perm0]
                self._labels = self.labels[perm0]
            # Go to the next epoch
            if start + batch_size > self._num_examples:
                # Finished epoch
                self._epochs_completed += 1
                # Get the rest examples in this epoch
                rest_num_examples = self._num_examples - start
                images_rest_part = self._images[start:self._num_examples]
                labels_rest_part = self._labels[start:self._num_examples]
                # Shuffle the data
                if shuffle:
                    perm = np.arange(self._num_examples)
                    np.random.shuffle(perm)
                    self._images = self.images[perm]
                    self._labels = self.labels[perm]
                    # Start next epoch
                start = 0
                self._index_in_epoch = batch_size - rest_num_examples
                end = self._index_in_epoch
                images_new_part = self._images[start:end]
                labels_new_part = self._labels[start:end]
                return np.concatenate((images_rest_part, images_new_part), axis=0) , \
                       np.concatenate((labels_rest_part, labels_new_part), axis=0)
            else:
                self._index_in_epoch += batch_size
                end = self._index_in_epoch
                return self._images[start:end], self._labels[start:end]
    def dense_to_one_hot(labels_dense, num_classes):
        """Convert class labels from scalars to one-hot vectors."""
        num_labels = labels_dense.shape[0]
        index_offset = np.arange(num_labels) * num_classes
        labels_one_hot = np.zeros((num_labels, num_classes))
        labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
        return labels_one_hot
    
    
    def read_data_sets(train_dir,
                       reshape=True,
                       validation_size=2000):
        trainfile = os.path.join(train_dir, "mnist_train.csv")
        testfile = os.path.join(train_dir, "mnist_test.csv")
        train_images = np.array([], dtype=np.uint8)
        train_labels = np.array([], dtype=np.uint8)
        test_images = np.array([], dtype=np.uint8)
        test_labels = np.array([], dtype=np.uint8)
    
        count = 0
        with open(trainfile) as f:
            for line in f.readlines():
                count+= 1
                line = line.strip()
                line = line.split(",")
                line = [int(x) for x in line]
                one_array = np.array(line[1:], dtype=np.uint8)
                train_images = np.hstack((train_images, one_array))
                train_labels = np.hstack((train_labels, np.array(line[0], dtype=np.uint8)))
                if count % 10000 == 0:
                    print(str(count))
                if count == 20000:
                    break
        train_images = train_images.reshape(20000, 28*28)
        train_labels = train_labels.reshape(20000, 1)
        train_labels = dense_to_one_hot(train_labels, 10)
    
        count = 0
        with open(testfile) as f:
            for line in f.readlines():
                count += 1
                line = line.strip()
                line = line.split(",")
                line = [int(x) for x in line]
                one_array = np.array(line[1:], dtype=np.uint8)
                test_images = np.hstack((test_images, one_array))
                test_labels = np.hstack((test_labels, np.array(line[0], dtype=np.uint8)))
                if count % 10000 == 0:
                    print(str(count))
        test_images = test_images.reshape(10000, 28*28)
        test_labels = test_labels.reshape(10000, 1)
        test_labels = dense_to_one_hot(test_labels, 10)
    
        if not 0 <= validation_size <= len(train_images):
            raise ValueError(
                'Validation size should be between 0 and {}. Received: {}.'
                    .format(len(train_images), validation_size))
    
        validation_images = train_images[:validation_size]
        validation_labels = train_labels[:validation_size]
        train_images = train_images[validation_size:]
        train_labels = train_labels[validation_size:]
    
        train = DataSet(train_images, train_labels, reshape=reshape)
        validation = DataSet(validation_images, validation_labels, reshape=reshape)
        test = DataSet(test_images, test_labels, reshape=reshape)
    
    
        Datasets = collections.namedtuple('Datasets', ['train', 'validation', 'test'])
        return Datasets(train=train, validation=validation, test=test)
    
    def main(_):
        # Read the TensorFlow cluster description from the command-line flags
        ps_hosts = FLAGS.ps_hosts.split(",")
        worker_hosts = FLAGS.worker_hosts.split(",")
        # Create the TensorFlow cluster description object
        cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
        # Create a local TensorFlow Server for the task running on this node.
        server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
        # A parameter server just starts serving
        if FLAGS.job_name == "ps":
            server.join()
        elif FLAGS.job_name == "worker":
            # Pin ops to the given worker; parameters go to the ps device (cpu0 on that node by default)
            with tf.device(tf.train.replica_device_setter(
                    worker_device="/job:worker/task:%d" % FLAGS.task_index,
                    ps_device="/job:ps/cpu:0",
                    cluster=cluster)):
                # Hidden-layer variables of the fully connected neural network
                hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
                hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
                # Softmax regression layer variables
                sm_w = tf.Variable(tf.truncated_normal([FLAGS.hidden_units, 10], stddev=1.0 / math.sqrt(FLAGS.hidden_units)), name="sm_w")
                sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
                # Model inputs (x: image pixels, y_: digit labels)
                x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
                y_ = tf.placeholder(tf.float32, [None, 10])
                # Hidden-layer computation
                hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
                hid = tf.nn.relu(hid_lin)
                # Softmax regression model and its loss function
                y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
                loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
                # Global step, initialized to 0
                global_step = tf.Variable(0, name="global_step", trainable=False)
                # Training op, using the Adagrad optimizer
                train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
                # Accuracy-evaluation ops
                correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
                accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
                # Saver for periodic checkpoints, typically used for model recovery
                saver = tf.train.Saver()
                # Op that collects model summaries
                summary_op = tf.summary.merge_all()
                # Op that initializes all model variables
                init_op = tf.initialize_all_variables()
                # Create a supervisor that handles checkpoints and model summaries.
                is_chief = (FLAGS.task_index == 0)
                if is_chief:
                    print("Worker %d: Initializing session..." % FLAGS.task_index)
                else:
                    print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)
    
                sv = tf.train.Supervisor(
                    is_chief= is_chief,
                    logdir="/datanfs/train_logs",  # on the shared NFS mount, so ps and workers see the same checkpoints
                    init_op=init_op,
                    summary_op=summary_op,
                    saver=saver,
                    global_step=global_step,
                    save_model_secs=600)
    
                sess_config = tf.ConfigProto(
                    allow_soft_placement=True,
                    log_device_placement=False,
                    device_filters=["/job:ps",
                                    "/job:worker/task:%d" % FLAGS.task_index])
    
                # Load the MNIST training data
                mnist = read_data_sets(FLAGS.data_dir)
                # Create the session used to run the TensorFlow graph
                with sv.managed_session(server.target, config=sess_config) as sess:
                    print("Worker %d: Session initialization complete." % FLAGS.task_index)
                    # Perform training
                    time_begin = time.time()
                    print("Training begins @ %f" % time_begin)
                    local_step = 0
                    step = 0
                    while not sv.should_stop() and step < 10000:
                        # Fetch a training batch (default: 100 images)
                        batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                        train_feed = {x: batch_xs, y_: batch_ys}
                        # Run one distributed training step
                        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
                        local_step = local_step + 1
                        now = time.time()
                        print("%f: Worker %d: training step %d done (global step: %d)" %
                            (now, FLAGS.task_index, local_step, step))
                        # Every 100 steps, evaluate model accuracy
                        if step % 100 == 0:
                            print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
                            print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
                    # Training is done; report elapsed time and the final metrics
                    time_end = time.time()
                    print("Training ends @ %f" % time_end)
                    training_time = time_end - time_begin
                    print("Training elapsed time: %f s" % training_time)
                    print("acc: %g" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
                    print("cross entropy = %g" % sess.run(loss, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
                # Stop the Supervisor (the TensorFlow session is closed on leaving the with-block)
                sv.stop()
    if __name__ == "__main__":
        tf.app.run()
    

    Then run this on the parameter server:

    python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="ps" --task_index=0

    You will then see output like the following; this in fact starts a gRPC server.


    (Parameter-server output; screenshot omitted.)

    On the first worker node, run:

    python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=0

    On the second worker node, run:

    python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=1

    Once the data finishes loading, you will see training output like this:


    (First worker; screenshot omitted.)
    (Second worker; screenshot omitted.)

    The two workers' iteration counts add up to the total number of iterations we configured.
    The resulting model is stored on the NFS share; only this way can the parameter nodes and worker nodes share the model parameters.

    • GPU
      The GPU setup is broadly the same as the above; you only need to add GPU support to the original yaml file.
      Template:
    apiVersion: v1
    kind: ReplicationController
    metadata:
      name: tensorflow-worker
    spec:
      replicas: 1
      selector:
        name: tensorflow-worker
      template:
        metadata:
          labels:
            name: tensorflow-worker
            role: worker
        spec:  
          containers:
          - name: worker
            image: gcr.io/tensorflow/tensorflow:latest-gpu
            ports:
            - containerPort: 2222
            env:
            - name: PS_KEY
              valueFrom:
                configMapKeyRef:
                  name: tensorflow-cluster-config
                  key: ps
            - name: WORKER_KEY
              valueFrom:
                configMapKeyRef:
                  name: tensorflow-cluster-config
                  key: worker
            securityContext:
              privileged: true
            resources:
              requests:
                alpha.kubernetes.io/nvidia-gpu: 1
              limits:
                alpha.kubernetes.io/nvidia-gpu: 1
            volumeMounts:
            - mountPath: /dev/nvidia0
              name: nvidia0
            - mountPath: /dev/nvidiactl
              name: nvidiactl
            - mountPath: /dev/nvidia-uvm
              name: nvidia-uvm
            - mountPath: /datanfs
              name: nfs
            - name: libcuda-so
              mountPath: /usr/lib/x86_64-linux-gnu
            - name: cuda
              mountPath: /usr/local/cuda-8.0
          volumes:
          - name: nfs
            persistentVolumeClaim:
              claimName: nfs-pvc
          - hostPath:
              path: /dev/nvidia0
            name: nvidia0
          - hostPath:
              path: /dev/nvidiactl
            name: nvidiactl
          - hostPath:
              path: /dev/nvidia-uvm
            name: nvidia-uvm
          - name: libcuda-so
            hostPath:
              path: /usr/lib/x86_64-linux-gnu
          - name: cuda
            hostPath:
              path: /usr/local/cuda-8.0
    

    This concludes our walkthrough. Of course there are many details, and many pitfalls to step in, that are not covered one by one here; there are simply too many to write down.
    If you run into problems in practice, feel free to get in touch with me at any time; let us learn and grow together.

    QQ: 458798698
    WeChat: songwindwind

    Or contact me directly on Jianshu (简书).
    If you are interested, you can also follow me on GitHub:
    https://github.com/songyaheng
