使用docker安装部署Spark集群来训练CNN(含Python实例)

时间:2023-06-23 10:27:19

使用docker安装部署Spark集群来训练CNN(含Python实例)


http://blog.csdn.net/cyh_24/article/details/49683221

实验室有4台神服务器,每台有8个tesla-GPU,然而平时做实验都只使用了其中的一个GPU,实在暴遣天物! 
于是想用Spark来把这些GPU都利用起来。听闻Docker是部署环境的神器,于是决定使用docker安装部署Spark集群来训练CNN。配置环境虽然简单,纯苦力活,但配过的人都知道,里面有太多坑了。

本文是博主含泪写出的踩坑总结,希望能够给各位提供了一些前车之鉴来避开这些坑。

docker

什么是docker

Docker 是一个开源项目,诞生于 2013 年初,最初是 dotCloud 公司内部的一个业余项目。直观来说,docker是一种轻量级的虚拟机。Docker 和传统虚拟化方式的不同之处在于:

docker是在操作系统层面上实现虚拟化,直接复用本地主机的操作系统,而传统方式则是在硬件层面实现。

一张图更直观地解释一下这两种差异:

使用docker安装部署Spark集群来训练CNN(含Python实例) 
使用docker安装部署Spark集群来训练CNN(含Python实例)

为什么使用docker

作为一种新兴的虚拟化方式,Docker 跟传统的虚拟化方式相比具有众多的优势。

  • Docker 容器的启动可以在秒级实现,这相比传统的虚拟机方式要快得多。
  • Docker 对系统资源的利用率很高,一台主机上可以同时运行数千个 Docker 容器。
  • 容器除了运行其中应用外,基本不消耗额外的系统资源,使得应用的性能很高,同时系统的开销尽量小。(传统虚拟机方式运行 10 个不同的应用就要起 10 个虚拟机,而Docker 只需要启动 10 个隔离的应用即可)。
  • 一次创建或配置,就可以在任意地方正常运行。
  • Docker 容器几乎可以在任意的平台上运行,包括物理机、虚拟机、公有云、私有云、个人电脑、服务器等。 这种兼容性可以让用户把一个应用程序从一个平台直接迁移到另外一个。

简单总结一下:

特性 docker 虚拟机
启动 秒级 分钟级
硬盘使用 一般为 MB 一般为 GB
性能 接近原生 弱于
系统支持量 单机支持上千个容器 一般几十个

Spark

Spark是 UC Berkeley AMP lab 所开源的类Hadoop MapReduce 的通用并行框架。 
Spark,拥有Hadoop MapReduce所具有的优点; 
但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS。

因此 Spark 能更好地适用于数据挖掘机器学习等需要迭代的 MapReduce 的算法

关于spark的原理应用等内容,这里就不多说了,改天我再写一篇单独来聊。现在你只要知道它能有办法让你的程序分布式跑起来就行了。

Elephas(支持spark的深度学习库)

先说 keras,它是基于 theano 的深度学习库,用过 theano 的可能会知道,theano 程序不是特别好些。keras 是对theano的一个高层封装,使得代码写起来更加方便,下面贴一段keras的cnn模型代码:

model = Sequential()

model.add(Convolution2D(nb_filters, nb_conv, nb_conv,
                        border_mode='full',
                        input_shape=(1, img_rows, img_cols)))
model.add(Activation('relu'))
model.add(Convolution2D(nb_filters, nb_conv, nb_conv))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(nb_pool, nb_pool)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adadelta')

model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=1, validation_data=(X_test, Y_test))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

是不是比caffe的配置文件还要简单?

elephas 使得keras程序能够运行在Spark上面。使得基本不改变keras,就能够将程序运行到spark上面了。 
下面贴一个elephas的代码(model还是上文的model):

# Create Spark context
conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, X_train, Y_train)

# Initialize SparkModel from Keras model and Spark context
spark_model = SparkModel(sc,model)

# Train Spark model
spark_model.train(rdd, nb_epoch=nb_epoch, batch_size=batch_size, verbose=0, validation_split=0.1, num_workers=8)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

要想在spark上面运行,只需要执行下面的命令:

spark-submit –driver-memory 1G ./your_script.py

该介绍的都介绍完了,下面我来手把手教你如何使用docker安装部署Spark-GPU集群来分布式训练CNN.

Spark on docker 安装

在线安装docker

Ubuntu 14.04 版本系统中已经自带了 Docker 包,可以直接安装。

$ sudo apt-get update
$ sudo apt-get install -y docker.io
$ sudo ln -sf /usr/bin/docker.io /usr/local/bin/docker
$ sudo sed -i '$acomplete -F _docker docker' /etc/bash_completion.d/docker.io
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

如果是较低版本的 Ubuntu 系统,需要先更新内核。

$ sudo apt-get update
$ sudo apt-get install linux-image-generic-lts-raring linux-headers-generic-lts-raring
$ sudo reboot
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

然后重复上面的步骤即可。 
安装之后启动 Docker 服务。

$ sudo service docker start
  • 1
  • 1

离线安装docker

如果你的电脑连不上外网(像我的服务器那样),那还可以通过离线安装包来安装docker。 
你可以从这里下载离线包:https://get.daocloud.io/docker/builds/Linux/x86_64/docker-latest

chmod +x docker-latest
sudo mv docker-latest /usr/local/bin/docker
# Then start docker in daemon mode:
sudo docker daemon &
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

Spark on docker 安装

Sequenceiq 公司提供了一个docker容器,里面安装好了spark,你只要从docker hub上pull下来就行了。

docker pull sequenceiq/spark:1.5.1

执行下面命令来运行一下:

sudo docker run -it sequenceiq/spark:1.5.1 bash

测试一下spark的功能: 
首先用ifconfig得到ip地址,我的ip是172.17.0.109,然后:

bash-4.1# cd /usr/local/spark 
bash-4.1# cp conf/spark-env.sh.template conf/spark-env.sh 
bash-4.1# vi conf/spark-env.sh

添加两行代码:

export SPARK_LOCAL_IP=172.17.0.109
export SPARK_MASTER_IP=172.17.0.109
  • 1
  • 2
  • 1
  • 2

然后启动master 跟slave:

bash-4.1# ./sbin/start-master.sh 
bash-4.1# ./sbin/start-slave.sh spark:172.17.0.109:7077

浏览器打开(你的ip:8080) 可以看到如下spark各节点的状态。

用spark-sumit提交一个应用运行一下:

bash-4.1# ./bin/spark-submit examples/src/main/Python/pi.py

得到如下结果:

15/11/05 02:11:23 INFO scheduler.DAGScheduler: Job 0 finished: reduce at /usr/local/spark-1.5.1-bin-hadoop2.6/examples/src/main/python/pi.py:39, took 1.095643 s

Pi is roughly 3.148900

恭喜你,刚刚跑了一个spark的应用程序!

你是不是觉得到目前为止都很顺利?提前剧透一下,困难才刚刚开始,好在我把坑都踩了一遍,所以虽然还是有点麻烦,不过至少你们还是绕过了一些深坑。。。

各种库的安装

elephas 需要python2.7,不过我们刚刚安装的docker自带的python是2.6版本,所以,我们先把python版本更新一下。

CentOS 的Python 版本升级

温馨提示:在python编译之前一定要安装openssl和openssl-devel,不要问我是怎么知道的。

yum install -y zlib-devel bzip2-devel openssl openssl-devel xz-libs wget
  • 1
  • 1

安装详情:

wget http://www.python.org/ftp/python/2.7.8/Python-2.7.8.tar.xz
xz -d Python-2.7.8.tar.xz
tar -xvf Python-2.7.8.tar

# 进入目录:
cd Python-2.7.8
# 运行配置 configure:
./configure --prefix=/usr/local CFLAGS=-fPIC (一定要加fPIC,不要问我怎么知道的)
# 编译安装:
make
make altinstall
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

设置 PATH

mv /usr/bin/python /usr/bin/python2.6
export PATH="/usr/local/bin:$PATH"
或者
ln -s /usr/local/bin/python2.7  /usr/bin/python
# 检查 Python 版本:
python -V
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

安装 setuptools

#获取软件包
wget --no-check-certificate https://pypi.python.org/packages/source/s/setuptools/setuptools-1.4.2.tar.gz
# 解压:
tar -xvf setuptools-1.4.2.tar.gz
cd setuptools-1.4.2
# 使用 Python 2.7.8 安装 setuptools
python setup.py install
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

安装 PIP

curl https://raw.githubusercontent.com/pypa/pip/master/contrib/get-pip.py | python -
  • 1
  • 1

修复 yum 工具

vi /usr/bin/yum

#修改 yum中的python
将第一行  #!/usr/bin/python
改为      #!/usr/bin/python2.6
此时yum就ok啦
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

theano, keras, elephas的安装

pip install --upgrade --no-deps git+git://github.com/Theano/Theano.git

pip install keras

pip install elephas
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

已达成技能

我们简单总结一下,我们已经完成的工作:

  1. 安装docker
  2. 载入了spark on docker镜像
  3. 将spark on docker 镜像中的python升级
  4. 安装了theano、keras、elephas

现在,我们已经可以做的事情: 
 如果你的机器有多个CPU(假设24个):

你可以只开一个docker,然后很简单的使用spark结合elephas来并行(利用24个cpu)计算CNN。

 如果你的机器有多个GPU(假设4个):

你可以开4个docker镜像,修改每个镜像内的~/.theanorc来选择特定的GPU来并行(4个GPU)计算。(需自行安装cuda)

单机多CPU集群并行训练CNN实例

跑一个最简单的网络来训练mnist手写字识别,贴一个能够直接运行的代码(要事先下载好mnist.pkl.gz):

from __future__ import absolute_import
from __future__ import print_function
import numpy as np

from keras.datasets import mnist
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.optimizers import SGD, Adam, RMSprop
from keras.layers.convolutional import Convolution2D, MaxPooling2D
from keras.utils import np_utils

from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

from pyspark import SparkContext, SparkConf

import gzip
import cPickle

APP_NAME = "mnist"
MASTER_IP = 'local[24]'

# Define basic parameters
batch_size = 128
nb_classes = 10
nb_epoch = 5

# input image dimensions
img_rows, img_cols = 28, 28
# number of convolutional filters to use
nb_filters = 32
# size of pooling area for max pooling
nb_pool = 2
# convolution kernel size
nb_conv = 3

# Load data
f = gzip.open("./mnist.pkl.gz", "rb")
dd = cPickle.load(f)
(X_train, y_train), (X_test, y_test) = dd

X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols)
X_test = X_test.reshape(X_test.shape[0], 1, img_rows, img_cols)

X_train = X_train.astype("float32")
X_test = X_test.astype("float32")
X_train /= 255
X_test /= 255

print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)
Y_test = np_utils.to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Convolution2D(nb_filters, nb_conv, nb_conv,
                        border_mode='full',
                        input_shape=(1, img_rows, img_cols)))
model.add(Activation('relu'))
model.add(Convolution2D(nb_filters, nb_conv, nb_conv))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(nb_pool, nb_pool)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adadelta')

## spark
conf = SparkConf().setAppName(APP_NAME).setMaster(MASTER_IP)
sc = SparkContext(conf=conf)

# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, X_train, Y_train)

# Initialize SparkModel from Keras model and Spark context
spark_model = SparkModel(sc,model)

# Train Spark model
spark_model.train(rdd, nb_epoch=nb_epoch, batch_size=batch_size, verbose=0, validation_split=0.1, num_workers=24)

# Evaluate Spark model by evaluating the underlying model
score = spark_model.get_network().evaluate(X_test, Y_test, show_accuracy=True, verbose=2)
print('Test accuracy:', score[1])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

执行以下命令即可运行:

/usr/local/spark/bin/spark-submit mnist_cnn_spark.py

使用24个slave,并行迭代了5次,得到的准确率和运行时间如下:

Test accuracy: 95.68% 
took: 1135s

不使用spark,大概测了一下,1次迭代就需要1800s,所以还是快7~8倍的。

多GPU集群并行训练CNN实例

由于博主近几日踩太多坑了,心实在太累了! 
关于单机多GPU集群,多机多GPU集群的配置,还请各位多待几日,等博主元气恢复,会继续义无反顾地继续踩坑的。。。

为了赤焰军,我会回来的!