I am using Spark Streaming to fetch tweets from Twitter by creating a StreamingContext as: `val ssc = new StreamingContext("local[3]", "TwitterFeed", Minutes(1))`
and creating the Twitter stream as: `val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)), filters)`
then saving it as text files: `tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")`
The problem is that each batch is saved to its own folder named after the batch time, but I need the data from every batch in the same folder.
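The per-batch folders come from how `saveAsTextFiles` names its output: the DStream API documentation describes each batch's output as `prefix-TIME_IN_MS[.suffix]`. The sketch below is a hypothetical re-implementation of that naming rule (`BatchNaming.batchOutputDir` is our own helper, not part of Spark), showing why the prefix `/tmp/spark_testing/` yields a new `-<milliseconds>` directory for every batch instead of one shared folder.

```scala
object BatchNaming {
  // Hypothetical helper mirroring the documented "prefix-TIME_IN_MS[.suffix]"
  // naming used by DStream.saveAsTextFiles; not part of Spark's public API.
  def batchOutputDir(prefix: String, suffix: String, batchTimeMs: Long): String = {
    val base =
      if (prefix != null && prefix.nonEmpty) s"$prefix-$batchTimeMs"
      else batchTimeMs.toString
    if (suffix != null && suffix.nonEmpty) s"$base.$suffix" else base
  }

  def main(args: Array[String]): Unit = {
    // Two consecutive one-minute batches end up in two different directories.
    println(batchOutputDir("/tmp/spark_testing/", "", 1430000040000L)) // /tmp/spark_testing/-1430000040000
    println(batchOutputDir("/tmp/spark_testing/", "", 1430000100000L)) // /tmp/spark_testing/-1430000100000
  }
}
```

Because the batch time is part of every name, no choice of prefix makes `saveAsTextFiles` reuse one directory.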
Is there any workaround for it?
We can do this with Spark SQL's new DataFrame saving API, which allows appending to an existing output. By default, `saveAsTextFile` won't save to a directory that already contains data (see https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations covers how to set up a Spark SQL context for use with Spark Streaming.
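For reference, the `SQLContextSingleton` from the Spark 1.x streaming guide is a lazily created `SQLContext` shared across batches, so a new one is not built on every `foreachRDD` call. A sketch along those lines, assuming Spark 1.x (on Spark 2.x and later, `SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()` plays this role); the `synchronized` is our addition, not necessarily in the guide:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/** Lazily created SQLContext shared by every batch (Spark 1.x pattern). */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    // Create the SQLContext on first use only; later batches reuse it.
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
```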
Assuming you copy the `SQLContextSingleton` part from the guide, the resulting code would look something like this:
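```scala
import org.apache.spark.sql.SaveMode

data.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  // Convert your data to a DataFrame; how depends on the structure of your data
  val df = ....
  // Append this batch to the same output path (Spark 1.3 API; on Spark 1.4+
  // the equivalent is df.write.mode(SaveMode.Append).json(path.toString))
  df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}
```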
(Note that this example saves the result as JSON, but you can use other output formats too.)
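Putting the pieces together for the tweet stream in the question, here is a sketch assuming Spark 1.5+ (for `SQLContext.getOrCreate` and the `df.write` API) and twitter4j `Status` objects as produced by `TwitterUtils.createStream`. The `TweetRecord` case class, its fields, and the `AppendTweets` helper are hypothetical names; keep whichever tweet fields you actually need.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

// Hypothetical record type: which tweet fields to keep is up to you.
case class TweetRecord(id: Long, user: String, text: String, createdAtMs: Long)

object AppendTweets {
  // Writes one batch; SaveMode.Append adds new part files to outputDir
  // instead of failing because the directory already exists.
  def appendBatch(records: RDD[TweetRecord], outputDir: String): Unit = {
    // Skip empty batches so no empty part files are written.
    if (!records.isEmpty()) {
      val sqlContext = SQLContext.getOrCreate(records.sparkContext)
      import sqlContext.implicits._
      records.toDF().write.mode(SaveMode.Append).json(outputDir)
    }
  }

  // Wires it to the Twitter stream: every batch lands in the same outputDir.
  def appendTweets(tweets: DStream[Status], outputDir: String): Unit =
    tweets
      .map(s => TweetRecord(s.getId, s.getUser.getScreenName, s.getText, s.getCreatedAt.getTime))
      .foreachRDD(rdd => appendBatch(rdd, outputDir))
}
```

For example, calling `AppendTweets.appendTweets(tweetStream, "/tmp/spark_testing/tweets")` before `ssc.start()` would make every one-minute batch add its part files to that one directory. Appending keeps all batches in one folder, but it does not merge them into a single file: each batch still contributes its own part files.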