Spark MLlib Deep Learning Deep Belief Network (深度学习-深度信念网络)2.1
Spark MLlib Deep Learning工具箱,是根据现有深度学习教程《UFLDL教程》中的算法,在Spark MLlib中的实现。具体Spark MLlib Deep Learning(深度学习)目录结构:
第一章Neural Net(NN)
1、源码
2、源码解析
3、实例
第二章Deep Belief Nets(DBNs)
1、源码
2、源码解析
3、实例
第三章Convolution Neural Network(CNN)
第四章 Stacked Auto-Encoders(SAE)
第五章CAE
第二章Deep Belief Network (深度信念网络)
1源码
目前Spark MLlib Deep Learning工具箱源码的github地址为:
https://github.com/sunbow1/SparkMLlibDeepLearn
1.1 DBN代码
package DBN

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix

import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV,
  axpy => brzAxpy,
  svd => brzSvd
}
import breeze.numerics.{ exp => Bexp, tanh => Btanh }
import scala.collection.mutable.ArrayBuffer
import java.util.Random
import scala.math._

/**
 * Parameters of a single RBM layer together with their momentum terms.
 *
 * @param W  weight matrix (hidden units x visible units)
 * @param vW momentum / last update of `W`
 * @param b  visible-unit bias (visible units x 1)
 * @param vb momentum / last update of `b`
 * @param c  hidden-unit bias (hidden units x 1)
 * @param vc momentum / last update of `c`
 */
case class DBNweight(
  W: BDM[Double],
  vW: BDM[Double],
  b: BDM[Double],
  vb: BDM[Double],
  c: BDM[Double],
  vc: BDM[Double]) extends Serializable

/**
 * Training configuration.
 *
 * @param size     number of units per layer, input layer first
 * @param layer    number of layers
 * @param momentum momentum factor applied to the previous update
 * @param alpha    learning rate
 */
case class DBNConfig(
  size: Array[Int],
  layer: Int,
  momentum: Double,
  alpha: Double) extends Serializable

/**
 * DBN (Deep Belief Network) trainer: a stack of RBMs trained greedily, one layer at a time,
 * with one step of contrastive divergence per mini-batch.
 *
 * @param size     network architecture (units per layer, input layer first)
 * @param layer    number of layers
 * @param momentum momentum factor
 * @param alpha    learning rate
 */
class DBN(
  private var size: Array[Int],
  private var layer: Int,
  private var momentum: Double,
  private var alpha: Double) extends Serializable with Logging {

  /** Builds a trainer with `DBN.Architecture`, 3 layers, momentum 0.0 and alpha 1.0. */
  def this() = this(DBN.Architecture, 3, 0.0, 1.0)

  /** Sets the network architecture. Default: [10, 5, 1]. */
  def setSize(size: Array[Int]): this.type = {
    this.size = size
    this
  }

  /** Sets the number of layers. Default: 3. */
  def setLayer(layer: Int): this.type = {
    this.layer = layer
    this
  }

  /** Sets the momentum. Default: 0.0. */
  def setMomentum(momentum: Double): this.type = {
    this.momentum = momentum
    this
  }

  /** Sets alpha (the learning rate). Default: 1. */
  def setAlpha(alpha: Double): this.type = {
    this.alpha = alpha
    this
  }

  /** Prints one weight matrix, tab separated, one matrix row per output line. */
  private def printWeights(level: Int, w: BDM[Double]): Unit = {
    printf("dbn_W%d.\n", level)
    for (r <- 0 until w.rows) {
      for (c <- 0 until w.cols) {
        print(s"${w(r, c)}\t")
      }
      println()
    }
  }

  /**
   * Trains the whole network layer by layer.
   *
   * @param train_d training records as (label, x); the matrix algebra below requires each
   *                `x` to be a single row with `size(0)` columns
   * @param opts    Array(batchsize, numepochs), see [[RBMtrain]]
   * @return the trained model (W, b and c of every RBM)
   */
  def DBNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): DBNModel = {
    require(size.length >= 2, "size must describe at least an input and one hidden layer")
    require(layer <= size.length, s"layer ($layer) must not exceed size.length (${size.length})")

    val sc = train_d.sparkContext
    val dbnconfig = DBNConfig(size, layer, momentum, alpha)

    // All parameters start at zero; entry k belongs to the RBM between layer k and layer k + 1.
    val dbn_W = DBN.InitialW(size)
    val dbn_vW = DBN.InitialvW(size)
    val dbn_b = DBN.Initialb(size)
    val dbn_vb = DBN.Initialvb(size)
    val dbn_c = DBN.Initialc(size)
    val dbn_vc = DBN.Initialvc(size)

    // Trains RBM number `idx` (0-based) on `data`, stores the result and prints its weights.
    def trainLayer(idx: Int, data: RDD[(BDM[Double], BDM[Double])]): Unit = {
      val initial = new DBNweight(
        dbn_W(idx), dbn_vW(idx), dbn_b(idx), dbn_vb(idx), dbn_c(idx), dbn_vc(idx))
      val trained = RBMtrain(data, opts, dbnconfig, initial)
      dbn_W(idx) = trained.W
      dbn_vW(idx) = trained.vW
      dbn_b(idx) = trained.b
      dbn_vb(idx) = trained.vb
      dbn_c(idx) = trained.c
      dbn_vc(idx) = trained.vc
      printWeights(idx + 1, dbn_W(idx))
    }

    // Layer 1 is trained on the raw input.
    printf("Training Level: %d.\n", 1)
    trainLayer(0, train_d)

    // Layers 2..n: each RBM is trained on the activations of the RBM below it, so the
    // input is pushed up cumulatively, x = sigm(repmat(rbm.c', size(x, 1), 1) + x * rbm.W').
    // (Feeding the raw input to every layer only has matching dimensions for 3-layer nets.)
    var layerInput = train_d
    for (i <- 2 to dbnconfig.layer - 1) {
      printf("Training Level: %d.\n", i)
      val bcW = sc.broadcast(dbn_W(i - 2))
      val bcC = sc.broadcast(dbn_c(i - 2))
      layerInput = layerInput.map {
        case (label, x) => (label, DBN.sigm(x * bcW.value.t + bcC.value.t))
      }
      trainLayer(i - 1, layerInput)
    }

    new DBNModel(dbnconfig, dbn_W, dbn_b, dbn_c)
  }

  /**
   * Trains one RBM with one step of contrastive divergence per mini-batch.
   *
   * Per mini-batch (MATLAB reference):
   * {{{
   * h1 = sigmrnd(repmat(rbm.c', batchsize, 1) + v1 * rbm.W');
   * v2 = sigmrnd(repmat(rbm.b', batchsize, 1) + h1 * rbm.W);
   * h2 = sigm(repmat(rbm.c', batchsize, 1) + v2 * rbm.W');
   * rbm.vW = momentum * rbm.vW + alpha * (h1' * v1 - h2' * v2) / batchsize;
   * rbm.vb = momentum * rbm.vb + alpha * sum(v1 - v2)' / batchsize;
   * rbm.vc = momentum * rbm.vc + alpha * sum(h1 - h2)' / batchsize;
   * rbm.W = rbm.W + rbm.vW;  rbm.b = rbm.b + rbm.vb;  rbm.c = rbm.c + rbm.vc;
   * }}}
   * Here the divisor is the number of records that actually landed in the batch.
   *
   * @param train_t   training records as (label, v); only `v` is used
   * @param opts      opts(0) = batch size, opts(1) = number of epochs
   * @param dbnconfig supplies momentum and alpha
   * @param weight    starting parameters of this RBM
   * @return the updated parameters
   */
  def RBMtrain(
    train_t: RDD[(BDM[Double], BDM[Double])],
    opts: Array[Double],
    dbnconfig: DBNConfig,
    weight: DBNweight): DBNweight = {
    require(opts.length >= 2, "opts must be Array(batchsize, numepochs)")
    val batchsize = opts(0).toInt
    val numepochs = opts(1).toInt
    require(batchsize > 0, s"batchsize must be positive but was $batchsize")

    val sc = train_t.sparkContext
    val momentum = dbnconfig.momentum
    val alpha = dbnconfig.alpha

    var rbm_W = weight.W
    var rbm_vW = weight.vW
    var rbm_b = weight.b
    var rbm_vb = weight.vb
    var rbm_c = weight.c
    var rbm_vc = weight.vc

    // Always use at least one batch so that small data sets are still trained on.
    val m = train_t.count
    val numbatches = scala.math.max(1, (m / batchsize).toInt)
    val splitWeights = Array.fill(numbatches)(1.0 / numbatches)

    for (epoch <- 1 to numepochs) {
      val startTime = System.currentTimeMillis()
      // Split once per epoch so the batches form a partition of the data: every record
      // is visited exactly once per epoch.
      val batches = train_t.randomSplit(splitWeights, System.nanoTime())
      var err = 0.0

      for (batch <- batches) {
        // Only W, b and c are read on the executors; the momentum terms stay on the driver.
        val bcW = sc.broadcast(rbm_W)
        val bcB = sc.broadcast(rbm_b)
        val bcC = sc.broadcast(rbm_c)

        // Accumulator: (sum(c1 - c2), sum(v1 - v2), sum(h1 - h2), sum of squared error, count).
        val zero = (
          BDM.zeros[Double](rbm_W.rows, rbm_W.cols),
          BDM.zeros[Double](rbm_vb.cols, rbm_vb.rows),
          BDM.zeros[Double](rbm_vc.cols, rbm_vc.rows),
          0.0,
          0L)

        // A single pass: the Gibbs samples are random, so all statistics of a batch must
        // come from the same evaluation of the sampling step.
        val (sumW, sumB, sumC, sumSqErr, count) = batch.treeAggregate(zero)(
          seqOp = (acc, sample) => {
            val v1 = sample._2
            val w = bcW.value
            val hiddenBias = bcC.value.t
            val h1 = DBN.sigmrnd(v1 * w.t + hiddenBias)
            val v2 = DBN.sigmrnd(h1 * w + bcB.value.t)
            val h2 = DBN.sigm(v2 * w.t + hiddenBias)
            val dv = v1 - v2
            (acc._1 + (h1.t * v1 - h2.t * v2),
              acc._2 + dv,
              acc._3 + (h1 - h2),
              acc._4 + (dv :* dv).sum,
              acc._5 + 1L)
          },
          combOp = (l, r) => (l._1 + r._1, l._2 + r._2, l._3 + r._3, l._4 + r._4, l._5 + r._5))

        // The action has finished; release the executor-side copies of this batch's weights.
        bcW.unpersist()
        bcB.unpersist()
        bcC.unpersist()

        // A random split can leave a batch empty; skipping it avoids a division by zero
        // that would turn every weight into NaN.
        if (count > 0) {
          val n = count.toDouble
          rbm_vW = (rbm_vW * momentum) + ((sumW / n) * alpha)
          rbm_vb = (rbm_vb * momentum) + ((sumB / n).t * alpha)
          rbm_vc = (rbm_vc * momentum) + ((sumC / n).t * alpha)
          rbm_W = rbm_W + rbm_vW
          rbm_b = rbm_b + rbm_vb
          rbm_c = rbm_c + rbm_vc
          err += sumSqErr / n
        }
      }

      val endTime = System.currentTimeMillis()
      printf("epoch: numepochs = %d , Took = %d seconds; Average reconstruction error is: %f.\n",
        epoch,
        scala.math.ceil((endTime - startTime).toDouble / 1000).toLong,
        err / numbatches.toDouble)
    }

    new DBNweight(rbm_W, rbm_vW, rbm_b, rbm_vb, rbm_c, rbm_vc)
  }
}

/**
 * DBN (Deep Belief Network) helpers: default settings, zero initialisers and activations.
 */
object DBN extends Serializable {

  // Initialization mode names
  val Activation_Function = "sigm"
  val Output = "linear"
  val Architecture = Array(10, 5, 1)

  /** Builds one zero matrix per RBM, i.e. per pair of adjacent layers (i - 1, i). */
  private def zeroLayers(size: Array[Int])(shape: Int => (Int, Int)): Array[BDM[Double]] =
    (1 until size.length).map { i =>
      val (rows, cols) = shape(i)
      BDM.zeros[Double](rows, cols)
    }.toArray

  /** Zero weights: dbn.rbm{u}.W = zeros(dbn.sizes(u + 1), dbn.sizes(u)). */
  def InitialW(size: Array[Int]): Array[BDM[Double]] =
    zeroLayers(size)(i => (size(i), size(i - 1)))

  /** Zero weight momentum: dbn.rbm{u}.vW = zeros(dbn.sizes(u + 1), dbn.sizes(u)). */
  def InitialvW(size: Array[Int]): Array[BDM[Double]] =
    zeroLayers(size)(i => (size(i), size(i - 1)))

  /** Zero visible bias: dbn.rbm{u}.b = zeros(dbn.sizes(u), 1). */
  def Initialb(size: Array[Int]): Array[BDM[Double]] =
    zeroLayers(size)(i => (size(i - 1), 1))

  /** Zero visible-bias momentum: dbn.rbm{u}.vb = zeros(dbn.sizes(u), 1). */
  def Initialvb(size: Array[Int]): Array[BDM[Double]] =
    zeroLayers(size)(i => (size(i - 1), 1))

  /** Zero hidden bias: dbn.rbm{u}.c = zeros(dbn.sizes(u + 1), 1). */
  def Initialc(size: Array[Int]): Array[BDM[Double]] =
    zeroLayers(size)(i => (size(i), 1))

  /** Zero hidden-bias momentum: dbn.rbm{u}.vc = zeros(dbn.sizes(u + 1), 1). */
  def Initialvc(size: Array[Int]): Array[BDM[Double]] =
    zeroLayers(size)(i => (size(i), 1))

  /**
   * Gibbs sampling: each entry becomes 1.0 when its sigmoid exceeds a fresh random draw,
   * otherwise 0.0.
   * X = double(1./(1+exp(-P)) > rand(size(P)));
   */
  def sigmrnd(P: BDM[Double]): BDM[Double] = {
    val prob = sigm(P)
    val noise = BDM.rand[Double](prob.rows, prob.cols)
    val fired = (prob :> noise).data.map { on => if (on) 1.0 else 0.0 }
    new BDM(prob.rows, prob.cols, fired)
  }

  /**
   * Noisy sigmoid: sigmoid of `P` plus one random draw per entry.
   * X = double(1./(1+exp(-P)))+1*randn(size(P));
   *
   * NOTE(review): the formula above names `randn`, but the noise is drawn with `BDM.rand`,
   * the same generator `sigmrnd` uses for its `rand` threshold - confirm which is intended.
   */
  def sigmrnd2(P: BDM[Double]): BDM[Double] = {
    val prob = sigm(P)
    val noise = BDM.rand[Double](prob.rows, prob.cols)
    prob + (noise * 1.0)
  }

  /**
   * Sigmoid activation.
   * X = 1./(1+exp(-P));
   */
  def sigm(matrix: BDM[Double]): BDM[Double] =
    1.0 / (Bexp(matrix * (-1.0)) + 1.0)

  /**
   * Scaled tanh activation.
   * f=1.7159*tanh(2/3.*A);
   */
  def tanh_opt(matrix: BDM[Double]): BDM[Double] =
    Btanh(matrix * (2.0 / 3.0)) * 1.7159
}
1.2 DBNModel代码
package DBN

import breeze.linalg.{
  Matrix => BM,
  CSCMatrix => BSM,
  DenseMatrix => BDM,
  Vector => BV,
  DenseVector => BDV,
  SparseVector => BSV
}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

/**
 * Result of DBN training.
 *
 * @param config configuration the network was trained with
 * @param dbn_W  one weight matrix per RBM
 * @param dbn_b  one `b` bias matrix per RBM
 * @param dbn_c  one `c` bias matrix per RBM
 */
class DBNModel(
  val config: DBNConfig,
  val dbn_W: Array[BDM[Double]],
  val dbn_b: Array[BDM[Double]],
  val dbn_c: Array[BDM[Double]]) extends Serializable {

  /**
   * Unfolds the DBN into the parameters of a feed-forward NN.
   *
   * @param outputsize size of an output layer to append; values <= 0 append nothing
   * @return (layer sizes, number of layers, initial weights), where weight `i` is
   *         `dbn_c(i)` prepended as an extra first column to `dbn_W(i)`
   */
  def dbnunfoldtonn(outputsize: Int): (Array[Int], Int, Array[BDM[Double]]) = {
    val appendOutput = outputsize > 0

    // 1. Architecture: optionally extended by the output layer.
    val nnSize = if (appendOutput) config.size :+ outputsize else config.size
    val nnLayer = if (appendOutput) config.layer + 1 else config.layer

    // 2. Weights: [c | W] for every trained RBM.
    val nnWeights = Array.tabulate(dbn_W.length) { i =>
      BDM.horzcat(dbn_c(i), dbn_W(i))
    }

    (nnSize, nnLayer, nnWeights)
  }
}
转载请注明出处: