Spark 0.9.1 MLLib
机器学习库简介
MLlib 是Spark对常用的机器学习算法的实现库,同时包括相关的测试和数据生成器。MLlib 目前支持四种常见的机器学习问题:二元分类,回归,聚类以及协同过滤,同时也包括一个底层的梯度下降优化基础算法。本指南将会简要介绍 MLlib 中所支持的功能,并给出相应的调用 MLlib 的例子。
依赖
MLlib 将会调用 jblas 线性代数库,这个库本身依赖于原生的 Fortran 程序。如果你的节点中没有这些库,你也许会需要安装 gfortran runtime library。如果程序没有办法自动检测到这些库,MLlib 将会抛出链接错误的异常。
如果想用 Python 调用 MLlib,你需要安装 NumPy 1.7 或者更新的版本。
二元分类
二元分类是一个监督学习问题。在这个问题中,我们希望将实体归类到两个独立的类别或标签的其中一个中,例如判断一个邮件是否是垃圾邮件。这个问题涉及在一组被打过标签的样例运行一个学习算法,例如一组由(数字)特征和(相关的)类别标签所代表的实体。这个算法将会返回一个训练好的模型,该模型能够对标签未知的新个体进行潜在标签预测。
MLlib 目前支持两个适用于二元分类的标准模型家族:线性支持向量机(SVMs) 和逻辑回归,同时也包括分别适用与这两个模型家族的 L1 和 L2 正则化 变体。这些训练算法都利用了一个底层的梯度下降基础算法(描述如下)。二元分类算法的输入值是一个正则项参数(regParam) 和多个与梯度下降相关的参数(stepSize, numIterations, miniBatchFraction) 。
目前可用的二元分类算法:
线性回归
线性回归是另一个经典的监督学习问题。在这个问题中,每个个体都有一个与之相关联的实数标签(而在二元分类中个体的标签都是二元的),并且我们希望在给出用于表示这些实体的数值特征后,所预测出的标签值可以尽可能接近实际值。MLlib支持线性回归和与之相关的 L1 (lasso)和 L2 (ridge) 正则化的变体。MLlib中的回归算法也利用了底层的梯度下降基础算法(描述如下),输入参数与上述二元分类算法一致。
目前可用的线性回归算法:
聚类
聚类是一个非监督学习问题,在这个问题上,我们的目标是将一部分实体根据某种意义上的相似度和另一部分实体聚在一起。聚类通常被用于探索性的分析,或者作为层次化监督学习管道网(hierarchical supervised learning pipeline) 的一个组件(其中每一个类簇都会用与训练不同的分类器或者回归模型)。 MLlib 目前已经支持作为最被广泛使用的聚类算法之一的 k-means 聚类算法,根据事先定义的类簇个数,这个算法能对数据进行聚类。MLlib 的实现中包含一个 k-means++ 方法的并行化变体 kmeans||。 MLlib 里面的实现有如下的参数:
-
k 是所需的类簇的个数。
-
maxIterations 是最大的迭代次数。
-
initializationMode 这个参数决定了是用随机初始化还是通过 k-means|| 进行初始化。
-
runs 是跑 k-means 算法的次数(k-mean 算法不能保证能找出最优解,如果在给定的数据集上运行多次,算法将会返回最佳的结果)。
-
initializiationSteps 决定了 k-means|| 算法的步数。
-
epsilon 决定了判断 k-means 是否收敛的距离阀值。
目前可用的聚类算法:
协同过滤
协同过滤常被应用于推荐系统。这些技术旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。为此,我们实现了交替最小二乘法(ALS) 来学习这些隐性语义因子。在 MLlib 中的实现有如下的参数:
numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。
隐性反馈 vs 显性反馈
基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。
在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。
目前可用的协同过滤的算法:
梯度下降基础算法
梯度下降(及其随机的变种)是非常适用于大型分布式计算的一阶优化方案。梯度下降旨在通过向一个函数当前点(当前的参数值)的负梯度方向移动的方式迭代地找到这个函数的本地最优解。MLlib 以梯度下降作为一个底层的基础算法,在上面开发了各种机器学习算法。梯度下降算法有如下的参数:
-
gradient 这个类是用来计算要被优化的函数的随机梯度(如:相对于单一训练样本当前的参数值)。MLlib 包含常见损失函数 (hinge, logistic, least-squares) 的梯度类。梯度类将训练样本,其标签,以及当前的参数值作为输入值。
-
updater 是在梯度下降的每一次迭代中更新权重的类。MLlib 包含适用于无正则项,L1 正则项和 L2 正则项3种情况下的类。
-
stepSize 是一个表示梯度下降初始步长的数值。MLlib 中所有的更新器第 t 步的步长等于 stepSize / sqrt(t)。
-
numIterations 表示迭代的次数。
-
regParam 是在使用L1,L2 正则项时的正则化参数。
-
miniBatchFraction 是每一次迭代中用来计算梯度的数据百分比。
目前可用的梯度下降算法:
用Scala调用MLLib
下面的代码段可以在spark-shell中运行。
二元分类
下面的代码段演示了如何导入一份样本数据集,使用算法对象中的静态方法在训练集上执行训练算法,在所得的模型上进行预测并计算训练误差。
12345678910111213141516171819202122 |
import org.apache.spark.SparkContextimport org.apache.spark.mllib.classification.SVMWithSGDimport org.apache.spark.mllib.regression.LabeledPoint // Load and parse the data fileval data = sc.textFile("mllib/data/sample_svm_data.txt")val parsedData = data.map { line =>val parts = line.split(' ')LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)} // Run training algorithm to build the modelval numIterations = 20val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training errorval labelAndPreds = parsedData.map { point =>val prediction = model.predict(point.features)(point.label, prediction)}val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.countprintln("Training Error = " + trainErr) |
默认情况下,这个SVMWithSGD.train()方法使用正则参数为 1.0 的 L2 正则项。如果我们想配置这个算法,我们可以通过直接新建一个新的对象,并调用setter的方法,进一步个性化设置SVMWithSGD。所有其他的 MLlib 算法也是通过这样的方法来支持个性化的设置。比如,下面的代码给出了一个正则参数为0.1的 L1 正则化SVM变体,并且让这个训练算法迭代200遍。
1234567 |
importorg.apache.spark.mllib.optimization.L1Updater valsvmAlg=newSVMWithSGD()svmAlg.optimizer.setNumIterations(200).setRegParam(0.1).setUpdater(newL1Updater)valmodelL1=svmAlg.run(parsedData) |
线性回归
下面这个例子演示了如何导入训练集数据,将其解析为带标签点的RDD。然后,使用LinearRegressionWithSGD 算法来建立一个简单的线性模型来预测标签的值。最后我们计算了均方差来评估预测值与实际值的吻合度。
123456789101112131415161718192021 |
import org.apache.spark.mllib.regression.LinearRegressionWithSGDimport org.apache.spark.mllib.regression.LabeledPoint // Load and parse the dataval data = sc.textFile("mllib/data/ridge-data/lpsa.data")val parsedData = data.map { line =>val parts = line.split(',')LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)} // Building the modelval numIterations = 20val model = LinearRegressionWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training errorval valuesAndPreds = parsedData.map { point =>val prediction = model.predict(point.features)(point.label, prediction)}val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.countprintln("training Mean Squared Error = " + MSE) |
类似的,你也可以使用 RidgeRegressionWithSGD 和 LassoWithSGD 这两个算法,并比较这些算法在训练集上的均方差。
聚类
在下面的例子中,在载入和解析数据之后,我们使用 KMeans 对象来将数据聚类到两个类簇当中。所需的类簇个数会被传递到算法中。然后我们将计算集内均方差总和 (WSSSE). 你可以通过增加类簇的个数k 来减小误差。 实际上,最优的类簇数通常是 1,因为这一点通常是WSSSE图中的 “低谷点”。
1234567891011121314 |
importorg.apache.spark.mllib.clustering.KMeans // Load and parse the datavaldata=sc.textFile("kmeans_data.txt")valparsedData=data.map(_.split(' ').map(_.toDouble)) // Cluster the data into two classes using KMeansvalnumIterations=20valnumClusters=2valclusters=KMeans.train(parsedData,numClusters,numIterations) // Evaluate clustering by computing Within Set Sum of Squared ErrorsvalWSSSE=clusters.computeCost(parsedData)println("Within Set Sum of Squared Errors = "+WSSSE) |
协同过滤
在下面的例子中,我们导入的训练集中,数据每一行由一个用户,一个商品和相应的评分组成。假设评分是显性的,在这种情况下我们使用默认的ALS.train()方法。我们通过计算预测出的评分的均方差来评估这个推荐模型。
12345678910111213141516171819202122232425 |
import org.apache.spark.mllib.recommendation.ALSimport org.apache.spark.mllib.recommendation.Rating // Load and parse the dataval data = sc.textFile("mllib/data/als/test.data")val ratings = data.map(_.split(',') match {case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)}) // Build the recommendation model using ALSval numIterations = 20val model = ALS.train(ratings, 1, 20, 0.01) // Evaluate the model on rating dataval usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product)}val predictions = model.predict(usersProducts).map{case Rating(user, product, rate) => ((user, product), rate)}val ratesAndPreds = ratings.map{case Rating(user, product, rate) => ((user, product), rate)}.join(predictions)val MSE = ratesAndPreds.map{case ((user, product), (r1, r2)) => math.pow((r1- r2), 2)}.reduce(_ + _)/ratesAndPreds.countprintln("Mean Squared Error = " + MSE) |
如果这个评分矩阵是通过其他的信息来源(如从其他的信号中提取出来的)所获得,你也可以使用trainImplicit的方法来得到更好的结果。
1 |
valmodel=ALS.trainImplicit(ratings,1,20,0.01) |
用Java调用MLLib
所有 MLlib 中的算法都是对Java友好的,因此你可以用在 Scala 中一样的方法来导入和调用这些算法。唯一要注意的是,这些算法的输入值是Scala RDD对象,而在 Spark Java API 中用了分离的JavaRDD类。你可以在你的 JavaRDD对象中调用.rdd()的方法来将Java RDD转化成Scala RDD。
用Python调用MLLib
下面的列子可以在 PySpark shell 中得到测试。
二元分类
下面的代码段表明了如何导入一份样本数据集,使用算法对象中的静态方法在训练集上执行训练算法,在所得的模型上进行预测并计算训练误差。
123456789101112131415 |
from pyspark.mllib.classification import LogisticRegressionWithSGDfrom numpy import array # Load and parse the datadata = sc.textFile("mllib/data/sample_svm_data.txt")parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))model = LogisticRegressionWithSGD.train(parsedData) # Build the modellabelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),model.predict(point.take(range(1, point.size))))) # Evaluating the model on training datatrainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())print("Training Error = " + str(trainErr)) |
线性回归
下面这个例子给出了如何导入训练集数据,将其解析为带标签点的RDD。然后,这个例子使用了LinearRegressionWithSGD 算法来建立一个简单的线性模型来预测标签的值。我们在最后计算了均方差来评估预测值与实际值的吻合度。
123456789101112131415 |
frompyspark.mllib.regressionimportLinearRegressionWithSGDfromnumpyimportarray # Load and parse the datadata=sc.textFile("mllib/data/ridge-data/lpsa.data")parsedData=data.map(lambdaline:array([float(x)forxinline.replace(',',' ').split(' ')])) # Build the modelmodel=LinearRegressionWithSGD.train(parsedData) # Evaluate the model on training datavaluesAndPreds=parsedData.map(lambdapoint:(point.item(0),model.predict(point.take(range(1,point.size)))))MSE=valuesAndPreds.map(lambda(v,p):(v-p)**2).reduce(lambdax,y:x+y)/valuesAndPreds.count()print("Mean Squared Error = "+str(MSE)) |
类似的,你也可以使用 RidgeRegressionWithSGD 和 LassoWithSGD 这两个算法,并比较这些算法在训练集上的均方差。
聚类
在下面的例子中,在载入和解析数据之后,我们使用 KMeans对象来将数据聚类到两个类簇当中。所需的类簇个数被传递到算法中。然后我们将计算集内均方差总和(WSSSE). 你可以通过增加类簇的个数 k来减小误差。 实际上,最优的类簇数通常是 1,因为这一点通常是WSSSE图中的”低谷点”。
12345678910111213141516171819 |
from pyspark.mllib.clustering import KMeansfrom numpy import arrayfrom math import sqrt # Load and parse the datadata = sc.textFile("kmeans_data.txt")parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) # Build the model (cluster the data)clusters = KMeans.train(parsedData, 2, maxIterations=10,runs=30, initialization_mode="random") # Evaluate clustering by computing Within Set Sum of Squared Errorsdef error(point):center = clusters.centers[clusters.predict(point)]return sqrt(sum([x**2 for x in (point - center)])) WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)print("Within Set Sum of Squared Error = " + str(WSSSE)) |
协同过滤
在下面的例子中,我们导入的训练集中,数据每一行由一个用户,一个商品和相应的评分组成。假设评分是显性的,在这种情况下我们使用默认的>ALS.train()方法。我们通过计算预测出的评分的均方差来评估这个推荐模型。
12345678910111213141516 |
frompyspark.mllib.recommendationimportALSfromnumpyimportarray # Load and parse the datadata=sc.textFile("mllib/data/als/test.data")ratings=data.map(lambdaline:array([float(x)forxinline.split(',')])) # Build the recommendation model using Alternating Least Squaresmodel=ALS.train(ratings,1,20) # Evaluate the model on training datatestdata=ratings.map(lambdap:(int(p[0]),int(p[1])))predictions=model.predictAll(testdata).map(lambdar:((r[0],r[1]),r[2]))ratesAndPreds=ratings.map(lambdar:((r[0],r[1]),r[2])).join(predictions)MSE=ratesAndPreds.map(lambdar:(r[1][0]-r[1][1])**2).reduce(lambdax,y:x+y)/ratesAndPreds.count()print("Mean Squared Error = "+str(MSE)) |
如果这个评分矩阵是通过其他的信息来源(如从其他的信号中提取出来的)所获得,你也可以使用trainImplicit的方法来得到更好的结果。
12 |
# Build the recommendation model using Alternating Least Squares based on implicit ratingsmodel=ALS.trainImplicit(ratings,1,20) |