Apache Spark: How do I create a matrix from a DataFrame?

Date: 2021-05-24 16:52:37

I have a DataFrame in Apache Spark with an array of integers, the source is a set of images. I ultimately want to do PCA on it, but I am having trouble just creating a matrix from my arrays. How do I create a matrix from a RDD?


> imagerdd = traindf.map(lambda row: map(float, row.image))
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)
Traceback (most recent call last):

  File "<ipython-input-21-6fdaa8cde069>", line 2, in <module>
    mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__
    values = self._convert_to_array(values, np.float64)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array
    return np.asarray(array_like, dtype=dtype)

  File "/usr/local/python/conda/lib/python2.7/site-packages/numpy/core/numeric.py", line 462, in asarray
    return array(a, dtype, copy=False, order=order)

TypeError: float() argument must be a string or a number

I'm getting the same error from every possible arrangement I can think of:


imagerdd = traindf.map(lambda row: Vectors.dense(row.image))
imagerdd = traindf.map(lambda row: row.image)
imagerdd = traindf.map(lambda row: np.array(row.image))

If I try


> imagedf = traindf.select("image")
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf)

Traceback (most recent call last):

  File "<ipython-input-26-a8cbdad10291>", line 2, in <module>
    mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__
    values = self._convert_to_array(values, np.float64)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array
    return np.asarray(array_like, dtype=dtype)

  File "/usr/local/python/conda/lib/python2.7/site-packages/numpy/core/numeric.py", line 462, in asarray
    return array(a, dtype, copy=False, order=order)

ValueError: setting an array element with a sequence.

1 Solution

#1



Since you didn't provide an example input, I'll assume it looks more or less like this, where id is a row number and image contains the values.


traindf = sqlContext.createDataFrame([
    (1, [1, 2, 3]),
    (2, [4, 5, 6]),
    (3, (7, 8, 9))
], ("id", "image"))

The first thing you have to understand is that DenseMatrix is a local data structure. To be precise, it is a wrapper around numpy.ndarray. As of now (Spark 1.4.1) there are no distributed equivalents in PySpark MLlib.

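This is why both tracebacks above bottom out in np.asarray: it cannot coerce an RDD, a DataFrame, or any ragged/non-numeric sequence into a flat float array. A minimal NumPy sketch (with illustrative input, not the actual image data) reproduces the second error:

```python
import numpy as np

# A ragged nested sequence cannot be coerced into a rectangular float
# array -- the same failure mode as passing a DataFrame to DenseMatrix.
try:
    np.asarray([[1.0, 2.0], [3.0]], dtype=np.float64)
except ValueError as e:
    print("ValueError:", e)
```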

DenseMatrix takes three mandatory arguments: numRows, numCols and values, where values is a local data structure. In your case you have to collect first:


values = (traindf.
    rdd.
    map(lambda r: (r.id, r.image)). # Extract row id and data
    sortByKey(). # Sort by row id
    flatMap(lambda (id, image): image).
    collect())


ncol = len(traindf.rdd.map(lambda r: r.image).first())
nrow = traindf.count()

dm = DenseMatrix(nrow, ncol, values)

Finally:


> print dm.toArray()
[[ 1.  4.  7.]
 [ 2.  5.  8.]
 [ 3.  6.  9.]]
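The ordering of that output follows from DenseMatrix being column-major by default: the flat values list fills the matrix one column at a time. A plain-NumPy sketch of the same layout:

```python
import numpy as np

values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
nrow, ncol = 3, 3

# order="F" (Fortran/column-major) mirrors how DenseMatrix lays out values:
# the first nrow entries become column 0, the next nrow become column 1, ...
arr = np.array(values).reshape((nrow, ncol), order="F")
# arr[0] is [1., 4., 7.], matching the first row of dm.toArray() above
```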

Edit:


In Spark 1.5+ you can use mllib.linalg.distributed as follows:


from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

mat = IndexedRowMatrix(traindf.map(lambda row: IndexedRow(*row)))
mat.numRows()
## 4
mat.numCols()
## 3

although as of now the API is still too limited to be useful in practice.
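Since the ultimate goal here was PCA, note that once the data has been collected into a local DenseMatrix, dm.toArray() gives a plain ndarray, and a local PCA can be sketched with NumPy's SVD. This is a minimal sketch for data that fits in driver memory; local_pca and k are illustrative names, not part of any Spark API:

```python
import numpy as np

def local_pca(X, k):
    """Project the rows of X onto the top-k principal components."""
    Xc = X - X.mean(axis=0)                      # center each column
    U, s, Vt = np.linalg.svd(Xc, full_matrices=False)
    return Xc.dot(Vt[:k].T)                      # (n_rows, k) score matrix

X = np.array([[1.0, 4.0, 7.0],
              [2.0, 5.0, 8.0],
              [3.0, 6.0, 9.0]])                  # e.g. dm.toArray()
scores = local_pca(X, k=2)
print(scores.shape)  # (3, 2)
```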

