TensorFlow 提供一个包含多个 API 层的编程堆栈
estimator api.png
这个高层 API 正好可以提高 GPU 的利用率。如果使用传统的 session.run() 方式,即使 GPU 显存已经被占满,其利用率往往仍在 30% 以下,甚至不到 10%。
因此,将 Estimator 与 tf.data API 配合使用,可以让 GPU 的利用率逼近 100%!这是因为使用了流水线,使数据预处理与模型执行在时间上重叠。如下图所示:
流水线.png
要做到以上并行处理,可以使用 prefetch 函数,具体步骤如下:
1. 将 InputExample 转换并写入 TFRecord 文件
def file_based_convert_examples_to_features(examples,
                                            label_list,
                                            max_seq_length,
                                            tokenizer,
                                            output_file):
    """Convert a set of `InputExample`s to a TFRecord file.

    Every example is converted with `convert_single_example` and written to
    `output_file` as one serialized `tf.train.Example` holding the int64
    features `input_ids`, `input_mask`, `segment_ids`, `label_ids` and
    `is_real_example`.

    Args:
        examples: Sequence of examples; `len()` is called on it for the
            progress log, so it must be sized.
        label_list: Forwarded unchanged to `convert_single_example`.
        max_seq_length: Forwarded unchanged to `convert_single_example`.
        tokenizer: Forwarded unchanged to `convert_single_example`.
        output_file: Path of the TFRecord file to write,
            e.g. "train.tf_record".
    """

    def create_int_feature(values):
        """Wrap an iterable of ints in an int64 `tf.train.Feature`."""
        return tf.train.Feature(
            int64_list=tf.train.Int64List(value=list(values)))

    # The context manager closes the writer even if converting or writing an
    # example raises, so a failure cannot leak the open file handle.
    with tf.python_io.TFRecordWriter(output_file) as writer:
        for ex_index, example in enumerate(examples):
            if ex_index % 10000 == 0:
                tf.logging.info("Writing example %d of %d",
                                ex_index, len(examples))

            feature = convert_single_example(ex_index,
                                             example,
                                             label_list,
                                             max_seq_length,
                                             tokenizer)

            # OrderedDict keeps the feature insertion order stable.
            features = collections.OrderedDict()
            features["input_ids"] = create_int_feature(feature.input_ids)
            features["input_mask"] = create_int_feature(feature.input_mask)
            features["segment_ids"] = create_int_feature(feature.segment_ids)
            # Scalars are wrapped in a one-element list for Int64List.
            features["label_ids"] = create_int_feature([feature.label_id])
            features["is_real_example"] = create_int_feature(
                [int(feature.is_real_example)])

            tf_example = tf.train.Example(
                features=tf.train.Features(feature=features))
            writer.write(tf_example.SerializeToString())
2. 编写模型的输入函数train_input_fn
WX20190215-145003.png
如上图所示,Dataset 有 3 个子类,可以直接使用。此外,这里的模型输入函数完成的是整个 ETL 流程,包括提取数据(Extract)、转换数据(Transform)和装载数据(Load)。
所以 train_input_fn实现可以如下:
def file_based_input_fn_builder(input_file,
                                seq_length,
                                is_training,
                                drop_remainder):
    """Build an `input_fn` closure that reads examples from TFRecord files.

    Args:
        input_file: TFRecord file name passed to `tf.data.TFRecordDataset`
            (may also be a list of file names).
        seq_length: Length of the `input_ids`, `input_mask` and
            `segment_ids` vectors stored in each record.
        is_training: When true, the dataset is repeated and shuffled.
        drop_remainder: Forwarded to `map_and_batch`.

    Returns:
        A function `input_fn(params)` that reads `params["batch_size"]` and
        returns the batched, prefetched dataset.
    """
    tf.logging.info("*** file_based_input_fn_builder ***")

    # Parsing spec: three fixed-length int64 vectors, then two int64 scalars.
    name_to_features = {
        key: tf.FixedLenFeature([seq_length], tf.int64)
        for key in ("input_ids", "input_mask", "segment_ids")
    }
    name_to_features.update({
        key: tf.FixedLenFeature([], tf.int64)
        for key in ("label_ids", "is_real_example")
    })

    def _parse_record(serialized):
        """Decode one serialized record into a dict of tensors."""
        parsed = tf.parse_single_example(serialized, name_to_features)
        # tf.Example only supports tf.int64, but the TPU only supports
        # tf.int32, so every int64 tensor is down-cast.
        return {
            key: tf.to_int32(tensor) if tensor.dtype == tf.int64 else tensor
            for key, tensor in parsed.items()
        }

    def input_fn(params):
        """Return the dataset for the batch size given in `params`."""
        tf.logging.info("*** input_fn ***")
        batch_size = params["batch_size"]

        # For training, we want a lot of parallel reading and shuffling.
        # For eval, we want no shuffling and parallel reading doesn't matter.
        dataset = tf.data.TFRecordDataset(input_file, num_parallel_reads=4)
        if is_training:
            # repeat() without a count repeats indefinitely; shuffling is
            # applied before the records are parsed.
            # TODO: d.apply(tf.contrib.data.shuffle_and_repeat(buffer_size=100, count=-1))
            dataset = dataset.repeat().shuffle(buffer_size=100)

        # Parse every record with _parse_record and batch in one fused step.
        parse_and_batch = tf.contrib.data.map_and_batch(
            _parse_record,
            batch_size=batch_size,
            drop_remainder=drop_remainder)
        dataset = dataset.apply(parse_and_batch)

        # Open question from the author: is this prefetch needed?
        # TODO: d = d.apply(tf.contrib.data.prefetch_to_device("/gpu:0"))
        return dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)

    return input_fn
3. 编写模型函数model_fn
def model_fn_builder(bert_config, num_labels, init_checkpoint, learning_rate,
                     num_train_steps, num_warmup_steps, use_tpu,
                     use_one_hot_embeddings):
    """Return a `model_fn` closure for TPUEstimator.

    The builder arguments are captured by the closure and forwarded to
    `create_model`, `modeling.get_assignment_map_from_checkpoint` and
    `optimization.create_optimizer`.
    """

    def model_fn(features, labels, mode, params):  # pylint: disable=unused-argument
        """Build the graph for `mode` and return a `TPUEstimatorSpec`."""
        tf.logging.info("*** Features ***")
        for key in sorted(features.keys()):
            tf.logging.info(" name = %s, shape = %s" % (key, features[key].shape))

        input_ids, input_mask, segment_ids, label_ids = (
            features[key]
            for key in ("input_ids", "input_mask", "segment_ids", "label_ids"))

        # When the input carries no is_real_example feature, weight every
        # example as 1.0.
        if "is_real_example" in features:
            is_real_example = tf.cast(features["is_real_example"],
                                      dtype=tf.float32)
        else:
            is_real_example = tf.ones(tf.shape(label_ids), dtype=tf.float32)

        # `mode` is supplied by the estimator (e.g. estimator.train).
        is_training = mode == tf.estimator.ModeKeys.TRAIN

        # --- Model construction ---
        total_loss, per_example_loss, logits, probabilities = create_model(
            bert_config, is_training, input_ids, input_mask, segment_ids,
            label_ids, num_labels, use_one_hot_embeddings)

        tvars = tf.trainable_variables()
        initialized_variable_names = {}
        scaffold_fn = None
        if init_checkpoint:
            # assignment_map: key is the variable name in the checkpoint,
            # value is the variable name in the current graph.
            assignment_map, initialized_variable_names = (
                modeling.get_assignment_map_from_checkpoint(
                    tvars, init_checkpoint))
            if not use_tpu:
                tf.train.init_from_checkpoint(init_checkpoint, assignment_map)
            else:
                # On TPU the checkpoint initialization is deferred into the
                # scaffold function handed to the spec.
                def tpu_scaffold():
                    tf.train.init_from_checkpoint(init_checkpoint,
                                                  assignment_map)
                    return tf.train.Scaffold()

                scaffold_fn = tpu_scaffold

        tf.logging.info("**** Trainable Variables ****")
        for var in tvars:
            from_ckpt = var.name in initialized_variable_names
            suffix = ", *INIT_FROM_CKPT*" if from_ckpt else ""
            tf.logging.info(" name = %s, shape = %s%s",
                            var.name, var.shape, suffix)

        tf.logging.info("****Mode Starting ****")

        if mode == tf.estimator.ModeKeys.TRAIN:
            train_op = optimization.create_optimizer(
                total_loss, learning_rate, num_train_steps, num_warmup_steps,
                use_tpu)
            return tf.contrib.tpu.TPUEstimatorSpec(mode=mode,
                                                   loss=total_loss,
                                                   train_op=train_op,
                                                   scaffold_fn=scaffold_fn)

        if mode == tf.estimator.ModeKeys.EVAL:

            def metric_fn(per_example_loss, label_ids, logits, is_real_example):
                """Compute accuracy and mean loss weighted by is_real_example."""
                predicted = tf.argmax(logits, axis=-1, output_type=tf.int32)
                return {
                    "eval_accuracy": tf.metrics.accuracy(
                        labels=label_ids,
                        predictions=predicted,
                        weights=is_real_example),
                    "eval_loss": tf.metrics.mean(
                        values=per_example_loss,
                        weights=is_real_example),
                }

            # eval_metrics is a tuple of metric_fn and its tensor arguments.
            metric_args = [per_example_loss, label_ids, logits, is_real_example]
            return tf.contrib.tpu.TPUEstimatorSpec(
                mode=mode,
                loss=total_loss,
                eval_metrics=(metric_fn, metric_args),
                scaffold_fn=scaffold_fn)

        # Any other mode: `predictions` is the dict of results to return.
        return tf.contrib.tpu.TPUEstimatorSpec(
            mode=mode,
            predictions={"probabilities": probabilities},
            scaffold_fn=scaffold_fn)

    return model_fn
4. 整体使用TPU的训练流程:
# Resolve the TPU cluster only when TPU use is enabled and a TPU name is set.
tpu_cluster_resolver = None
if FLAGS.use_tpu and FLAGS.tpu_name:
    tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
        FLAGS.tpu_name, zone=FLAGS.tpu_zone, project=FLAGS.gcp_project)

# TPU run configuration.
is_per_host = tf.contrib.tpu.InputPipelineConfig.PER_HOST_V2
tpu_config = tf.contrib.tpu.TPUConfig(
    iterations_per_loop=FLAGS.iterations_per_loop,
    num_shards=FLAGS.num_tpu_cores,
    per_host_input_for_training=is_per_host)
run_config = tf.contrib.tpu.RunConfig(
    cluster=tpu_cluster_resolver,
    master=FLAGS.master,
    model_dir=FLAGS.output_dir,
    save_checkpoints_steps=FLAGS.save_checkpoints_steps,
    tpu_config=tpu_config)

# use_one_hot_embeddings is set from the same flag as use_tpu.
model_fn = model_fn_builder(bert_config=bert_config,
                            num_labels=len(label_list),
                            init_checkpoint=FLAGS.init_checkpoint,
                            learning_rate=FLAGS.learning_rate,
                            num_train_steps=num_train_steps,
                            num_warmup_steps=num_warmup_steps,
                            use_tpu=FLAGS.use_tpu,
                            use_one_hot_embeddings=FLAGS.use_tpu)

estimator = tf.contrib.tpu.TPUEstimator(use_tpu=FLAGS.use_tpu,
                                        model_fn=model_fn,
                                        config=run_config,
                                        train_batch_size=FLAGS.train_batch_size,
                                        eval_batch_size=FLAGS.eval_batch_size,
                                        predict_batch_size=FLAGS.predict_batch_size)

if FLAGS.do_train:
    train_file = os.path.join(FLAGS.output_dir, "train.tf_record")
    # Serialize the training examples into a TFRecord file ...
    file_based_convert_examples_to_features(
        train_examples, label_list, FLAGS.max_seq_length, tokenizer,
        train_file)
    # ... then build the input function that reads that file back.
    train_input_fn = file_based_input_fn_builder(
        input_file=train_file,
        seq_length=FLAGS.max_seq_length,
        is_training=True,
        drop_remainder=True)
    estimator.train(input_fn=train_input_fn, max_steps=num_train_steps)
5. 调试以上高层 API 的 trick
因为已经用 tf.estimator 取代了以往 session.run() 的形式,所以调试时无法再通过 session.run() 查看 tensor 的值,只能用 tf.Print 函数把打印节点串接到整个 graph 中,从而打印出 tensor 的实际取值(value),而不仅仅是维度等基本信息。使用示例如下:
Graph.png
# node1 is the tensor whose runtime value we want to inspect.
node1 = tf.add(input1, input2)
# Wrap node1 in tf.Print; the second argument lists the tensors to print.
print_node1 = tf.Print(node1, [node1]) ## Note: the "P" in tf.Print must be capitalized!
# Downstream ops consume print_node1 (not node1) so the print node is part of the graph.
output = tf.multiply(print_node1, input3)
参考文献:
Google 开发者大会 2018
数据输入流水线性能
BERT-google
https://zhuanlan.zhihu.com/p/33906227
网友评论