I'm new to Scala and Spark. I'm trying to return multiple key-value pairs during the map transformation. My input data is a simple CSV file.
1, 2, 3
4, 5, 6
7, 8, 9
My Scala script looks like the following.
class Key(_i: Integer, _j: Integer) {
  def i = _i
  def j = _j
}

class Val(_x: Double, _y: Double) {
  def x = _x
  def y = _y
}

val arr = "1,2,3".split(",")
for (i <- 0 until arr.length) {
  val x = arr(i).toDouble
  for (j <- 0 until arr.length) {
    val y = arr(j).toDouble
    val k = new Key(i, j)
    val v = new Val(x, y)
    // note that I want to return the tuples (k, v)
  }
}
I want to be able to use the for loop and data structures above to return multiple tuples (k, v), similar to the code below.
val file = sc.textFile("/path/to/test.csv")
file.map(line => {
  val arr = line.split(",")
  for (i <- 0 until arr.length) {
    val x = arr(i).toDouble
    for (j <- (i + 1) until arr.length) {
      val y = arr(j).toDouble
      val k = new Index(i, j)
      val v = new Val(x, y)
      (k, v)
    }
  }
}).collect // reduceByKey is not there, reduce is there, but not what I want
When I copy/paste the code above into the lambda expression (and run it in the Scala REPL) I get the following error:
error: illegal start of simple expression
val arr = line.split(",")
^
I also realize that I am still stuck in an imperative/procedural style of thinking, so please bear with me (I'm a newbie at Scala/Spark).
2 Answers
#1
You are forgetting the braces after the arrow. You can only omit them when the body is a single expression.
file.map(line => {
  // multiple lines of code here
})
Full answer after edits:
case class Index(i: Integer, j: Integer)
case class Val(x: Double, y: Double)

val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line => {
  val arr = line.split(",")
  val doubleSeq = for (i <- 0 until arr.length) yield {
    val x = arr(i).toDouble
    for (j <- (i + 1) until arr.length) yield {
      val y = arr(j).toDouble
      val k = Index(i, j)
      val v = Val(x, y)
      (k, v)
    }
  }
  doubleSeq.flatten
})
There were actually several problems:
- Notice that I changed your classes to case classes, as they are serializable. Otherwise, you would need to implement `Serializable` yourself.
- I changed `map` to `flatMap`, and also `flatten`ed your array, as one `flatMap` would still leave you with an inner array. Now the combination of the two yields an `RDD[(Index, Val)]`, which can be used implicitly with `reduceByKey`.
- I turned your `for` loop into a `for` comprehension by using `yield`. You were getting a final type of `Unit` because the return type of a `for` loop is `Unit`.
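The `map`-vs-`flatMap` difference is easy to see on plain Scala collections, without Spark at all. A minimal sketch (the sample lines and pair encoding are illustrative, not the OP's exact classes):

```scala
// Plain Scala collections demo: why map leaves nesting and flatMap + flatten removes it.
val lines = List("1,2,3", "4,5,6")

// map + nested for/yield: each input line becomes a Seq of Seqs -- still nested.
val nested = lines.map { line =>
  val arr = line.split(",")
  for (i <- arr.indices) yield {
    for (j <- (i + 1) until arr.length) yield ((i, j), (arr(i).toDouble, arr(j).toDouble))
  }
}

// flatMap plus an inner flatten collapses both layers into one flat list of pairs.
val flat = lines.flatMap { line =>
  val arr = line.split(",")
  val nestedPairs = for (i <- arr.indices) yield {
    for (j <- (i + 1) until arr.length) yield ((i, j), (arr(i).toDouble, arr(j).toDouble))
  }
  nestedPairs.flatten
}

println(flat.size)  // 6 -- three index pairs per line, two lines
println(flat.head)  // ((0,1),(1.0,2.0))
```

The same shapes apply to `RDD.map` versus `RDD.flatMap`: with `map` you would end up with an RDD of nested sequences, not an `RDD[(Index, Val)]`.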
#2
Use `RDD.flatMap` and `yield` a list from the `for` loop:
val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield {
    val x = arr(i).toDouble
    val y = arr(j).toDouble
    val k = new Index(i, j)
    val v = new Val(x, y)
    (k, v)
  }
}.collect
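Note that a `for` comprehension with two generators desugars to `flatMap` + `map`, so the result is already flat and no explicit `.flatten` is needed. A small sketch on one line of input (the tuple encoding here stands in for the `Index`/`Val` classes):

```scala
// Two generators in one for comprehension: the compiler rewrites this as
// (0 until arr.length).flatMap(i => ((i + 1) until arr.length).map(j => ...)),
// which already produces a single flat sequence of pairs.
val arr = "1,2,3".split(",")
val pairs = for {
  i <- arr.indices
  j <- (i + 1) until arr.length
} yield ((i, j), (arr(i).toDouble, arr(j).toDouble))

println(pairs.size)  // 3 -- the pairs (0,1), (0,2), (1,2)
println(pairs.head)  // ((0,1),(1.0,2.0))
```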