Submitting Jobs to the Spark Master -- Learning the Distributed Computing System Spark (Part 4)

Date: 2023-01-09 20:28:31

The deployment stays on the default configuration for now; let's look at how to submit a compute program to Spark.

 

Let's start with the official Python sample program.

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4 $cat examples/SimpleApp.py 
"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "./README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print "Lines with a: %i, lines with b: %i" % (numAs, numBs)

The output is as follows:

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4 $./bin/spark-submit --master spark://qpzhangdeMac-mini.local:7077 ./examples/SimpleApp.py 
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/25 16:46:20 INFO SparkContext: Running Spark version 1.3.0
15/03/25 16:46:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 16:46:21 INFO SecurityManager: Changing view acls to: qpzhang
15/03/25 16:46:21 INFO SecurityManager: Changing modify acls to: qpzhang
15/03/25 16:46:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qpzhang); users with modify permissions: Set(qpzhang)
15/03/25 16:46:21 INFO Slf4jLogger: Slf4jLogger started
15/03/25 16:46:21 INFO Remoting: Starting remoting
15/03/25 16:46:21 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.60.215.93:51960]
15/03/25 16:46:21 INFO Utils: Successfully started service 'sparkDriver' on port 51960.
15/03/25 16:46:21 INFO SparkEnv: Registering MapOutputTracker
15/03/25 16:46:21 INFO SparkEnv: Registering BlockManagerMaster
15/03/25 16:46:21 INFO DiskBlockManager: Created local directory at /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-14e34274-f17f-4609-b285-5bd983c1d53e/blockmgr-f09ceb14-d287-495b-9d90-b6b3d2fdc95f
15/03/25 16:46:21 INFO MemoryStore: MemoryStore started with capacity 530.0 MB
15/03/25 16:46:21 INFO HttpFileServer: HTTP File server directory is /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-e1581a53-049f-4076-b61b-a2cc271f6adb/httpd-d84dd7f6-4702-4160-ac98-feda26bb0938
15/03/25 16:46:21 INFO HttpServer: Starting HTTP Server
15/03/25 16:46:21 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/25 16:46:21 INFO AbstractConnector: Started SocketConnector@0.0.0.0:51961
15/03/25 16:46:21 INFO Utils: Successfully started service 'HTTP file server' on port 51961.
15/03/25 16:46:21 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/25 16:46:21 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/25 16:46:21 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/25 16:46:21 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/25 16:46:21 INFO SparkUI: Started SparkUI at http://10.60.215.93:4040
15/03/25 16:46:22 INFO Utils: Copying /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py to /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-aff816e3-f65f-46a3-9b7b-8a761a8705a4/userFiles-ca491e45-070a-4599-b9c0-7268b32c68fe/SimpleApp.py
15/03/25 16:46:22 INFO SparkContext: Added file file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py at file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py with timestamp 1427273182014
15/03/25 16:46:22 INFO Executor: Starting executor ID <driver> on host localhost
15/03/25 16:46:22 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.60.215.93:51960/user/HeartbeatReceiver
15/03/25 16:46:22 INFO NettyBlockTransferService: Server created on 51962
15/03/25 16:46:22 INFO BlockManagerMaster: Trying to register BlockManager
15/03/25 16:46:22 INFO BlockManagerMasterActor: Registering block manager localhost:51962 with 530.0 MB RAM, BlockManagerId(<driver>, localhost, 51962)
15/03/25 16:46:22 INFO BlockManagerMaster: Registered BlockManager
15/03/25 16:46:22 INFO MemoryStore: ensureFreeSpace(211311) called with curMem=0, maxMem=555755765
15/03/25 16:46:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 206.4 KB, free 529.8 MB)
15/03/25 16:46:22 INFO MemoryStore: ensureFreeSpace(31262) called with curMem=211311, maxMem=555755765
15/03/25 16:46:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.5 KB, free 529.8 MB)
15/03/25 16:46:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:51962 (size: 30.5 KB, free: 530.0 MB)
15/03/25 16:46:22 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/25 16:46:22 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
15/03/25 16:46:22 INFO FileInputFormat: Total input paths to process : 1
15/03/25 16:46:22 INFO SparkContext: Starting job: count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8
15/03/25 16:46:22 INFO DAGScheduler: Got job 0 (count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8) with 1 output partitions (allowLocal=false)
15/03/25 16:46:22 INFO DAGScheduler: Final stage: Stage 0(count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8)
15/03/25 16:46:22 INFO DAGScheduler: Parents of final stage: List()
15/03/25 16:46:22 INFO DAGScheduler: Missing parents: List()
15/03/25 16:46:22 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[2] at count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8), which has no missing parents
15/03/25 16:46:22 INFO MemoryStore: ensureFreeSpace(6080) called with curMem=242573, maxMem=555755765
15/03/25 16:46:22 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.9 KB, free 529.8 MB)
15/03/25 16:46:22 INFO MemoryStore: ensureFreeSpace(4460) called with curMem=248653, maxMem=555755765
15/03/25 16:46:22 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.4 KB, free 529.8 MB)
15/03/25 16:46:22 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:51962 (size: 4.4 KB, free: 530.0 MB)
15/03/25 16:46:22 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/25 16:46:22 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/03/25 16:46:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (PythonRDD[2] at count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8)
15/03/25 16:46:22 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/25 16:46:22 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1414 bytes)
15/03/25 16:46:22 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/25 16:46:22 INFO Executor: Fetching file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py with timestamp 1427273182014
15/03/25 16:46:22 INFO Utils: /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py has been previously copied to /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-aff816e3-f65f-46a3-9b7b-8a761a8705a4/userFiles-ca491e45-070a-4599-b9c0-7268b32c68fe/SimpleApp.py
15/03/25 16:46:23 INFO CacheManager: Partition rdd_1_0 not found, computing it
15/03/25 16:46:23 INFO HadoopRDD: Input split: file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/README.md:0+3629
15/03/25 16:46:23 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/25 16:46:23 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/25 16:46:23 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/25 16:46:23 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/25 16:46:23 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/25 16:46:23 INFO MemoryStore: ensureFreeSpace(3021) called with curMem=253113, maxMem=555755765
15/03/25 16:46:23 INFO MemoryStore: Block rdd_1_0 stored as bytes in memory (estimated size 3.0 KB, free 529.8 MB)
15/03/25 16:46:23 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:51962 (size: 3.0 KB, free: 530.0 MB)
15/03/25 16:46:23 INFO BlockManagerMaster: Updated info of block rdd_1_0
15/03/25 16:46:23 INFO PythonRDD: Times: total = 1094, boot = 977, init = 116, finish = 1
15/03/25 16:46:23 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2439 bytes result sent to driver
15/03/25 16:46:24 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1219 ms on localhost (1/1)
15/03/25 16:46:24 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/03/25 16:46:24 INFO DAGScheduler: Stage 0 (count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8) finished in 1.236 s
15/03/25 16:46:24 INFO DAGScheduler: Job 0 finished: count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:8, took 1.303672 s
15/03/25 16:46:24 INFO SparkContext: Starting job: count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9
15/03/25 16:46:24 INFO DAGScheduler: Got job 1 (count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9) with 1 output partitions (allowLocal=false)
15/03/25 16:46:24 INFO DAGScheduler: Final stage: Stage 1(count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9)
15/03/25 16:46:24 INFO DAGScheduler: Parents of final stage: List()
15/03/25 16:46:24 INFO DAGScheduler: Missing parents: List()
15/03/25 16:46:24 INFO DAGScheduler: Submitting Stage 1 (PythonRDD[3] at count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9), which has no missing parents
15/03/25 16:46:24 INFO MemoryStore: ensureFreeSpace(6080) called with curMem=256134, maxMem=555755765
15/03/25 16:46:24 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.9 KB, free 529.8 MB)
15/03/25 16:46:24 INFO MemoryStore: ensureFreeSpace(4460) called with curMem=262214, maxMem=555755765
15/03/25 16:46:24 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.4 KB, free 529.8 MB)
15/03/25 16:46:24 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:51962 (size: 4.4 KB, free: 530.0 MB)
15/03/25 16:46:24 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/03/25 16:46:24 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/03/25 16:46:24 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (PythonRDD[3] at count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9)
15/03/25 16:46:24 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/03/25 16:46:24 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1414 bytes)
15/03/25 16:46:24 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/03/25 16:46:24 INFO BlockManager: Found block rdd_1_0 locally
15/03/25 16:46:24 INFO PythonRDD: Times: total = 12, boot = 1, init = 10, finish = 1
15/03/25 16:46:24 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1870 bytes result sent to driver
15/03/25 16:46:24 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 26 ms on localhost (1/1)
15/03/25 16:46:24 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/03/25 16:46:24 INFO DAGScheduler: Stage 1 (count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9) finished in 0.028 s
15/03/25 16:46:24 INFO DAGScheduler: Job 1 finished: count at /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/SimpleApp.py:9, took 0.048193 s
Lines with a: 60, lines with b: 29

I started two workers here, but judging from the run log there is no sign of distributed execution. The likely culprit is that SimpleApp.py hard-codes the master as "local" in SparkContext("local", "Simple App"); a master set in application code takes precedence over the --master flag passed to spark-submit, so the whole job runs inside the driver process.

Forcing the --deploy-mode cluster flag on, it turns out Python applications are not supported:

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4 $./bin/spark-submit --master spark://qpzhangdeMac-mini.local:7077 --deploy-mode cluster  ./examples/SimpleApp.py 
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Error: Cluster deploy mode is currently not supported for python applications on standalone clusters.
Run with --help for usage help or --verbose for debug output
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

 

Let's try a Java example instead.

Here I use the Java word-count example from https://github.com/databricks/learning-spark/tree/master/mini-complete-example.
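Stripped of the Spark API, the computation that WordCount.java performs (split each line into words, tally each word, write the counts out) boils down to the following logic-only sketch in plain Python. This is an illustration of what the job computes, not the example's actual code:

```python
# Logic-only sketch of the word count, without Spark.
# `update(line.split())` plays the role of flatMap (one line -> many words);
# Counter's tallying plays the role of reduceByKey (summing a 1 per word).
from collections import Counter

def word_count(lines):
    """Return a dict mapping each word to its number of occurrences."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return dict(counts)

if __name__ == "__main__":
    sample = ["to be or not to be", "be quick"]
    print(word_count(sample))  # {'to': 2, 'be': 3, 'or': 1, 'not': 1, 'quick': 1}
```

In the real example Spark splits the input file into partitions and runs this map/reduce pipeline on the executors, merging the per-partition tallies.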

 

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4/examples/mini-complete-example $mvn clean && mvn compile && mvn package
[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building example 0.0.1
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ learning-spark-mini-example ---

// ... a pile of jar downloads omitted here ...

[INFO] Building jar: /Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/examples/mini-complete-example/target/learning-spark-mini-example-0.0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:25 min
[INFO] Finished at: 2015-03-25T16:32:55+08:00
[INFO] Final Memory: 14M/184M
[INFO] ------------------------------------------------------------------------

The compiled jar ends up in the target directory:

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4/examples/mini-complete-example $tree
.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── oreilly
│                   └── learningsparkexamples
│                       └── mini
│                           └── java
│                               └── WordCount.java
└── target
    ├── classes
    │   └── com
    │       └── oreilly
    │           └── learningsparkexamples
    │               └── mini
    │                   └── java
    │                       ├── WordCount$1.class
    │                       ├── WordCount$2.class
    │                       ├── WordCount$3.class
    │                       └── WordCount.class
    ├── generated-sources
    │   └── annotations
    ├── learning-spark-mini-example-0.0.1.jar
    ├── maven-archiver
    │   └── pom.properties
    └── maven-status
        └── maven-compiler-plugin
            └── compile
                └── default-compile
                    ├── createdFiles.lst
                    └── inputFiles.lst

Submitting it the same way as the Python example, the job fails; the log shows the input file cannot be found:

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4 $./bin/spark-submit  --master spark://qpzhangdeMac-mini.local:7077 --deploy-mode cluster --class com.oreilly.learningsparkexamples.mini.java.WordCount ./examples/mini-complete-example/target/learning-spark-mini-example-0.0.1.jar ./README.md ./wordcounts
Launch Command: "/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java" "-cp" ":/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/conf:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar" "-Dakka.loglevel=WARNING" "-Dspark.driver.supervise=false" "-Dspark.app.name=com.oreilly.learningsparkexamples.mini.java.WordCount" "-Dspark.akka.askTimeout=10" "-Dspark.jars=file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/mini-complete-example/target/learning-spark-mini-example-0.0.1.jar" "-Dspark.driver.memory=1g" "-Dspark.master=spark://qpzhangdeMac-mini.local:7077" "-Xms1024M" "-Xmx1024M" "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://sparkWorker@10.60.215.93:50033/user/Worker" "/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/work/driver-20150325174458-0000/learning-spark-mini-example-0.0.1.jar" "com.oreilly.learningsparkexamples.mini.java.WordCount" "./README.md" "./wordcounts"
========================================

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/25 17:45:00 INFO SecurityManager: Changing view acls to: qpzhang
15/03/25 17:45:00 INFO SecurityManager: Changing modify acls to: qpzhang
15/03/25 17:45:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qpzhang); users with modify permissions: Set(qpzhang)
15/03/25 17:45:00 INFO Slf4jLogger: Slf4jLogger started
15/03/25 17:45:00 INFO Utils: Successfully started service 'Driver' on port 52168.
15/03/25 17:45:00 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.60.215.93:50033/user/Worker
15/03/25 17:45:00 INFO SparkContext: Running Spark version 1.3.0
15/03/25 17:45:00 INFO SecurityManager: Changing view acls to: qpzhang
15/03/25 17:45:00 INFO SecurityManager: Changing modify acls to: qpzhang
15/03/25 17:45:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qpzhang); users with modify permissions: Set(qpzhang)
15/03/25 17:45:00 INFO Slf4jLogger: Slf4jLogger started
15/03/25 17:45:00 INFO Utils: Successfully started service 'sparkDriver' on port 52169.
15/03/25 17:45:00 INFO SparkEnv: Registering MapOutputTracker
15/03/25 17:45:00 INFO SparkEnv: Registering BlockManagerMaster
15/03/25 17:45:01 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.60.215.93:50033/user/Worker
15/03/25 17:45:01 INFO DiskBlockManager: Created local directory at /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-d86b1f0c-04a9-4965-a4bd-e08a091c5252/blockmgr-133f4563-2eb8-4898-928a-819d5869932d
15/03/25 17:45:01 INFO MemoryStore: MemoryStore started with capacity 530.0 MB
15/03/25 17:45:01 INFO HttpFileServer: HTTP File server directory is /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-74895413-78e4-4d77-9db1-d6768acc6464/httpd-1e9b9243-d3e7-49b6-bdf6-cb2a6eff6527
15/03/25 17:45:01 INFO HttpServer: Starting HTTP Server
15/03/25 17:45:01 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/25 17:45:01 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52171
15/03/25 17:45:01 INFO Utils: Successfully started service 'HTTP file server' on port 52171.
15/03/25 17:45:01 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/25 17:45:01 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/25 17:45:01 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/25 17:45:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/25 17:45:01 INFO SparkUI: Started SparkUI at http://10.60.215.93:4040
15/03/25 17:45:01 INFO SparkContext: Added JAR file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/mini-complete-example/target/learning-spark-mini-example-0.0.1.jar at http://10.60.215.93:52171/jars/learning-spark-mini-example-0.0.1.jar with timestamp 1427276701410
15/03/25 17:45:01 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@qpzhangdeMac-mini.local:7077/user/Master...
15/03/25 17:45:01 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150325174501-0000
15/03/25 17:45:01 INFO AppClient$ClientActor: Executor added: app-20150325174501-0000/0 on worker-20150325161345-10.60.215.93-50033 (10.60.215.93:50033) with 7 cores
15/03/25 17:45:01 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150325174501-0000/0 on hostPort 10.60.215.93:50033 with 7 cores, 512.0 MB RAM
15/03/25 17:45:01 INFO AppClient$ClientActor: Executor added: app-20150325174501-0000/1 on worker-20150325013207-10.0.2.15-48618 (10.0.2.15:48618) with 1 cores
15/03/25 17:45:01 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150325174501-0000/1 on hostPort 10.0.2.15:48618 with 1 cores, 512.0 MB RAM
15/03/25 17:45:01 INFO AppClient$ClientActor: Executor updated: app-20150325174501-0000/0 is now RUNNING
15/03/25 17:45:01 INFO AppClient$ClientActor: Executor updated: app-20150325174501-0000/1 is now RUNNING
15/03/25 17:45:01 INFO AppClient$ClientActor: Executor updated: app-20150325174501-0000/0 is now LOADING
15/03/25 17:45:01 INFO NettyBlockTransferService: Server created on 52173
15/03/25 17:45:01 INFO BlockManagerMaster: Trying to register BlockManager
15/03/25 17:45:01 INFO BlockManagerMasterActor: Registering block manager 10.60.215.93:52173 with 530.0 MB RAM, BlockManagerId(<driver>, 10.60.215.93, 52173)
15/03/25 17:45:01 INFO BlockManagerMaster: Registered BlockManager
15/03/25 17:45:02 INFO AppClient$ClientActor: Executor updated: app-20150325174501-0000/1 is now LOADING
15/03/25 17:45:02 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/03/25 17:45:02 INFO MemoryStore: ensureFreeSpace(159118) called with curMem=0, maxMem=555755765
15/03/25 17:45:02 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 155.4 KB, free 529.9 MB)
15/03/25 17:45:02 INFO MemoryStore: ensureFreeSpace(22692) called with curMem=159118, maxMem=555755765
15/03/25 17:45:02 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.2 KB, free 529.8 MB)
15/03/25 17:45:02 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.60.215.93:52173 (size: 22.2 KB, free: 530.0 MB)
15/03/25 17:45:02 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/25 17:45:02 INFO SparkContext: Created broadcast 0 from textFile at WordCount.java:31
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:59)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/work/driver-20150325174458-0000/README.md

The reason is that the directory the jar runs under differs from the directory it was submitted from. The log shows the driver's working directory: file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/work/driver-20150325174458-0000/

This brings us back to how distributed computing works: the master distributes the executable program (jar packages, .so files, and so on) to each worker (judging from the Spark log, the driver starts an HTTP server for the workers to download the files from).
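The pitfall is easy to reproduce without Spark: a relative path is resolved against the current working directory of whichever process opens it, and in cluster mode the driver runs under its own work/driver-*/ directory, not the directory where spark-submit was run. A small illustration, using the paths from the log above:

```python
# Relative paths resolve against the process's cwd, so the same "./README.md"
# points at different files depending on where the driver was launched.
import os

def resolve(relative_path, cwd):
    """Resolve a path the way open() would if the process ran with this cwd."""
    return os.path.normpath(os.path.join(cwd, relative_path))

# Submitted from the Spark install dir, "./README.md" is the file we meant:
print(resolve("./README.md", "/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4"))
# But the cluster-mode driver runs in its own work directory, so it looks here:
print(resolve("./README.md",
              "/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/work/driver-20150325174458-0000"))
# An absolute path resolves to itself regardless of cwd:
print(resolve("/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/README.md", "/anywhere"))
```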

So switching the input file to an absolute path fixes it:

Launch Command: "/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java" "-cp" ":/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/conf:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar" "-Dakka.loglevel=WARNING" "-Dspark.driver.supervise=false" "-Dspark.app.name=com.oreilly.learningsparkexamples.mini.java.WordCount" "-Dspark.akka.askTimeout=10" "-Dspark.jars=file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/mini-complete-example/target/learning-spark-mini-example-0.0.1.jar" "-Dspark.driver.memory=1g" "-Dspark.master=spark://qpzhangdeMac-mini.local:7077" "-Xms1024M" "-Xmx1024M" "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://sparkWorker@10.60.215.93:50033/user/Worker" "/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/work/driver-20150325175556-0001/learning-spark-mini-example-0.0.1.jar" "com.oreilly.learningsparkexamples.mini.java.WordCount" "/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/README.md" "/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/wordcount.txt"
========================================

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/25 17:55:58 INFO SecurityManager: Changing view acls to: qpzhang
15/03/25 17:55:58 INFO SecurityManager: Changing modify acls to: qpzhang
15/03/25 17:55:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qpzhang); users with modify permissions: Set(qpzhang)
15/03/25 17:55:58 INFO Slf4jLogger: Slf4jLogger started
15/03/25 17:55:58 INFO Utils: Successfully started service 'Driver' on port 52220.
15/03/25 17:55:58 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.60.215.93:50033/user/Worker
15/03/25 17:55:58 INFO SparkContext: Running Spark version 1.3.0
15/03/25 17:55:59 INFO SecurityManager: Changing view acls to: qpzhang
15/03/25 17:55:59 INFO SecurityManager: Changing modify acls to: qpzhang
15/03/25 17:55:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(qpzhang); users with modify permissions: Set(qpzhang)
15/03/25 17:55:59 INFO Slf4jLogger: Slf4jLogger started
15/03/25 17:55:59 INFO Utils: Successfully started service 'sparkDriver' on port 52222.
15/03/25 17:55:59 INFO SparkEnv: Registering MapOutputTracker
15/03/25 17:55:59 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.60.215.93:50033/user/Worker
15/03/25 17:55:59 INFO SparkEnv: Registering BlockManagerMaster
15/03/25 17:55:59 INFO DiskBlockManager: Created local directory at /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-5df78be1-f688-411b-a632-092a84017016/blockmgr-f5e20d04-112d-482b-8906-c54a3e46e812
15/03/25 17:55:59 INFO MemoryStore: MemoryStore started with capacity 530.0 MB
15/03/25 17:55:59 INFO HttpFileServer: HTTP File server directory is /var/folders/2l/195zcc1n0sn2wjfjwf9hl9d80000gn/T/spark-a92b0a81-c2ee-4d02-861f-790106644a12/httpd-dfb27abf-c9a6-46a9-8396-76879a2b534b
15/03/25 17:55:59 INFO HttpServer: Starting HTTP Server
15/03/25 17:55:59 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/25 17:55:59 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52223
15/03/25 17:55:59 INFO Utils: Successfully started service 'HTTP file server' on port 52223.
15/03/25 17:55:59 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/25 17:55:59 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/25 17:55:59 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/25 17:55:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/25 17:55:59 INFO SparkUI: Started SparkUI at http://10.60.215.93:4040
15/03/25 17:55:59 INFO SparkContext: Added JAR file:/Users/qpzhang/project/spark-1.3.0-bin-hadoop2.4/./examples/mini-complete-example/target/learning-spark-mini-example-0.0.1.jar at http://10.60.215.93:52223/jars/learning-spark-mini-example-0.0.1.jar with timestamp 1427277359530
15/03/25 17:55:59 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@qpzhangdeMac-mini.local:7077/user/Master...
15/03/25 17:55:59 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150325175559-0001
15/03/25 17:55:59 INFO AppClient$ClientActor: Executor added: app-20150325175559-0001/0 on worker-20150325161345-10.60.215.93-50033 (10.60.215.93:50033) with 7 cores
15/03/25 17:55:59 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150325175559-0001/0 on hostPort 10.60.215.93:50033 with 7 cores, 512.0 MB RAM
15/03/25 17:55:59 INFO AppClient$ClientActor: Executor added: app-20150325175559-0001/1 on worker-20150325013207-10.0.2.15-48618 (10.0.2.15:48618) with 1 cores
15/03/25 17:55:59 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150325175559-0001/1 on hostPort 10.0.2.15:48618 with 1 cores, 512.0 MB RAM
15/03/25 17:55:59 INFO AppClient$ClientActor: Executor updated: app-20150325175559-0001/0 is now LOADING
15/03/25 17:55:59 INFO AppClient$ClientActor: Executor updated: app-20150325175559-0001/0 is now RUNNING
15/03/25 17:55:59 INFO AppClient$ClientActor: Executor updated: app-20150325175559-0001/1 is now LOADING
15/03/25 17:55:59 INFO AppClient$ClientActor: Executor updated: app-20150325175559-0001/1 is now RUNNING
15/03/25 17:55:59 INFO NettyBlockTransferService: Server created on 52225
15/03/25 17:55:59 INFO BlockManagerMaster: Trying to register BlockManager
15/03/25 17:55:59 INFO BlockManagerMasterActor: Registering block manager 10.60.215.93:52225 with 530.0 MB RAM, BlockManagerId(<driver>, 10.60.215.93, 52225)
15/03/25 17:55:59 INFO BlockManagerMaster: Registered BlockManager
15/03/25 17:56:00 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/03/25 17:56:00 INFO MemoryStore: ensureFreeSpace(159118) called with curMem=0, maxMem=555755765
15/03/25 17:56:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 155.4 KB, free 529.9 MB)
15/03/25 17:56:01 INFO MemoryStore: ensureFreeSpace(22692) called with curMem=159118, maxMem=555755765
15/03/25 17:56:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.2 KB, free 529.8 MB)
15/03/25 17:56:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.60.215.93:52225 (size: 22.2 KB, free: 530.0 MB)
15/03/25 17:56:01 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/25 17:56:01 INFO SparkContext: Created broadcast 0 from textFile at WordCount.java:31
15/03/25 17:56:01 INFO FileInputFormat: Total input paths to process : 1
15/03/25 17:56:01 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/25 17:56:01 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/25 17:56:01 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/25 17:56:01 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/25 17:56:01 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/25 17:56:01 INFO SparkContext: Starting job: saveAsTextFile at WordCount.java:46
15/03/25 17:56:01 INFO DAGScheduler: Registering RDD 3 (mapToPair at WordCount.java:39)
15/03/25 17:56:01 INFO DAGScheduler: Got job 0 (saveAsTextFile at WordCount.java:46) with 2 output partitions (allowLocal=false)
15/03/25 17:56:01 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile at WordCount.java:46)
15/03/25 17:56:01 INFO DAGScheduler: Parents of final stage: List(Stage 0)
15/03/25 17:56:01 INFO DAGScheduler: Missing parents: List(Stage 0)
15/03/25 17:56:01 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at mapToPair at WordCount.java:39), which has no missing parents
15/03/25 17:56:01 INFO MemoryStore: ensureFreeSpace(4472) called with curMem=181810, maxMem=555755765
15/03/25 17:56:01 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.4 KB, free 529.8 MB)
15/03/25 17:56:01 INFO MemoryStore: ensureFreeSpace(3148) called with curMem=186282, maxMem=555755765
15/03/25 17:56:01 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.1 KB, free 529.8 MB)
15/03/25 17:56:01 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.60.215.93:52225 (size: 3.1 KB, free: 530.0 MB)
15/03/25 17:56:01 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/25 17:56:01 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/03/25 17:56:01 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapToPair at WordCount.java:39)
15/03/25 17:56:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/25 17:56:03 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@10.60.215.93:52228/user/Executor#-261943491] with ID 0
15/03/25 17:56:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.60.215.93, PROCESS_LOCAL, 1394 bytes)
15/03/25 17:56:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.60.215.93, PROCESS_LOCAL, 1394 bytes)
15/03/25 17:56:03 INFO BlockManagerMasterActor: Registering block manager 10.60.215.93:52231 with 265.1 MB RAM, BlockManagerId(0, 10.60.215.93, 52231)
15/03/25 17:56:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.60.215.93:52231 (size: 3.1 KB, free: 265.1 MB)
15/03/25 17:56:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.60.215.93:52231 (size: 22.2 KB, free: 265.1 MB)
15/03/25 17:56:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1487 ms on 10.60.215.93 (1/2)
15/03/25 17:56:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1505 ms on 10.60.215.93 (2/2)
15/03/25 17:56:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/03/25 17:56:05 INFO DAGScheduler: Stage 0 (mapToPair at WordCount.java:39) finished in 3.405 s
15/03/25 17:56:05 INFO DAGScheduler: looking for newly runnable stages
15/03/25 17:56:05 INFO DAGScheduler: running: Set()
15/03/25 17:56:05 INFO DAGScheduler: waiting: Set(Stage 1)
15/03/25 17:56:05 INFO DAGScheduler: failed: Set()
15/03/25 17:56:05 INFO DAGScheduler: Missing parents for Stage 1: List()
15/03/25 17:56:05 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.java:46), which is now runnable
15/03/25 17:56:05 INFO MemoryStore: ensureFreeSpace(112656) called with curMem=189430, maxMem=555755765
15/03/25 17:56:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 110.0 KB, free 529.7 MB)
15/03/25 17:56:05 INFO MemoryStore: ensureFreeSpace(67099) called with curMem=302086, maxMem=555755765
15/03/25 17:56:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 65.5 KB, free 529.7 MB)
15/03/25 17:56:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.60.215.93:52225 (size: 65.5 KB, free: 529.9 MB)
15/03/25 17:56:05 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/03/25 17:56:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/03/25 17:56:05 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.java:46)
15/03/25 17:56:05 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/25 17:56:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.60.215.93, PROCESS_LOCAL, 1134 bytes)
15/03/25 17:56:05 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.60.215.93, PROCESS_LOCAL, 1134 bytes)
15/03/25 17:56:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.60.215.93:52231 (size: 65.5 KB, free: 265.1 MB)
15/03/25 17:56:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@10.60.215.93:52228
15/03/25 17:56:05 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 150 bytes
15/03/25 17:56:05 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 241 ms on 10.60.215.93 (1/2)
15/03/25 17:56:05 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 246 ms on 10.60.215.93 (2/2)
15/03/25 17:56:05 INFO DAGScheduler: Stage 1 (saveAsTextFile at WordCount.java:46) finished in 0.246 s
15/03/25 17:56:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/03/25 17:56:05 INFO DAGScheduler: Job 0 finished: saveAsTextFile at WordCount.java:46, took 3.789099 s

At last the logs show the distributed execution at work. Let's check the results:

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4 $ll wordcount.txt/
total 40
drwxr-xr-x   8 qpzhang  staff   272  3 25 17:56 ./
drwxr-xr-x@ 19 qpzhang  staff   646  3 25 17:56 ../
-rw-r--r--   1 qpzhang  staff     8  3 25 17:56 ._SUCCESS.crc
-rw-r--r--   1 qpzhang  staff    24  3 25 17:56 .part-00000.crc
-rw-r--r--   1 qpzhang  staff    24  3 25 17:56 .part-00001.crc
-rw-r--r--   1 qpzhang  staff     0  3 25 17:56 _SUCCESS
-rw-r--r--   1 qpzhang  staff  1956  3 25 17:56 part-00000
-rw-r--r--   1 qpzhang  staff  1717  3 25 17:56 part-00001

qpzhang@qpzhangdeMac-mini:~/project/spark-1.3.0-bin-hadoop2.4 $vim wordcount.txt/part-00000

(package,1)
(this,1)
(Because,1)
(Python,2)
(cluster.,1)
(its,1)
([run,1)
(general,2)
(YARN,,1)
(have,1)
(pre-built,1)
(locally.,1)
(changed,1)
(locally,2)
(sc.parallelize(1,1)

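Each line of part-00000 is a `(word,count)` pair. The WordCount.java source referenced in the logs is not shown in this post, but the computation it performs can be sketched in plain Python; the input lines below are made up for illustration (the real job reads README.md):

```python
from collections import Counter

# Plain-Python sketch of the word-count logic behind the output above.
# Input lines are invented; the actual job reads README.md.
lines = [
    "Spark is a fast and general engine",
    "Python and Scala shells are included",
]

# Equivalent to: flatMap(split) -> mapToPair((word, 1)) -> reduceByKey(+)
counts = Counter(word for line in lines for word in line.split(" "))

for word, n in sorted(counts.items()):
    print("(%s,%d)" % (word, n))
```

In Spark the same pipeline runs partition by partition, which is why the shuffle (Stage 0 → Stage 1) appears in the logs before the final counts are written out.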
Two workers, and two output files were generated. Why weren't they merged into one? That's a question to come back to later.
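In fact `saveAsTextFile` writes one part file per output partition, and the DAGScheduler log above shows the job ran with 2 output partitions, so two files are expected. If a single file is needed, one option is to call `coalesce(1)` on the RDD before saving; another is simply to concatenate the part files afterwards. A sketch of the latter (the part-file contents here are made up for illustration):

```shell
# Simulate a saveAsTextFile output directory with two part files
# (contents invented for illustration).
mkdir -p wordcount.txt
printf '(package,1)\n(this,1)\n' > wordcount.txt/part-00000
printf '(Python,2)\n(general,2)\n' > wordcount.txt/part-00001

# Merge every partition's output into one file
cat wordcount.txt/part-* > wordcount_merged.txt
wc -l wordcount_merged.txt
```

Note that `coalesce(1)` forces all data through a single task, so concatenating part files is usually cheaper for large outputs.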

 =====================================================

Please credit the source when reposting: http://www.cnblogs.com/zhangqingping/p/4366472.html