1 二元逻辑回归
回归是一种很容易理解的模型,就相当于y=f(x),表明自变量x与因变量y的关系。最常见问题如医生治病时的望、闻、问、切,之后判定病人是否生病或生了什么病, 其中的望、闻、问、切就是获取的自变量x,即特征数据,判断是否生病就相当于获取因变量y,即预测分类。最简单的回归是线性回归,但是线性回归的鲁棒性很差。
逻辑回归是一种减小预测范围,将预测值限定为[0,1]间的一种回归模型,其回归方程与回归曲线如下图所示。逻辑曲线在z=0时,十分敏感,在z>>0或z<<0时,都不敏感。
逻辑回归其实是在线性回归的基础上,套用了一个逻辑函数。上图的g(z)就是这个逻辑函数(或称为Sigmoid函数)。下面左图是一个线性的决策边界,右图是非线性的决策边界。
对于线性边界的情况,边界形式可以归纳为如下公式(1):
因此我们可以构造预测函数为如下公式(2):
该预测函数表示分类结果为1时的概率。因此对于输入点x
,分类结果为类别1和类别0的概率分别为如下公式(3):
2 多元逻辑回归
二元逻辑回归可以一般化为多元逻辑回归用来训练和预测多分类问题。对于多分类问题,算法将会训练出一个多元逻辑回归模型, 它包含K-1个二元回归模型。给定一个数据点,K-1个模型都会运行,概率最大的类别将会被选为预测类别。
对于输入点x,分类结果为各类别的概率分别为如下公式(6),其中k表示类别个数。
对于k类的多分类问题,模型的权重w = (w_1, w_2, ..., w_{K-1})是一个矩阵,如果添加截距,矩阵的维度为(K-1) * (N+1),否则为(K-1) * N。单个样本的目标函数的损失函数可以写成如下公式(7)的形式。
对损失函数求一阶导数,我们可以得到下面的公式(8):
根据上面的公式,如果某些margin的值大于709.78,multiplier以及逻辑函数的计算会出现算术溢出(arithmetic overflow)的情况。这个问题发生在有离群点远离超平面的情况下。 幸运的是,当max(margins) = maxMargin > 0时,损失函数可以重写为如下公式(9)的形式:
同理,multiplier也可以重写为如下公式(10)的形式。
3 逻辑回归的优缺点
- 优点:计算代价低,速度快,容易理解和实现。
- 缺点:容易欠拟合,分类和回归的精度不高。
4 实例
下面的例子展示了如何使用逻辑回归:
importorg.apache.spark.SparkContextimportorg.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
importorg.apache.spark.mllib.evaluation.MulticlassMetricsimportorg.apache.spark.mllib.regression.LabeledPointimportorg.apache.spark.mllib.linalg.Vectorsimportorg.apache.spark.mllib.util.MLUtils// 加载训练数据valdata=MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 切分数据,training (60%) and test (40%).valsplits= data.randomSplit(Array(0.6, 0.4), seed =11L)
valtraining= splits(0).cache()
valtest= splits(1)
// 训练模型valmodel=newLogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training)
// Compute raw scores on the test set.valpredictionAndLabels= test.map { caseLabeledPoint(label, features) =>valprediction= model.predict(features)
(prediction, label)
}
// Get evaluation metrics.valmetrics=newMulticlassMetrics(predictionAndLabels)
valprecision= metrics.precision
println("Precision = "+ precision)
// 保存和加载模型
model.save(sc, "myModelPath")
valsameModel=LogisticRegressionModel.load(sc, "myModelPath")
5 源码分析
5.1 训练模型
如上所述,在MLlib中,分别使用了梯度下降法和L-BFGS实现逻辑回归参数的计算。这两个算法的实现我们会在最优化章节介绍,这里我们介绍公共的部分。
LogisticRegressionWithLBFGS和LogisticRegressionWithSGD的入口函数均是GeneralizedLinearAlgorithm.run,下面详细分析该方法:
defrun(input: RDD[LabeledPoint]):M= {
if (numFeatures <0) {
//计算特征数
numFeatures = input.map(_.features.size).first()
}
valinitialWeights= {
if (numOfLinearPredictor ==1) {
Vectors.zeros(numFeatures)
} elseif (addIntercept) {
Vectors.zeros((numFeatures +1) * numOfLinearPredictor)
} else {
Vectors.zeros(numFeatures * numOfLinearPredictor)
}
}
run(input, initialWeights)
}
上面的代码初始化权重向量,向量的值均初始化为0。需要注意的是,addIntercept表示是否添加截距(Intercept,指函数图形与坐标的交点到原点的距离),默认是不添加的。numOfLinearPredictor表示二元逻辑回归模型的个数。 我们重点看run(input, initialWeights)的实现。它的实现分四步。
5.1.1 根据提供的参数缩放特征并添加截距
valscaler=if (useFeatureScaling) {
newStandardScaler(withStd =true, withMean =false).fit(input.map(_.features))
} else {
null
}
valdata=if (addIntercept) {
if (useFeatureScaling) {
input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
} else {
input.map(lp => (lp.label, appendBias(lp.features))).cache()
}
} else {
if (useFeatureScaling) {
input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
} else {
input.map(lp => (lp.label, lp.features))
}
}
valinitialWeightsWithIntercept=if (addIntercept && numOfLinearPredictor ==1) {
appendBias(initialWeights)
} else {
/** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
initialWeights
}
在最优化过程中,收敛速度依赖于训练数据集的条件数(condition number),缩放变量经常可以启发式地减少这些条件数,提高收敛速度。不减少条件数,一些混合有不同范围列的数据集可能不能收敛。 在这里使用StandardScaler将数据集的特征进行缩放。详细信息请看StandardScaler。appendBias方法很简单,就是在每个向量后面加一个值为1的项。
defappendBias(vector: Vector):Vector= {
vector match {
casedv: DenseVector=>valinputValues= dv.values
valinputLength= inputValues.length
valoutputValues=Array.ofDim[Double](inputLength +1)
System.arraycopy(inputValues, 0, outputValues, 0, inputLength)
outputValues(inputLength) =1.0Vectors.dense(outputValues)
casesv: SparseVector=>valinputValues= sv.values
valinputIndices= sv.indices
valinputValuesLength= inputValues.length
valdim= sv.size
valoutputValues=Array.ofDim[Double](inputValuesLength +1)
valoutputIndices=Array.ofDim[Int](inputValuesLength +1)
System.arraycopy(inputValues, 0, outputValues, 0, inputValuesLength)
System.arraycopy(inputIndices, 0, outputIndices, 0, inputValuesLength)
outputValues(inputValuesLength) =1.0
outputIndices(inputValuesLength) = dim
Vectors.sparse(dim +1, outputIndices, outputValues)
case _ =>thrownewIllegalArgumentException(s"Do not support vector type ${vector.getClass}")
}
5.1.2 使用最优化算法计算最终的权重值
valweightsWithIntercept= optimizer.optimize(data, initialWeightsWithIntercept)
有梯度下降算法和L-BFGS两种算法来计算最终的权重值,查看梯度下降法和L-BFGS了解详细实现。 这两种算法均使用Gradient的实现类计算梯度,使用Updater的实现类更新参数。在 LogisticRegressionWithSGD 和 LogisticRegressionWithLBFGS 中,它们均使用 LogisticGradient 实现类计算梯度,使用 SquaredL2Updater 实现类更新参数。
//在GradientDescent中privatevalgradient=newLogisticGradient()
privatevalupdater=newSquaredL2Updater()
overridevaloptimizer=newGradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)
//在LBFGS中overridevaloptimizer=newLBFGS(newLogisticGradient,newSquaredL2Updater)
下面将详细介绍LogisticGradient的实现和SquaredL2Updater的实现。
- LogisticGradient
valmargin=-1.0* dot(data, weights)
valmultiplier= (1.0/ (1.0+ math.exp(margin))) - label
//y += a * x,即cumGradient += multiplier * data
axpy(multiplier, data, cumGradient)
if (label >0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.MLUtils.log1pExp(margin)
} else {
MLUtils.log1pExp(margin) - margin
}
这里的multiplier就是上文的公式(2)。axpy方法用于计算梯度,这里表示的意思是h(x) * x。下面是多元逻辑回归的实现方法。
//权重valweightsArray= weights match {
casedv: DenseVector=> dv.values
case _ =>thrownewIllegalArgumentException
}
//梯度valcumGradientArray= cumGradient match {
casedv: DenseVector=> dv.values
case _ =>thrownewIllegalArgumentException
}
// 计算所有类别中最大的marginvarmarginY=0.0varmaxMargin=Double.NegativeInfinityvarmaxMarginIndex=0valmargins=Array.tabulate(numClasses -1) { i =>varmargin=0.0
data.foreachActive { (index, value) =>if (value !=0.0) margin += value * weightsArray((i * dataSize) + index)
}
if (i == label.toInt -1) marginY = margin
if (margin > maxMargin) {
maxMargin = margin
maxMarginIndex = i
}
margin
}
//计算sum,保证每个margin都小于0,避免出现算术溢出的情况valsum= {
vartemp=0.0if (maxMargin >0) {
for (i <-0 until numClasses -1) {
margins(i) -= maxMargin
if (i == maxMarginIndex) {
temp += math.exp(-maxMargin)
} else {
temp += math.exp(margins(i))
}
}
} else {
for (i <-0 until numClasses -1) {
temp += math.exp(margins(i))
}
}
temp
}
//计算multiplier并计算梯度for (i <-0 until numClasses -1) {
valmultiplier= math.exp(margins(i)) / (sum +1.0) - {
if (label !=0.0&& label == i +1) 1.0else0.0
}
data.foreachActive { (index, value) =>if (value !=0.0) cumGradientArray(i * dataSize + index) += multiplier * value
}
}
//计算损失函数,valloss=if (label >0.0) math.log1p(sum) - marginY else math.log1p(sum)
if (maxMargin >0) {
loss + maxMargin
} else {
loss
}
- SquaredL2Updater
classSquaredL2UpdaterextendsUpdater {
overridedefcompute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
// w' = w - thisIterStepSize * (gradient + regParam * w)// w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient//表示步长,即负梯度方向的大小valthisIterStepSize= stepSize / math.sqrt(iter)
valbrzWeights:BV[Double] = weightsOld.toBreeze.toDenseVector
//正则化,brzWeights每行数据均乘以(1.0 - thisIterStepSize * regParam)
brzWeights :*= (1.0- thisIterStepSize * regParam)
//y += x * a,即brzWeights -= gradient * thisInterStepSize
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
//正则化||w||_2valnorm= brzNorm(brzWeights, 2.0)
(Vectors.fromBreeze(brzWeights), 0.5* regParam * norm * norm)
}
}
该函数的实现规则是:
w' = w - thisIterStepSize * (gradient + regParam * w)w'= (1- thisIterStepSize * regParam) * w - thisIterStepSize * gradient
这里thisIterStepSize表示参数沿负梯度方向改变的速率,它随着迭代次数的增多而减小。
5.1.3 对最终的权重值进行后处理
valintercept=if (addIntercept && numOfLinearPredictor ==1) {
weightsWithIntercept(weightsWithIntercept.size -1)
} else {
0.0
}
varweights=if (addIntercept && numOfLinearPredictor ==1) {
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size -1))
} else {
weightsWithIntercept
}
该段代码获得了截距(intercept)以及最终的权重值。由于截距(intercept)和权重是在收缩的空间进行训练的,所以我们需要再把它们转换到原始的空间。数学知识告诉我们,如果我们仅仅执行标准化而没有减去均值,即withStd = true, withMean = false, 那么截距(intercept)的值并不会发送改变。所以下面的代码仅仅处理权重向量。
if (useFeatureScaling) {
if (numOfLinearPredictor ==1) {
weights = scaler.transform(weights)
} else {
vari=0valn= weights.size / numOfLinearPredictor
valweightsArray= weights.toArray
while (i < numOfLinearPredictor) {
//排除interceptvalstart= i * n
valend= (i +1) * n - { if (addIntercept) 1else0 }
valpartialWeightsArray= scaler.transform(
Vectors.dense(weightsArray.slice(start, end))).toArray
System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size)
i +=1
}
weights =Vectors.dense(weightsArray)
}
}
5.1.4 创建模型
createModel(weights, intercept)
5.2 预测
训练完模型之后,我们就可以通过训练的模型计算得到测试数据的分类信息。predictPoint用来预测分类信息。它针对二分类和多分类,分别进行处理。
- 二分类的情况
valmargin= dot(weightMatrix, dataMatrix) + intercept
valscore=1.0/ (1.0+ math.exp(-margin))
threshold match {
caseSome(t) =>if (score > t) 1.0else0.0caseNone=> score
}
- 多分类情况
varbestClass=0varmaxMargin=0.0valwithBias= dataMatrix.size +1== dataWithBiasSize
(0 until numClasses -1).foreach { i =>varmargin=0.0
dataMatrix.foreachActive { (index, value) =>if (value !=0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
}
// Intercept is required to be added into margin.if (withBias) {
margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
}
if (margin > maxMargin) {
maxMargin = margin
bestClass = i +1
}
}
bestClass.toDouble
参考文献