tensorflow如何做到分布式计算

时间:2022-02-12 03:31:43

I. 从简单开始,首先是单机单卡:

服务器是虚拟机,只有CPU,下面程序分别计算加法、乘法

#coding=utf-8
#单机单卡
#对于单机单卡,可以把参数和计算都定义再gpu上,不过如果参数模型比较大,显存不足等情况,就得放在cpu上
import  tensorflow as tf

with tf.device('/cpu:0'):#也可以放在gpu上
        w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
        b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))

with tf.device('/cpu:0'):
        addwb=w+b
        mutwb=w*b

ini=tf.global_variables_initializer()
with tf.Session() as sess:
        sess.run(ini)
        np1,np2=sess.run([addwb,mutwb])
        print(np1)
        print(np2)


II.单机多卡

单机多卡,只要用device直接指定设备,就可以进行训练,SGD采用各个卡的平均值。

检查服务器CPU个数:

cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l

实例程序改写如下,通过ConfigProto定义最多使用的CPU数目,利用两个物理CPU分别进行加法和乘法计算:

#coding=utf-8
#单机多卡:
#一般采用共享操作定义在cpu上,然后并行操作定义在各自的gpu上,比如对于深度学习来说,我们一把把参数定义、参数梯度更新统一放在cpu上
#各个gpu通过各自计算各自batch 数据的梯度值,然后统一传到cpu上,由cpu计算求取平均值,cpu更新参数。
#具体的深度学习多卡训练代码,请参考:https://github.com/tensorflow/models/blob/master/inception/inception/inception_train.py
import  tensorflow as tf
with tf.device('/cpu:0'):
w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))

with tf.device('/cpu:0'):
addwb=w+b
with tf.device('/cpu:1'):
mutwb=w*b

ini=tf.global_variables_initializer()
config = tf.ConfigProto(device_count={"CPU": 2}, # limit to num_cpu_core CPU usage
                #inter_op_parallelism_threads = 1, 
                #intra_op_parallelism_threads = 1,
                log_device_placement=True)
with tf.Session(config = config) as sess:
sess.run(ini)
while 1:
print(sess.run([addwb,mutwb]))

在上面代码中我们通过 “device_count={"CPU": 4}” 参数来构建一个ConfigProto() 类,传入tf.Session()来使每个会话分配相应的资源,这里我们给tensorflow程序共分配了4个CPU core。

在进行tf.ConfigProto()初始化时,我们也可以通过设置intra_op_parallelism_threads参数和inter_op_parallelism_threads参数,来控制每个操作符op并行计算的线程个数。二者的区别在于:
intra_op_parallelism_threads 控制运算符op内部的并行
当运算符op为单一运算符,并且内部可以实现并行时,如矩阵乘法,reduce_sum之类的操作,可以通过设置intra_op_parallelism_threads 参数来并行, intra代表内部。
inter_op_parallelism_threads 控制多个运算符op之间的并行计算
当有多个运算符op,并且他们之间比较独立,运算符和运算符之间没有直接的路径Path相连。Tensorflow会尝试并行地计算他们,使用由inter_op_parallelism_threads参数来控制数量的一个线程池。

III.多机多卡

task:每台机器上的一个进程;

job:由多个task组成,分为ps、worker,分别用于参数服务和计算服务;

cluster:由job组成。

SGD:

同步SGD,各个用于并行计算的电脑,计算完各自的batch 后,求取梯度值,把梯度值统一送到ps服务机器中,由ps服务机器求取梯度平均值,更新ps服务器上的参数。

异步SGD,ps服务器收到只要收到一台机器的梯度值,就直接进行参数更新,无需等待其它机器。这种迭代方法比较不稳定,收敛曲线震动比较厉害。

改写上述程序,在10.11.2.31上启动PS,在10.11.2.35上启动worker,PS和worker程序分别如下,启动脚本为:

首先为PS

python3 mm.py  --task_index 0 --job_name ps

然后为Worker:

python3 mm.py  --task_index 0 --job_name worker

PS:

#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
#coding=utf-8
#多台机器,每台机器有一个显卡、或者多个显卡,这种训练叫做分布式训练
import  tensorflow as tf
#现在假设我们有A、B、C、D四台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
        "10.11.8.35:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
    ],
    "ps": [
        "10.11.8.31:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})

#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
isps=True
if isps:
server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
server.join()
else:
server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
addwb=w+b
mutwb=w*b
divwb=w/b

saver = tf.train.Saver()
summary_op = tf.summary.merge_all() 
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(init_op=init_op, summary_op=summary_op, saver=saver)
with sv.managed_session(server.target) as sess:
while 1:
print(sess.run([addwb,mutwb,divwb]))


Worker:

#coding=utf-8
#上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的,
# 以为都是计算神经网络的每个batch 前向传导,所以一般代码是重用的
import  tensorflow as tf
#现在假设我们有A、B台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的代码内容大部分相同
# ,除了开始定义的时候,需要各自指定该台机器的task之外。以机器A为例子,A机器上的代码如下:
cluster=tf.train.ClusterSpec({
    "worker": [
        "10.11.8.35:1234",#格式 IP地址:端口号,第一台机器A的IP地址 ,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0
    ],
    "ps": [
        "10.11.8.31:2223"#第四台机器的IP地址 对应到代码块:/job:ps/task:0
    ]})
#不同的机器,下面这一行代码各不相同,server可以根据job_name、task_index两个参数,查找到集群cluster中对应的机器
isps=False
if isps:
server=tf.train.Server(cluster,job_name='ps',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
server.join()
else:
server=tf.train.Server(cluster,job_name='worker',task_index=0)#找到‘worker’名字下的,task0,也就是机器A
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:0',cluster=cluster)):
w=tf.get_variable('w',(2,2),tf.float32,initializer=tf.constant_initializer(2))
b=tf.get_variable('b',(2,2),tf.float32,initializer=tf.constant_initializer(5))
addwb=w+b
mutwb=w*b
divwb=w/b
saver = tf.train.Saver()
summary_op = tf.summary.merge_all()
init_op = tf.initialize_all_variables()
sv = tf.train.Supervisor(init_op=init_op, summary_op=summary_op, saver=saver)
with sv.managed_session(server.target) as sess:
while 1:
print(sess.run([addwb,mutwb,divwb]))


参照:

http://blog.csdn.net/hjimce/article/details/61197190