How to return multiple key-value pairs from a Spark map transformation in Scala?

Time: 2021-08-29 23:11:39

I'm new to Scala and Spark. I'm trying to return multiple key-value pairs during the map transformation. My input data is a simple CSV file.

1, 2, 3
4, 5, 6
7, 8, 9

My Scala script looks like the following.

class Key(_i:Integer, _j:Integer) {
 def i = _i
 def j = _j
}
class Val(_x:Double, _y:Double) {
 def x = _x
 def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
 val x = arr(i).toDouble
 for(j <- 0 until arr.length) {
  val y = arr(j).toDouble
  val k = new Key(i, j)
  val v = new Val(x, y)
  //note that i want to return the tuples, (k, v)
 }
}
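
To make the goal concrete, here is an imperative sketch (using a mutable ListBuffer, purely for illustration) of the pairs a single row should produce:

import scala.collection.mutable.ListBuffer
val pairs = ListBuffer[(Key, Val)]()
for(i <- 0 until arr.length) {
 val x = arr(i).toDouble
 for(j <- 0 until arr.length) {
  val y = arr(j).toDouble
  pairs += ((new Key(i, j), new Val(x, y)))
 }
}
//pairs now holds 9 (Key, Val) tuples for this one row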

I want to be able to use the for loop and data structures above to return multiple tuples (k, v), something like the code below.

val file = sc.textFile("/path/to/test.csv")
file.map(line => {
 val arr = line.split(",")
 for(i <- 0 until arr.length) {
  val x = arr(i).toDouble
  for(j <- (i+1) until arr.length) {
   val y = arr(j).toDouble
   val k = new Index(i,j)
   val v = new Val(x,y)
   (k,v)
  }
 }
}).collect //reduceByKey is not there, reduce is there, but not what i want

When I copy/paste the code above into the lambda expression (and run it in the Scala REPL) I get the following error:

error: illegal start of simple expression
val arr = line.split(",")
^

I also realize that I am still stuck in imperative/procedural-style thinking, so please bear with me (I'm a newbie at Scala/Spark).

2 Answers

#1

You are forgetting the curly braces after the arrow. You can only omit them when the function body is a single expression.

file.map(line => {
    //multiple lines of code here
})
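
Without braces the parser expects a single expression after the arrow, and val starts a definition rather than an expression, which is why the error says "illegal start of simple expression". A minimal illustration:

val nums = Seq(1, 2, 3)
//nums.map(n => val m = n * 2; m)  //does not parse: val cannot follow => directly
nums.map(n => {
  val m = n * 2  //inside braces, definitions are allowed
  m
})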

Full answer after edits:

case class Index(i:Integer, j:Integer)
case class Val(x:Double, y:Double)

val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line => {
  val arr = line.split(",")
  //the nested comprehensions produce a Seq[Seq[(Index, Val)]]...
  val doubleSeq = for(i <- 0 until arr.length) yield {
    val x = arr(i).toDouble
    for(j <- (i+1) until arr.length) yield {
      val y = arr(j).toDouble
      val k = Index(i,j)
      val v = Val(x,y)
      (k,v)
    }
  }
  //...so flatten it, giving flatMap the flat Seq[(Index, Val)] it needs
  doubleSeq.flatten
})
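
On the sample data, each line contributes pairs for the index combinations (0,1), (0,2) and (1,2); the line "1,2,3", for example, produces (Index(0,1), Val(1.0,2.0)), (Index(0,2), Val(1.0,3.0)) and (Index(1,2), Val(2.0,3.0)).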

There were actually several problems:

  • Notice that I changed your classes to case classes, since case classes are serializable out of the box. Otherwise you would need to extend Serializable yourself.
  • I changed map to flatMap and also flattened the inner sequence, because a single flatMap would still leave you with a nested sequence. The combination of the two yields an RDD[(Index, Val)], on which reduceByKey is now available implicitly (see the sketch after this list).
  • I turned your for loop into a for comprehension by adding yield. You were getting a final type of Unit because the return type of a plain for loop is Unit.
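
As a sketch of that follow-on step (the component-wise sum is an arbitrary example; use whatever combining function your computation needs):

val pairs = data.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield (Index(i, j), Val(arr(i).toDouble, arr(j).toDouble))
}
//combine the Vals that share an Index, here by summing components
val summed = pairs.reduceByKey((a, b) => Val(a.x + b.x, a.y + b.y))
summed.collect.foreach(println)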

#2

Use RDD.flatMap and yield a list from the for loop:

val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield {
    val x = arr(i).toDouble
    val y = arr(j).toDouble
    val k = new Index(i, j)
    val v = new Val(x, y)
    (k, v)
  }
}.collect
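
For reference, a comprehension with two generators like this desugars into nested flatMap/map calls; an equivalent explicit sketch (same Index and Val case classes assumed, new omitted since they are case classes):

file.flatMap { line =>
  val arr = line.split(",")
  (0 until arr.length).flatMap { i =>        //outer generator: i
    ((i + 1) until arr.length).map { j =>    //inner generator: j
      (Index(i, j), Val(arr(i).toDouble, arr(j).toDouble))
    }
  }
}.collect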
