How to submit a Spark job programmatically

Date: 2021-04-01 15:22:55

Submitting the application with spark-submit works.

But when I try to submit it programmatically with the command below

mvn exec:java -Dexec.mainClass="org.cybergen.SubmitJobExample" -Dexec.args="/opt/spark/current/README.md Please"

I get the following error:

Application Log

15/05/12 17:19:46 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
15/05/12 17:19:46 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/05/12 17:20:06 INFO AppClient$ClientActor: Connecting to master spark://cyborg:7077...
15/05/12 17:20:06 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@cyborg:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

Spark Master Log

15/05/12 17:33:22 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
15/05/12 17:33:22 INFO Master: akka.tcp://sparkDriver@10.18.26.116:49592 got disassociated, removing it.
15/05/12 17:33:22 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@10.18.26.116:49592] has failed, address is now gated for [5000] ms. Reason is: [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464].
15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-6/endpointWriter/endpointReader-akka.tcp%3A%2F%2FsparkDriver%4010.18.26.116%3A49592-0#1749840468] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/12 17:33:22 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40127.0.0.1%3A50366-7#-1224275483] was not delivered. [11] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/12 17:33:42 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@cyborg:7077] <- [akka.tcp://sparkDriver@10.18.26.116:49592]: Error [org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464] [
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 7674242335164700840, local class serialVersionUID = 2596819202403185464
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

SparkTestJob is the Spark job class:

class SparkTestJob(val filePath: String = "", val filter: String = "") extends Serializable {
  def runWordCount(): Long = {
    val conf = new SparkConf()
      .setAppName("word count for the word " + filter)
      .setMaster("spark://cyborg:7077")
      // ship the assembled application jar to the cluster
      .setJars(Seq("/tmp/spark-example-1.0-SNAPSHOT-driver.jar"))
      .setSparkHome("/opt/spark/current")
    val sc = new SparkContext(conf)
    val file = sc.textFile(filePath)
    // count the lines that contain the filter word
    file.filter(line => line.contains(filter)).count()
  }
}

SubmitJobExample is the object that instantiates and runs SparkTestJob:

object SubmitJobExample {

  def main(args: Array[String]): Unit = {
    if (args.length == 2) {
      val fileName = args(0)
      val filterByWord = args(1)
      println("Reading file " + fileName + " for word " + filterByWord)
      val jobObject = new SparkTestJob(fileName, filterByWord)
      println("word count for the file " + fileName + " is " + jobObject.runWordCount())
    } else {
      // no arguments given: fall back to the defaults
      val jobObject = new SparkTestJob("/opt/spark/current/README.md", "Please")
      println("word count for the file /opt/spark/current/README.md is " + jobObject.runWordCount())
    }
  }
}

2 Answers

#1


The actual problem was a Spark version mismatch introduced by one of the dependencies. Changing all the dependencies to the same Spark version fixed the problem.

The reason it worked with spark-submit is Java classpath precedence, which picked up the correct Spark jar version.
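One way to enforce this in the build is a single version property in the pom.xml so every Spark artifact resolves to the same release. A minimal sketch, assuming Maven and Spark 1.x / Scala 2.10 era coordinates; the 1.3.1 value is illustrative and must match whatever version the master and workers actually run:

<properties>
  <!-- illustrative value: must match the Spark version the cluster runs -->
  <spark.version>1.3.1</spark.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <!-- every other spark-* dependency should reference ${spark.version} as well -->
</dependencies>

To spot which dependency drags in a second Spark version, mvn dependency:tree -Dincludes=org.apache.spark lists every Spark artifact on the build classpath.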

#2


Your code looks fine to me. To debug serialization problems, run with -Dsun.io.serialization.extendedDebugInfo=true. This prints extra output when a NotSerializableException is thrown, so you can see what it is trying to serialize.
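Because exec:java runs the main class inside the Maven JVM itself, passing the flag straight to mvn should be enough; for example, reusing the command from the question:

mvn exec:java -Dsun.io.serialization.extendedDebugInfo=true -Dexec.mainClass="org.cybergen.SubmitJobExample" -Dexec.args="/opt/spark/current/README.md Please"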
