Spark: Task not serializable (broadcast/RDD/SparkContext)

Date: 2022-04-15 23:11:18

There are numerous questions about Task is not serializable in Spark. However, this case seems quite particular.

I have created a class:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEmbeddings: RDD[(String, E)] = e.map(e => (e.w, e))
    .persist()
  val sc = allEmbeddings.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  [...]

The class defines the following method:

private def centroidDistances(v: Vector): Array[Double] = {
  centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
    .sortBy(_._1)
    .map(_._2)
}
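For reference, the logic of centroidDistances can be sketched in plain Scala without Spark. In this sketch, sqdist is a hypothetical stand-in for Vectors.sqdist (squared Euclidean distance), and centroids is passed in as a plain array rather than read from a broadcast:

```scala
object CentroidDistancesSketch {
  type Vec = Array[Double]

  // Stand-in for org.apache.spark.mllib.linalg.Vectors.sqdist:
  // squared Euclidean distance between two vectors of equal length.
  def sqdist(a: Vec, b: Vec): Double =
    a.zip(b).map { case (x, y) => val d = x - y; d * d }.sum

  // Same shape as the method above: pair each centroid with its index,
  // sort by that index, then keep only the distances, so the result is
  // an array of distances ordered by cluster index.
  def centroidDistances(centroids: Array[Vec], v: Vec): Array[Double] =
    centroids.map(c => (centroids.indexOf(c), sqdist(v, c)))
      .sortBy(_._1)
      .map(_._2)
}
```

Note that indexOf compares array references here, so duplicate centroid values would resolve to the first match, exactly as in the original method.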

However, when the class is called, a Task is not serializable exception is thrown.

Strangely enough, a tiny change in the header of class Neighbours suffices to fix the issue. Instead of creating a val sc: SparkContext to use for broadcasting, I merely inline the code that obtains the Spark context:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEmbeddings: RDD[(String, E)] = e.map(e => (e.w, e))
    .setName("embeddings")
    .persist()
  val centroids = allEmbeddings.sparkContext.broadcast(m.clusterCenters)
  [...]

My question is: how does the second variant make a difference? What goes wrong in the first one? Intuitively, this should be merely syntactic sugar. Is this a bug in Spark?

I use Spark 1.4.1 on a Hadoop/Yarn cluster.

1 solution

#1


When you define

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  ...
  val sc = allEmbeddings.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  ...
}

You have made sc a class field, meaning it can be accessed from an instance of Neighbours, e.g. neighbours.sc. This means that sc needs to be serializable, which it is not.

When you inline the code, only the final value of centroids needs to be serializable. centroids is of type Broadcast, which is Serializable.

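The capture of the enclosing instance can be demonstrated without Spark at all. Below is a minimal, self-contained sketch using plain Java serialization; Gadget and Holder are made-up names, with the non-serializable Gadget standing in for SparkContext. A closure that reads a field of its enclosing class captures the whole instance, dragging every field along; marking the offending field @transient (a common alternative to inlining) excludes it from serialization:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Stand-in for SparkContext: any class that is NOT java.io.Serializable.
  class Gadget

  class Holder extends Serializable {
    val gadget = new Gadget // plain field: serialized together with `this`
    val offset = 1
    // Referencing the field `offset` makes this closure capture `this`,
    // so serializing the closure also tries to serialize `gadget` -> fails.
    def fn: Int => Int = x => x + offset
  }

  class FixedHolder extends Serializable {
    @transient val gadget = new Gadget // @transient: skipped by serialization
    val offset = 1
    def fn: Int => Int = x => x + offset // same closure, now serializable
  }

  // Returns true iff `obj` survives a round through ObjectOutputStream.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

This mirrors the accepted explanation: the inlined variant never stores the SparkContext in a field, so the serialized closure carries only the Broadcast value.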