【推荐算法工程师技术栈系列】分布式&数据库--tensorflow

时间:2021-09-19 17:46:42

目录

TensorFlow 高阶API

Dataset(tf.data)

源码定义:tensorflow/python/data/ops/dataset_ops.py
用户向导:Dataset Input Pipeline

Dataset可以用来表示输入管道元素集合(张量的嵌套结构)和“逻辑计划“对这些元素的转换操作。在Dataset中元素可以是向量,元组或字典等形式。
另外,Dataset需要配合另外一个类Iterator进行使用,Iterator对象是一个迭代器,可以对Dataset中的元素进行迭代提取。

api 说明
from_generator(generator,output_types,output_shapes=None) Creates a Dataset whose elements are generated by generator.
from_tensor_slices(tensors) 创建dataset,其元素是给定张量的切片的元素
from_tensors(tensors) 创建一个Dataset包含给定张量的单个元素。
interleave(map_func,cycle_length,block_length=1) 对数据集进行map转换并合并插入结果
list_files(file_pattern,shuffle=None) A dataset of all files matching a pattern.
range(*args) Creates a Dataset of a step-separated range of values.
skip(count) Creates a Dataset that skips count elements from this dataset.
take(count) Creates a Dataset with at most count elements from this dataset.
zip(datasets) Creates a Dataset by zipping together the given datasets.
prefetch(buffer_size) 创建dataset,在请求输入数据集之前从输入数据集中预提取元素
apply(transformation_func) Apply a transformation function to this dataset.
cache(filename=‘‘) Caches the elements in this dataset.
concatenate(dataset) Creates a Dataset by concatenating given dataset with this dataset.
filter(predicate) Filters this dataset according to predicate.
map(map_func,num_parallel_calls=None) Maps map_func across this dataset.
flat_map(map_func) Maps map_func across this dataset and flattens the result.
batch(batch_size) 将数据集的连续元素合成批次
padded_batch(batch_size,padded_shapes,padding_values=None) 将数据集的连续元素组合到填充批次中,此转换将输入数据集的多个连续元素组合为单个元素。
repeat(count=None) Repeats this dataset count times.
shard(num_shards,index) 将Dataset分割成num_shards个子数据集。这个函数在分布式训练中非常有用,它允许每个设备读取唯一子集。
shuffle(buffer_size,seed=None,reshuffle_each_iteration=None) 随机混洗数据集的元素。
make_initializable_iterator(shared_name=None) Creates an Iterator for enumerating the elements of this dataset.
make_one_shot_iterator() 创建Iterator用于枚举此数据集的元素。(可自动初始化)

Iterator
源码定义:tensorflow/python/data/ops/iterator_ops.py
用户向导:Dataset Input Pipeline > Iterating over datasets

api 说明
get_next(name=None) In graph mode, you should typically call this method once and use its result as the input to another computation.

Estimator(tf.estimator)

源码定义:tensorflow/python/estimator/estimator.py
用户向导:Regression Examples

Estimator对象包装(wraps)由model_fn指定的模型,model_fn由给定输入和其他一些参数,返回需要进行训练、计算,或预测的操作.
所有输出(检查点,事件文件等)都被写入model_dir或其子目录.如果model_dir未设置,则使用临时目录.
可以通过RunConfig对象(包含了有关执行环境的信息)传递config参数.它被传递给model_fn,如果model_fn有一个名为“config”的参数(和输入函数以相同的方式).Estimator使得模型可配置化,并且还使用其一些字段来控制内部,特别是关于检查点的控制.
该params参数包含hyperparameter,如果model_fn有一个名为“PARAMS”的参数,并且以相同的方式传递给输入函数,则将它传递给model_fn. Estimator只是沿着参数传递,并不检查它.因此,params的结构完全取决于开发人员.
不能在子类中重写任何Estimator方法(其构造函数强制执行此操作).子类应使用model_fn来配置基类,并且可以添加实现专门功能的方法.

api 说明
init(model_fn, model_dir=None, config=None, params=None) Estimator的构造方法,返回EstimatorSpec实例
evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None) 返回一个包含按name为键的model_fn中指定的计算指标的词典。
export_savedmodel(export_dir_base, serving_input_receiver_fn) 将推理图作为SavedModel导出到给定的目录中.该方法通过首先调用serving_input_receiver_fn来获取特征Tensors来构建一个新图,然后调用这个Estimator的model_fn来基于这些特征生成模型图.它在新的会话中将给定的检查点恢复到该图中.最后它会在给定的export_dir_base下面创建一个时间戳导出目录,并在其中写入一个SavedModel,其中包含从此会话保存的单个MetaGraphDef.
get_variable_names() 返回此模型中所有变量名称的列表.
get_variable_value(name) 返回由名称给出的变量的值. 返回值类型:Numpy数组
latest_checkpoint() 查找model_dir中最新保存的检查点文件的文件名.
predict(self,input_fn,predict_keys=None) 对给定的features产生预测,返回predictions字典张量的计算值.
train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None) 训练给定训练数据input_fn的模型.

FeatureColumns(tf.feature_column)

源码定义:tensorflow/python/feature_column/feature_column.py
用户向导:feature_columns
feature_columns2

Feature cloumns是原始数据和Estimator模型之间的桥梁,它们被用来把各种形式的原始数据转换为模型能够使用的格式

api 返回值 用法
tf.feature_column.bucketized_column(source_column,boundaries) Represents discretized dense input. Bucketized column用来把numeric column的值按照提供的边界(boundaries)离散化为多个值
tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string) Represents sparse feature where ids are set by hashing. 用户指定类别的总数,通过hash的方式来得到最终的类别ID
tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None) A _CategoricalColumn that returns identity values. 与Bucketized column类似,Categorical identity column用单个唯一值表示bucket。
tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file) A _CategoricalColumn with a vocabulary file. 类别特征做one-hot编码,字典通过文件给出
tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list) A _CategoricalColumn with in-memory vocabulary. 类别特征做one-hot编码,字典通过list给出
tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None) Returns a column for performing crosses of categorical features. 对keys的笛卡尔积执行hash操作,再把hash的结果对hash_bucket_size取模得到最终的结果
tf.feature_column.embedding_column(categorical_column,dimension,combiner=‘mean‘) _DenseColumn that converts from sparse, categorical input. 把原始特征映射为一个低维稠密的实数向量
tf.feature_column.indicator_column(categorical_column) Represents multi-hot representation of given categorical column. 把categorical column得到的稀疏tensor转换为one-hot或者multi-hot形式的稠密tensor
tf.feature_column.input_layer(features,feature_columns) Returns a dense Tensor as input layer based on given feature_columns.
tf.feature_column.linear_model(features,feature_columns) Returns a linear prediction Tensor based on given feature_columns.
tf.feature_column.make_parse_example_spec(feature_columns) Creates parsing spec dictionary from input feature_columns.
tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None) Represents real valued or numerical features.
tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner=‘mean‘) List of dense columns that convert from sparse, categorical input. 把多个特征共享相同的embeding映射空间
tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32) 给一个类别特征赋予一定的权重,给一个类别特征赋予一定的权重

tf.nn

激活函数op,loss部分抽象

源码定义:tensorflow/python/ops/gen_nn_ops.py
用户向导:Neural Network

api 说明
tf.nn.relu(...) Computes rectified linear: max(features, 0).
tf.nn.sigmoid(...) Computes sigmoid of x element-wise.
tf.nn.bias_add(...)
tf.nn.tanh(...) Computes hyperbolic tangent of x element-wise.
tf.nn.dropout()
tf.nn.softmax()
tf.nn.sigmoid_cross_entropy_with_logits(...) Computes sigmoid cross entropy given logits.
tf.nn.top_k(...) Finds values and indices of the k largest entries for the last dimension.
tf.nn.embedding_lookup(params,ids,partition_strategy=‘mod‘) 按照ids查找params里面的vector然后输出
tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights) Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights.
tf.nn.zero_fraction(value,name=None) Returns the fraction of zeros in value.
tf.nn.nce_loss() 负采样的NCE loss实现

tf.layers

网络模块抽象

api 说明
average_pooling1d(...) Average Pooling layer for 1D inputs.
average_pooling2d(...) Average pooling layer for 2D inputs (e.g. images).
average_pooling3d(...) Average pooling layer for 3D inputs (e.g. volumes).
batch_normalization(...) Functional interface for the batch normalization layer.
conv1d(...) Functional interface for 1D convolution layer (e.g. temporal convolution).
conv2d(...) Functional interface for the 2D convolution layer.
conv2d_transpose(...) Functional interface for transposed 2D convolution layer.
conv3d(...) Functional interface for the 3D convolution layer.
conv3d_transpose(...) Functional interface for transposed 3D convolution layer.
dense(...) Functional interface for the densely-connected layer.
dropout(...) Applies Dropout to the input.
flatten(...) Flattens an input tensor while preserving the batch axis (axis 0).
max_pooling1d(...) Max Pooling layer for 1D inputs.
max_pooling2d(...) Max pooling layer for 2D inputs (e.g. images).
max_pooling3d(...) Max pooling layer for 3D inputs (e.g. volumes).
separable_conv1d(...) Functional interface for the depthwise separable 1D convolution layer.
separable_conv2d(...) Functional interface for the depthwise separable 2D convolution layer.

tf.train

tf.train provides a set of classes and functions that help train models.

用户向导:Training

api 说明
tf.train.Optimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.GradientDescentOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdadeltaOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdagradOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdamOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SyncReplicasOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SessionRunHook Training Hooks,Hook to extend calls to MonitoredSession.run().
tf.train.SessionRunArgs Training Hooks,Represents arguments to be added to a Session.run() call.
tf.train.global_step sess每执行完一个batch,就给global_step加1,用来统计目前执行的batch数
tf.train.basic_train_loop
tf.train.get_global_step() 返回global_step作为name的tensor
tf.train.get_or_create_global_step() Returns and create (if necessary) the global step tensor.
tf.train.write_graph
tf.train.string_input_producer 输出字符串到一个输入管道队列
tf.train.Saver(max_to_keep=5) 保存模型
tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None) Finds the filename of latest saved checkpoint file.
tf.train.Int64List Int64List
tf.train.FloatList FloatList
tf.train.Features Features
tf.train.Example Example
tf.train.batch(tensors,batch_size) Creates batches of tensors in tensors.
tf.trainable_variables()
tf.train.Scaffold Scaffold

tf.linalg

TensorFlow 的线性代数库。
w3cschool TensorFlow官方文档-linalg

api 说明
adjoint(...) 转置最后两个维度和共轭张量matrix.
band_part(...) 复制张量设置每个最内层矩阵中心带外的所有内容
cholesky(...) 计算一个或多个方阵的Cholesky分解.
cholesky_solve(...) A X = RHS给出的Cholesky因子分解,求解线性方程组.
det(...) 计算一个或多个方阵的行列式.
diag(...) 返回具有给定批处理对角线值的批处理对角线张量.
diag_part(...) 返回批处理张量的批处理对角线部分.
eigh(...) 计算了一批自共轭矩阵的特征分解.
eigvalsh(...) 计算一个或多个自共轭矩阵的特征值.
einsum(...) 任意维度的张量之间的广义收缩.
expm(...) 计算一个或多个方阵的矩阵指数.
eye(...) 构造一个单位矩阵或批矩阵.
inv(...) 计算一个或多个平方可逆矩阵或它们的倒数
logdet(...) 计算hermitian正定矩阵的行列式的对数.
logm(...) 计算一个或多个方阵的矩阵对数:
lstsq(...) 解决一个或多个线性最小二乘问题.
norm(...) 计算向量,矩阵和张量的范数.(不赞成的参数)
qr(...) 计算一个或多个矩阵的QR分解.
set_diag(...) 返回具有新批处理对角线值的批处理矩阵张量.
slogdet(...) 计算行列式的绝对值的符号和日志
solve(...) 求解线性方程组.
svd(...) 计算一个或多个矩阵的奇异值分解.
tensordot(...) a和b沿指定轴的张量收缩.
trace(...) 计算张量x的轨迹.
transpose(...) 转置张量a的最后两个维度.
triangular_solve(...) 求解具有上三角矩阵或下三角矩阵的线性方程组.

checkpoint(模型保存与恢复)

Estimator 将一个检查点保存到 model_dir 中后,每次调用 Estimator 的 train、eval 或 predict 方法时,都会发生下列情况:

a) Estimator 通过运行 model_fn() 构建模型图。(要详细了解 model_fn(),请参阅创建自定义 Estimator。)

b) Estimator 根据最近写入的检查点中存储的数据来初始化新模型的权重。
换言之,一旦存在检查点,TensorFlow 就会在您每次调用 train()、evaluate() 或 predict() 时重建模型。

Tensorflow Serving

官方例子

half_plus_two的例子

# Download the TensorFlow Serving Docker image and repo
docker pull tensorflow/serving

git clone https://github.com/tensorflow/serving
# Location of demo models
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"

# Start TensorFlow Serving container and open the REST API port
# docker run -t 表示是否允许伪TTY
# docker run --rm表示如果实例已经存在,则先remove掉该实例再重新启动新实例
# docker -p设置端口映射
# docker -v设置磁盘映射
# docker -e设置环境变量
docker run -t --rm -p 8501:8501     -v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two"     -e MODEL_NAME=half_plus_two     tensorflow/serving &

# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}'     -X POST http://localhost:8501/v1/models/half_plus_two:predict

# Returns => { "predictions": [2.5, 3.0, 4.5] }

serving镜像提供了两种调用方式:gRPC和HTTP请求。gRPC默认端口是8500,HTTP请求的默认端口是8501,serving镜像中的程序会自动加载镜像内/models下的模型,通过MODEL_NAME指定/models下的哪个模型。

创建自定义镜像

docker run --rm -p 8501:8501     --mount type=bind,source=${model_dir},target=/models/dpp_model     -e MODEL_NAME=dpp_model -t tensorflow/serving &

官方文档

架构

TensorFlow Serving可抽象为一些组件构成,每个组件实现了不同的API任务,其中最重要的是Servable, Loader, Source, 和 Manager,我们看一下组件之间是如何交互的。

Source

TF Serving发现磁盘上的模型文件,该模型服务的生命周期就开始了。Source组件负责发现模型文件,找出需要加载的新模型。实际上,该组件监控文件系统,当发现一个新的模型版本,就为该模型创建一个Loader。

Loader

Loader需要知道模型的相关信息,包括如何加载模型如何估算模型需要的资源,包括需要请求的RAM、GPU内存。Loader带一个指针,连接到磁盘上存储的模型,其中包含加载模型需要的相关元数据。不过记住,Loader现在还不允许加载模型。

Manager

Manager收到待加载模型版本,开始模型服务流程。此处有两种可能性,第一种情况是模型首次推送部署,Manager先确保模型需要的资源可用,一旦获取相应的资源,Manager赋予Loader权限去加载模型。
第二种情况是为已上线模型部署一个新版本。Manager会先查询Version Policy插件,决定加载新模型的流程如何进行。
具体来说,当加载新模型时,可选择保持 (1) 可用性(the Availability Preserving Policy)或 (2) 资源(the Resource Preserving Policy)。

Servable

最后,当用户请求模型的句柄,Manager返回句柄给Servable。

Servables 是 TensorFlow Serving 中最核心的抽象,是客户端用于执行计算 (例如:查找或推断) 的底层对象。

部署服务

模型导出

将TensorFlow构建的模型用作服务,首先需要确保导出为正确的格式,可以采用TensorFlow提供的SavedModel类。TensorFlow Saver提供模型checkpoint磁盘文件的保存/恢复。事实上SavedModel封装了TensorFlow Saver,对于模型服务是一种标准的导出方法。
SignatureDefs定义了一组TensorFlow支持的计算签名,便于在计算图中找到适合的输入输出张量。简单的说,使用这些计算签名,可以准确指定特定的输入输出节点。目前有3个服务API: 分类、预测和回归。每个签名定义关联一个RPC API。分类SignatureDef用于分类RPC API,预测SignatureDef用于RPC API等等。对于分类SignatureDef,需要一个输入张量(接收数据)以及可能的输出张量: 类别或得分。回归SignatureDef需要一个输入张量以及另一个输出张量。最后预测SignatureDef需要一个可变长度的输入输出张量。

提供tensorflow模型

tf.saved_model API

with sess.graph.as_default() as graph:
    builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir)
    # 1.8版本,1.12之后调整为tf.saved_model.predict_signature_def
    signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image},
                                      outputs={'prediction': graph.get_tensor_by_name('final_result:0')},)
    builder.add_meta_graph_and_variables(sess=sess,
                                         tags=["serve"],
                                         signature_def_map={'predict':signature,
                           tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature
    })
    builder.save()

tf.estimator API

def export_savedmodel(self,
                      export_dir,
                      serving_input_receiver_fn=None,
                      as_text=False,
                      checkpoint_path=None):
    if serving_input_receiver_fn is None and self.feature_cfgs is None:
        raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.')

    if serving_input_receiver_fn is None:
        feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True)
        serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
    # 自己封装的estimator
    savedmodel_path = self.estimator.export_savedmodel(
        export_dir_base=export_dir,
        serving_input_receiver_fn=serving_input_receiver_fn,
        as_text=as_text,
        checkpoint_path=checkpoint_path
    )
    return savedmodel_path

其中self.feature_cfgs.to_feature_spec函数得到如下结果:

{'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}

说明了输入】

TensorFlow Serving ModelServer 用于发现新导出的模型,并启动 gRPC 用于提供模型服务。

API请求(predict)

TensorFlow ModelServer通过host:port接受下面这种RESTful API请求:
POST http://host:port/ :
URI: /v1/models/ ({MODEL_NAME}[/versions/){MODEL_VERSION}]
VERB: classify|regress|predict

其中“/versions/${MODEL_VERSION}”是可选的,如果省略,则使用最新的版本。

该API基本遵循gRPC版本的PredictionService API。

请求URL的示例:

http://host:port/v1/models/iris:classify
http://host:port/v1/models/mnist/versions/314:predict

请求格式

预测API的请求体必须是如下格式的JSON对象:

{
  // (Optional) Serving signature to use.
  // If unspecifed default serving signature is used.
  "signature_name": <string>,

  // Input Tensors in row ("instances") or columnar ("inputs") format.
  // A request can have either of them but NOT both.
  "instances": <value>|<(nested)list>|<list-of-objects>
  "inputs": <value>|<(nested)list>|<object>
}

以行的形式说明输入的tensor

{
 "instances": [
   {
     "tag": "foo",
     "signal": [1, 2, 3, 4, 5],
     "sensor": [[1, 2], [3, 4]]
   },
   {
     "tag": "bar",
     "signal": [3, 4, 1, 2, 5]],
     "sensor": [[4, 5], [6, 8]]
   }
 ]
}

以列的形式说明输入的tensor

{
 "inputs": {
   "tag": ["foo", "bar"],
   "signal": [[1, 2, 3, 4, 5], [3, 4, 1, 2, 5]],
   "sensor": [[[1, 2], [3, 4]], [[4, 5], [6, 8]]]
 }
}

返回格式

行形式

{
  "predictions": <value>|<(nested)list>|<list-of-objects>
}

如果模型的输出只包含一个命名的tensor,我们省略名字和predictions key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象list,和上面提到的行形式输入类似。

列形式

{
  "outputs": <value>|<(nested)list>|<object>
}

如果模型的输出只包含一个命名的tensor,我们省略名字和outputs key map,直接使用标量或者值的list。如果模型输出多个命名的tensor,我们输出对象,其每个key都和输出的tensor名对应,和上面提到的列形式输入类似。

API demo

$ curl -d '{"instances": [1.0,2.0,5.0]}' -X POST http://localhost:8501/v1/models/half_plus_three:predict
{
    "predictions": [3.5, 4.0, 5.5]
}

[译]TensorFlow Serving RESTful API

源码理解

tensorflow-serving源码理解

开发中遇到的其他问题记录

实现numpy中的切片赋值

# 初始化一个3*4的tensor和大小为3的list
cis = tf.zeros([3, 4)
cis_list = [tf.zeros([4]) for _ in range(3)]
# 替换list的一个元素
cis_list[k] = eis
# 重新堆叠成一个tensor,列切片赋值axis=1?
cis = tf.stack(cis_list)

附录