tensorflowOnSpark.version == '1.1.0'
In the run method of TFSparkNode.py, there is the following code:
# for ps nodes only, wait indefinitely in foreground thread for a "control" event (None == "stop")
if job_name == 'ps':
    queue = TFSparkNode.mgr.get_queue('control')
    done = False
    while not done:
        msg = queue.get(block=True)  # blocks the executor
        logging.info("Got msg: {0}".format(msg))
        if msg is None:
            logging.info("Terminating PS")
            TFSparkNode.mgr.set('state', 'stopped')
            done = True
        queue.task_done()
By blocking the Spark executor that hosts the PS node, the data partitions that would land on that executor during subsequent training and inference are instead forwarded to other idle executors for computation. As a result, each executor takes on exactly one role: either a TensorFlow worker or a TensorFlow PS server.
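The core of this mechanism is simply a foreground thread blocking on a shared queue until it receives a None sentinel. Below is a minimal standalone sketch of the same control-queue pattern using Python's multiprocessing module; the names ps_loop and send_stop are illustrative only and are not part of TensorFlowOnSpark, and manager.Queue() stands in for TFSparkNode.mgr.get_queue('control').

    import logging
    from multiprocessing import Manager, Process

    logging.basicConfig(level=logging.INFO)

    def ps_loop(queue):
        # Emulates the PS-side loop: block on the control queue until a None sentinel arrives.
        done = False
        while not done:
            msg = queue.get(block=True)  # blocks this process, just like the PS executor
            logging.info("Got msg: {0}".format(msg))
            if msg is None:
                logging.info("Terminating PS")
                done = True
            queue.task_done()

    def send_stop(queue):
        # Driver-side shutdown: push the None sentinel that unblocks the PS loop.
        queue.put(None)

    if __name__ == '__main__':
        manager = Manager()
        control = manager.Queue()  # plays the role of the 'control' queue
        ps = Process(target=ps_loop, args=(control,))
        ps.start()
        send_stop(control)         # releases the blocked PS process
        ps.join()

In TensorFlowOnSpark itself, this None message appears to be enqueued when the application shuts the cluster down (TFCluster.shutdown()), which is what finally releases the blocked PS executors.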