pyspark: creating a k-means clustering model with spark-ml on a Spark DataFrame

Time: 2021-01-24 23:12:01

I am using the following code to create a clustering model:

import pandas as pd
pandas_df = pd.read_pickle('df_features.pickle')
spark_df = sqlContext.createDataFrame(pandas_df)

from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1.0)
modela = kmeans.fit(spark_df)

Then I got the following error:

AnalysisException                         Traceback (most recent call last)
<ipython-input-26-00e1e2ba1983> in <module>()
      3 
      4 kmeans = KMeans(k=2, seed=1.0)
----> 5 modela = kmeans.fit(spark_df)

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/base.pyc in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/wrapper.pyc in _fit(self, dataset)
    211 
    212     def _fit(self, dataset):
--> 213         java_model = self._fit_java(dataset)
    214         return self._create_model(java_model)
    215 

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/wrapper.pyc in _fit_java(self, dataset)
    208         """
    209         self._transfer_params_to_java()
--> 210         return self._java_obj.fit(dataset._jdf)
    211 
    212     def _fit(self, dataset):

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u"cannot resolve '`features`' given input columns: [field_1, field_2, field_3, field_4, field_5, field_6, field_7];"

Did I create the data frame wrong? Does anyone know what I missed? Thanks!

3 Answers

#1


2  

You need to use VectorAssembler http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Assemble every input column into a single "features" vector column
vecAssembler = VectorAssembler(inputCols=spark_df.columns, outputCol="features")
vector_df = vecAssembler.transform(spark_df)

kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(vector_df)
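
If it helps, here is a minimal sketch (assuming the model and vector_df from the snippet above) of how you might inspect the fit and get the cluster assignments back:

# Coordinates of the learned cluster centers
print(model.clusterCenters())

# transform() appends a "prediction" column holding each row's cluster index
model.transform(vector_df).select("features", "prediction").show(5)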

#2


0  

The RDD-based KMeans in pyspark.mllib expects an RDD of DenseVectors, where each vector corresponds to one row of your DataFrame. So, supposing your DataFrame has three columns that you are feeding into the k-means model, I would refactor it along the lines of:

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors

# One DenseVector per DataFrame row
modelInput = spark_df.rdd.map(lambda x: Vectors.dense(x[0], x[1], x[2]))
modelObject = KMeans.train(modelInput, 2)

Then if you want to get the results back from an RDD into a dataframe, I would do something like:

from pyspark.sql import Row

labels = modelInput.map(lambda x: modelObject.predict(x))
results = labels.zip(modelInput)
resultFrame = results.map(lambda x: Row(Label=x[0], Column1=x[1][0], Column2=x[1][1], Column3=x[1][2])).toDF()
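
As a rough sanity check (a sketch assuming the modelObject and modelInput above), the RDD-based model also exposes computeCost, the within-set sum of squared errors:

# Lower cost generally means tighter clusters for a given k
wssse = modelObject.computeCost(modelInput)
print("Within Set Sum of Squared Errors = %s" % wssse)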

#3


0  

from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

# Build a DataFrame with a single "features" vector column from two of the pandas columns
data = [(Vectors.dense([x[0], x[1]]),) for x in pandas_df.iloc[:, 2:4].values]
spark_df = spark.createDataFrame(data, ["features"])

kmeans = KMeans(k=2, seed=1)
modela = kmeans.fit(spark_df)
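
If you want all of the pandas columns as features rather than just two, a minimal variation on the same idea (assuming every column is numeric) might look like:

# Use every numeric pandas column as the feature vector
data = [(Vectors.dense(row),) for row in pandas_df.values]
spark_df = spark.createDataFrame(data, ["features"])
modela = KMeans(k=2, seed=1).fit(spark_df)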

For more details, refer to the official manual.
