美文网首页
37-分布式tensorflow

37-分布式tensorflow

作者: jxvl假装 | 来源:发表于2019-10-05 11:18 被阅读0次

分布式tensorflow是由高性能的gRPC框架作为支持的。
这是一个通信框架gRPC(google remote prcedure call),是一个高性能、跨平台的RPC框架。RPC协议,即远程过程调用协议,是指通过网络从远程计算机程序上请求服务。相当于是对底层协议的封装,解决传输错误,同步的问题

分布式原理:

  • 单机多卡:一台服务器上有很多的设备(一般指GPU)
    • 单机单GPU:数据是一个batch一个batch的训练。
    • 单机多GPU:数据一次处理多个batch,每个GPU处理一个batch的数据计算
  • 多机多卡:多台服务器上有多个设备

计算速度提高,设备多

ps:GPU的运算速度比CPU快

怎么进行分布式—分布式的架构

服务器:一个服务其上通常有多个GPU

服务器可分为参数服务器和工作服务器。参数服务器专门负责更新参数,保存参数;工作服务器的主要功能就是进行计算

分布式架构

worker节点(工作服务器)中需要一个主节点来进行会话初始化,创建文件等操作,其他节点等待进行计算。

分布式更新参数的模式

  1. 同步模式更新

  2. 异步模式更新

更新模式

tensorflow中设备命名的规则

参数服务器:可以有多台
/job:ps/task:0
/job:ps/task:1

/job:ps/task:0/cpu:0
/job:ps/task:0/gpu:0

工作服务器:可以有多台
/job:worker/task:0
/job:worker/task:1

api

1、创建一个tf.train.ClusterSpec,用于对集群中的所有任务进行描述,该描述内容对所有任务应该是相同的

2、创建一个tf.train.Server,用于创建一个任务(ps,worker),并运行相应作业上的计算任务。

创建集群

"""1. 创建集群"""
cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})

#或者
cluster = tf.train.ClusterSpec(
{“worker”: [“worker0.example.com:2222”,    /job:worker/task:0 “worker1.example.com:2222”,    /job:worker/task:1 “worker2.example.com:2222”],    /job:worker/task:2 
"ps": 
[“ps0.example.com:2222”,    /job:ps/task:0 “ps1.example.com:2222”]   /job:ps/task:1
})

创建服务

"""2. 创建服务"""
tf.train.Server(server_or_cluster_def, job_name=None, task_index=None, 
protocol=None, config=None, start=True)
    创建服务(ps,worker)
    server_or_cluster_def: 集群描述
    job_name: 任务类型名称
    task_index: 任务数
    
    attribute:target
    返回tf.Session连接到此服务器的目标
    method:join()
    参数服务器端,直到服务器等待接受参数任务关闭

工作节点指定设备运行

tf.device(device_name_or_function)
    选择指定设备或者设备函数
    if device_name:
    指定设备
    例如:"/job:worker/task:0/cpu:0”

    if function:
    tf.train.replica_device_setter(worker_device=worker_device,
    cluster=cluster)
    作用:通过此函数协调不同设备上的初始化操作
    worker_device:为指定设备, “/job:worker/task:0/cpu:0” or
    "/job:worker/task:0/gpu:0"
    cluster:集群描述对象

注:使用with tf.device(),使不同工作节点工作在不同的设备上

流程

  1. 对集群中的一些ps,worker进行指定
  2. 创建对应的服务。ps创建ps服务(ps服务要使用join等待worker服务);worker创建worker服务
  3. 指定一个默认的worker去运行模型,程序,初始化会话等等

注意:tf.Session()不支持分布式会话。应使用分布式会话函数

MonitoredTrainingSession(master="", is_chief=True, checkpoint_dir=None, hooks=None,config=None)
"""
master:指定运行会话协议IP和端口
is_chief:是否为主worker,作用与master类似,如果为True,就负责初始化和恢复Tensorflow会话。如果为False,它将等待一位负责人初始化或恢复Tensorflow会话
checkpoint:检查点文件目录,同时也是events目录
config:会话运行的配置项,tf.ConfigProto(log_device_placement=True)
hooks:钩子函数,可选SessionRunHook对象列表。用于对模型进行训练
    should_stop:是否异常停止
    run():跟session一样可以运行op
"""
分布式会话函数.png

案例:

"""
python 文件名.py --job_name="ps/worker“ --task_index=0
"""
import tensorflow as tf

FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("job_name", "", "启动服务的类型:ps or worker")  # 没有默认值
tf.app.flags.DEFINE_integer("task_index", 0, "指定ps或worker当中的哪一台服务器,以task:0, task:1进行标记")  # 默认值为0


def main(argv):
    # 定义一个全局计数的op,给钩子函数中的训练步数计数。定义即可,不需要手动使用
    global_step = tf.contrib.framework.get_or_create_global_step()
    # 指定集群描述对象:ps、worker。这里的集群中有一个ps和一个worker
    cluster = tf.train.ClusterSpec({
        "ps": ["192.168.31.128:2223"],
        "worker": ["192.168.3.135:2222"]  # 端口只要是没有人用过的就可以。
    })
    # 创建不同的服务,pa、worker。eg:job_name="ps", task_index=0
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    # 根据不同的服务做不同的事情:ps:去更新保存参数,worker:指定设备去运行模型计算
    if FLAGS.job_name == "ps":
        # 参数服务器什么都不用干,只需要等待worker传递参数
        server.join()
    else:  # 可以指定设备运行
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:0/cpu:0",
                cluster=cluster
        )):  # 然后在这个设备的上下文环境里面进行运算
            # 简单的做一个矩阵乘法运算
            x = tf.Variable([[1, 2, 3, 4]])
            w = tf.Variable([[2], [3], [4], [5]])
            mat = tf.matmul(x, w)
        # 创建分布式会话
        # 初始化会话
        with tf.train.MonitoredTrainingSession(
                master="grpc://192.168.3.135:2222",  # 因为只有这一个worker,所以就让这个worker去初始化会话
                is_chief=(FLAGS.task_index == 0),  # 判断是否是以主worker启动。如果是0,就是主worker
                config=tf.ConfigProto(log_device_placement=True),  # 打印设备信息
                hooks=[tf.train.StopAtStepHook(last_step=20)]
                # 这里的钩子函数表示运行它运行1k次;注意,只要使用了这个钩子函数的时候,就必须在全局指定一个globalstep,进行全局计数
        ) as mon_sess:
            while not mon_sess.should_stop():  # 只要没有报异常
                print(mon_sess.run(mat))  # 打印结果以便观察


if __name__ == "__main__":
    tf.app.run()  # 会默认去调用main函数

注意:不知道为什么,在本次计算中出现了计算无法停止的问题。。。

相关文章

网友评论

      本文标题:37-分布式tensorflow

      本文链接:https://www.haomeiwen.com/subject/mqfxpctx.html