I have a very basic Spark application that streams an input file. Every line contains a JSON string that I want to turn into a model object.
public final class SparkStreamingApplication {

    public static JavaSparkContext javaSparkContext() {
        // App name and master are assumptions; the original settings were not shown.
        final SparkConf conf = new SparkConf()
                .setAppName("SparkStreamingApplication")
                .setMaster("local[*]");
        return new JavaSparkContext(conf);
    }

    public static void main(String[] args) {
        final JavaSparkContext sparkContext = javaSparkContext();
        final String path = "data/input.txt";
        final JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Durations.seconds(10));
        final JavaDStream<String> linesDStream = streamingContext.textFileStream(path);
        final JavaDStream<String> tokens = linesDStream.flatMap(x -> Arrays.asList(x.split("|")));
        final JavaDStream<Long> count = tokens.count();
        count.print();
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
This results in:
16/01/24 18:44:56 INFO SparkContext: Running Spark version 1.6.0
16/01/24 18:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/24 18:44:58 WARN Utils: Your hostname, markus-lenovo resolves to a loopback address:; using instead (on interface wlp2s0)
16/01/24 18:44:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/01/24 18:44:58 INFO SecurityManager: Changing view acls to: markus
16/01/24 18:44:58 INFO SecurityManager: Changing modify acls to: markus
16/01/24 18:44:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(markus); users with modify permissions: Set(markus)
16/01/24 18:44:59 INFO Utils: Successfully started service 'sparkDriver' on port 38761.
16/01/24 18:44:59 INFO Slf4jLogger: Slf4jLogger started
16/01/24 18:44:59 INFO Remoting: Starting remoting
16/01/24 18:45:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@]
16/01/24 18:45:00 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 45438.
16/01/24 18:45:00 INFO SparkEnv: Registering MapOutputTracker
16/01/24 18:45:00 INFO SparkEnv: Registering BlockManagerMaster
16/01/24 18:45:00 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-82c4981c-0b78-47c0-a8c7-e6fe8bc6ac84
16/01/24 18:45:00 INFO MemoryStore: MemoryStore started with capacity 1092.4 MB
16/01/24 18:45:00 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/24 18:45:00 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/24 18:45:00 INFO SparkUI: Started SparkUI at
16/01/24 18:45:00 INFO Executor: Starting executor ID driver on host localhost
16/01/24 18:45:00 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35429.
16/01/24 18:45:00 INFO NettyBlockTransferService: Server created on 35429
16/01/24 18:45:00 INFO BlockManagerMaster: Trying to register BlockManager
16/01/24 18:45:00 INFO BlockManagerMasterEndpoint: Registering block manager localhost:35429 with 1092.4 MB RAM, BlockManagerId(driver, localhost, 35429)
16/01/24 18:45:00 INFO BlockManagerMaster: Registered BlockManager
16/01/24 18:45:01 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@3c35c345
16/01/24 18:45:02 INFO ForEachDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO ShuffledDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO TransformedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO MappedDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO FileInputDStream: metadataCleanupDelay = -1
16/01/24 18:45:02 INFO FileInputDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO FileInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO FileInputDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO FileInputDStream: Remember duration = 60000 ms
16/01/24 18:45:02 INFO FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@3c35c345
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@45f27baa
16/01/24 18:45:02 INFO FlatMappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO FlatMappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO FlatMappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@18d0e76e
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@eb2c23e
16/01/24 18:45:02 INFO TransformedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO TransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO TransformedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO TransformedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@26b276d3
16/01/24 18:45:02 INFO ShuffledDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO ShuffledDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO ShuffledDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@704b6684
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@6fbf1474
16/01/24 18:45:02 INFO MappedDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO MappedDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO MappedDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@7784888f
16/01/24 18:45:02 INFO ForEachDStream: Slide time = 10000 ms
16/01/24 18:45:02 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/24 18:45:02 INFO ForEachDStream: Checkpoint interval = null
16/01/24 18:45:02 INFO ForEachDStream: Remember duration = 10000 ms
16/01/24 18:45:02 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@42b57c42
16/01/24 18:45:02 INFO RecurringTimer: Started timer for JobGenerator at time 1453657510000
16/01/24 18:45:02 INFO JobGenerator: Started JobGenerator at 1453657510000 ms
16/01/24 18:45:02 INFO JobScheduler: Started JobScheduler
16/01/24 18:45:02 INFO StreamingContext: StreamingContext started
16/01/24 18:45:10 INFO FileInputDStream: Finding new files took 184 ms
16/01/24 18:45:10 INFO FileInputDStream: New files at time 1453657510000 ms:
16/01/24 18:45:10 INFO JobScheduler: Added jobs for time 1453657510000 ms
16/01/24 18:45:10 INFO JobScheduler: Starting job streaming job 1453657510000 ms.0 from job set of time 1453657510000 ms
16/01/24 18:45:10 INFO SparkContext: Starting job: print at SparkStreamingApplication.java:33
16/01/24 18:45:10 INFO DAGScheduler: Registering RDD 5 (union at DStream.scala:617)
16/01/24 18:45:10 INFO DAGScheduler: Got job 0 (print at SparkStreamingApplication.java:33) with 1 output partitions
16/01/24 18:45:10 INFO DAGScheduler: Final stage: ResultStage 1 (print at SparkStreamingApplication.java:33)
16/01/24 18:45:10 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/01/24 18:45:10 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/01/24 18:45:10 INFO DAGScheduler: Submitting ShuffleMapStage 0 (UnionRDD[5] at union at DStream.scala:617), which has no missing parents
16/01/24 18:45:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.6 KB, free 4.6 KB)
16/01/24 18:45:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.2 KB)
16/01/24 18:45:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35429 (size: 2.6 KB, free: 1092.4 MB)
16/01/24 18:45:10 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/24 18:45:10 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (UnionRDD[5] at union at DStream.scala:617)
16/01/24 18:45:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/01/24 18:45:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2148 bytes)
16/01/24 18:45:10 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/24 18:45:10 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1159 bytes result sent to driver
16/01/24 18:45:11 INFO DAGScheduler: ShuffleMapStage 0 (union at DStream.scala:617) finished in 0.211 s
16/01/24 18:45:11 INFO DAGScheduler: looking for newly runnable stages
16/01/24 18:45:11 INFO DAGScheduler: running: Set()
16/01/24 18:45:11 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/01/24 18:45:11 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 174 ms on localhost (1/1)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/24 18:45:11 INFO DAGScheduler: failed: Set()
16/01/24 18:45:11 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32), which has no missing parents
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 10.8 KB)
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.0 KB, free 12.8 KB)
16/01/24 18:45:11 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:35429 (size: 2.0 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/01/24 18:45:11 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/24 18:45:11 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1813 bytes)
16/01/24 18:45:11 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
16/01/24 18:45:11 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1241 bytes result sent to driver
16/01/24 18:45:11 INFO DAGScheduler: ResultStage 1 (print at SparkStreamingApplication.java:33) finished in 0.068 s
16/01/24 18:45:11 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 72 ms on localhost (1/1)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/24 18:45:11 INFO DAGScheduler: Job 0 finished: print at SparkStreamingApplication.java:33, took 0.729150 s
16/01/24 18:45:11 INFO SparkContext: Starting job: print at SparkStreamingApplication.java:33
16/01/24 18:45:11 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 144 bytes
16/01/24 18:45:11 INFO DAGScheduler: Got job 1 (print at SparkStreamingApplication.java:33) with 1 output partitions
16/01/24 18:45:11 INFO DAGScheduler: Final stage: ResultStage 3 (print at SparkStreamingApplication.java:33)
16/01/24 18:45:11 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
16/01/24 18:45:11 INFO DAGScheduler: Missing parents: List()
16/01/24 18:45:11 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32), which has no missing parents
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 16.3 KB)
16/01/24 18:45:11 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.0 KB, free 18.3 KB)
16/01/24 18:45:11 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:35429 (size: 2.0 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/01/24 18:45:11 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[8] at count at SparkStreamingApplication.java:32)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
16/01/24 18:45:11 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2, localhost, partition 1,PROCESS_LOCAL, 1813 bytes)
16/01/24 18:45:11 INFO Executor: Running task 0.0 in stage 3.0 (TID 2)
16/01/24 18:45:11 INFO ContextCleaner: Cleaned accumulator 1
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/01/24 18:45:11 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/01/24 18:45:11 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 1163 bytes result sent to driver
16/01/24 18:45:11 INFO DAGScheduler: ResultStage 3 (print at SparkStreamingApplication.java:33) finished in 0.048 s
16/01/24 18:45:11 INFO DAGScheduler: Job 1 finished: print at SparkStreamingApplication.java:33, took 0.112123 s
Time: 1453657510000 ms
16/01/24 18:45:11 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 48 ms on localhost (1/1)
16/01/24 18:45:11 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
16/01/24 18:45:11 INFO JobScheduler: Finished job streaming job 1453657510000 ms.0 from job set of time 1453657510000 ms
16/01/24 18:45:11 INFO JobScheduler: Total delay: 1.318 s for time 1453657510000 ms (execution: 0.963 s)
16/01/24 18:45:11 INFO FileInputDStream: Cleared 0 old files that were older than 1453657450000 ms:
16/01/24 18:45:11 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:35429 in memory (size: 2.0 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/01/24 18:45:11 INFO ContextCleaner: Cleaned accumulator 2
16/01/24 18:45:11 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:35429 in memory (size: 2.6 KB, free: 1092.4 MB)
16/01/24 18:45:11 INFO InputInfoTracker: remove old batch metadata
As you can see at the lower end of the output, the printed count of the tokens DStream is 0. But the result should be 3, because each line of my input file has the format xx | yy | zz.
Is there something wrong in my Spark configuration or in the usage of DStreams? Thanks for any ideas and suggestions!
1 Answer
Spark's textFileStream creates a stream that watches a directory for new files only.
You have to change path to "data/" and then put the file into the directory after your stream has started.
Please note that only new files are detected and processed. According to the documentation:
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
However, when the file is renamed, Spark detects it.
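One way to hand a file to the running stream is to write it somewhere else first and then move it into the watched directory in a single step, so the stream never sees a half-written file. The following is a minimal sketch of that idea using only java.nio. The class name StreamInputFeeder, the method feed, and the "staging" directory are hypothetical names chosen for illustration; only "data" comes from the answer above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.List;

final class StreamInputFeeder {

    /**
     * Writes the lines to a staging file outside the watched directory, then
     * moves the finished file into the watched directory in one step.
     */
    static Path feed(Path stagingDir, Path watchedDir, String fileName, List<String> lines)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(watchedDir);
        final Path staged = stagingDir.resolve(fileName);
        Files.write(staged, lines, StandardCharsets.UTF_8);
        // ATOMIC_MOVE requires both directories to be on the same file system.
        return Files.move(staged, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // Run this while the streaming application is already watching "data/".
        final String fileName = "input-" + System.currentTimeMillis() + ".txt";
        feed(Paths.get("staging"), Paths.get("data"), fileName,
                Collections.singletonList("xx | yy | zz"));
    }
}
```

Using a fresh file name for every batch matters here, because a file the stream has already seen is not picked up again.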
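A separate point that may matter once files are being picked up: String.split takes a regular expression, and an unescaped "|" is the regex alternation operator, so split("|") does not split on the pipe character and will not yield the expected 3 tokens per line. The sketch below escapes the pipe and also trims the surrounding spaces. The class name PipeTokenizer and the method tokenize are hypothetical, and trimming the whitespace is an assumption about the intended tokens.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class PipeTokenizer {

    // A literal pipe, together with any whitespace around it.
    private static final Pattern PIPE = Pattern.compile("\\s*\\|\\s*");

    /** Splits a line such as "xx | yy | zz" into its pipe-separated tokens. */
    static List<String> tokenize(String line) {
        return Arrays.asList(PIPE.split(line.trim()));
    }

    public static void main(String[] args) {
        System.out.println(tokenize("xx | yy | zz"));
    }
}
```

In the streaming job, the flatMap lambda could call PipeTokenizer.tokenize(x) in place of Arrays.asList(x.split("|")).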