Spark: How to "scan" RDD collections?

Time: 2022-05-07 23:12:56

Does Spark have any analog of the Scala scan operation to work on RDD collections? (For details please see Reduce, fold or scan (Left/Right)?)

For example:

val abc = List("A", "B", "C")

def add(res: String, x: String) = { 
  println(s"op: $res + $x = ${res + x}")
  res + x
} 

So to get:

abc.scanLeft("z")(add)
// op: z + A = zA      // same operations as foldLeft above...
// op: zA + B = zAB
// op: zAB + C = zABC
// res: List[String] = List(z, zA, zAB, zABC) // maps intermediate results

Any other means to achieve the same result?

Update

What is "Spark" way to solve, for example, the following problem:

什么是“火花”方式来解决,例如,下面的问题:

Compute the elements of the vector as (in pseudocode):

x(i) = SomeFun(for k from 0 to i-1)(y(k)) 

Should I collect the RDD for this? Is there no other way?

Update 2

Ok, I understand the general problem. Yet maybe you could advise me on the particular case I have to deal with.

I have a list of ints as the input RDD and I have to build an output RDD, where the following should hold:

1) input.length == output.length // output list is of the same length as input

2) output(i) = sum(input(0..i)) / q^i // i-th element of the output list equals the sum of input elements from 0 to i, divided by the i-th power of some constant q

In fact, I need a combination of the map and fold functions to solve this.
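For concreteness, here is a plain Scala (non-Spark) sketch of the computation described above; the constant q = 2.0 and the sample input are only placeholders:

val q = 2.0                      // placeholder constant
val input = List(1, 2, 3, 4)     // placeholder input

// Running prefix sums; scanLeft also emits the seed as an extra first element,
// so drop it to keep output.length == input.length.
val prefixSums = input.scanLeft(0)(_ + _).tail
// prefixSums: List(1, 3, 6, 10)

// Divide the i-th prefix sum by q^i.
val output = prefixSums.zipWithIndex.map { case (s, i) => s / math.pow(q, i) }
// output: List(1.0, 1.5, 1.5, 1.25)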

Another idea is to write a recursive fold over diminishing tails of the input list. But this is super inefficient, and AFAIK Spark does not have tail or init functions for RDDs.

How would you solve this problem in Spark?

1 Answer

#1

You are correct that there is no analog of scan() for a generic RDD.

A potential explanation: such a method would require access to all elements of the distributed collection to process each element of the generated output collection before continuing on to the next output element.

So if your input list had, say, one million plus one entries, there would be one million shuffle operations on the cluster (even though sorting is not required here - Spark gives it for "free" when doing a cluster collect step).

UPDATE: The OP has expanded the question. Here is a response to the expanded question.

From the updated OP:

x(i) = SomeFun(for k from 0 to i-1)(y(k)) 

You need to distinguish whether the x(i) computation - specifically the y(k) function - would either:

  • require access to the entire dataset x(0 .. i-1)
  • change the structure of the dataset

on each iteration. That is the case for scan - and given your description it seems to be your purpose. AFAIK this is not supported in Spark. Once again - think about it as if you were developing the distributed framework: how would you achieve the same? It does not seem to be a scalable thing to do - so yes, you would need to do that computation via a collect() invocation against the original RDD and perform it on the Driver.
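For completeness, here is a minimal sketch of that collect-on-the-Driver approach, applied to the prefix-sum / q^i problem from the question. It assumes an existing SparkContext named sc, and it is only feasible when the dataset fits into Driver memory:

val q = 2.0                                   // the constant from the question
val input = sc.parallelize(Seq(1, 2, 3, 4))   // sample input RDD

// Pull the whole dataset back to the Driver.
val local = input.collect()

// scanLeft gives the running sums; drop the seed so the lengths match.
val prefixSums = local.scanLeft(0)(_ + _).tail

// Divide the i-th running sum by q^i.
val output = prefixSums.zipWithIndex.map { case (s, i) => s / math.pow(q, i) }

// Redistribute the result as an RDD if downstream steps need one.
val outputRdd = sc.parallelize(output.toSeq)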
