Spark Streaming: append DStream batches into a single output folder.

Time: 2021-02-21 20:48:46

I am using Spark Streaming to fetch tweets from Twitter by creating a StreamingContext as:
val ssc = new StreamingContext("local[3]", "TwitterFeed", Minutes(1))


and creating the Twitter stream as:
val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)), filters)


then saving it as a text file:
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")


The problem is that the tweets are being saved into separate folders based on batch time, but I need all the data from every batch in the same folder.

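For context, saveAsTextFiles writes one output directory per batch interval, with the batch time baked into the directory name, so with a one-minute batch interval the output ends up spread across timestamped folders roughly like this (timestamps are only illustrative):

  /tmp/spark_testing/-1424552400000/part-00000
  /tmp/spark_testing/-1424552460000/part-00000
  ...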

Is there any workaround for it?


Thanks


1 solution

#1


We can do this using Spark SQL's new DataFrame saving API, which allows appending to an existing output. By default, saveAsTextFile won't save into a directory that already contains data (see https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes ). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations covers how to set up a Spark SQL context for use with Spark Streaming.


Assuming you copy the SQLContextSingleton part from the guide, the resulting code would look something like:


import org.apache.spark.sql.SaveMode

data.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  // Convert your data to a DataFrame; how depends on the structure of your data
  val df = ....
  df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}

(Note that the above example uses JSON to save the result, but you can use other output formats too.)
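For reference, the lazily instantiated SQLContextSingleton referred to above looks roughly like this (a sketch; check the streaming guide for the exact version):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  // Create the SQLContext once and reuse it across batches
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}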

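Applied to the tweet stream in the question, one possible way to fill in the DataFrame conversion is the sketch below. It assumes tweetStream is a DStream[twitter4j.Status] (which TwitterUtils.createStream returns) and uses a hypothetical Tweet case class; adapt the fields to whatever you actually need:

import org.apache.spark.sql.SaveMode

case class Tweet(id: Long, user: String, text: String)

tweetStream.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  // Map each twitter4j.Status to the case class and build a DataFrame
  val df = rdd.map(s => Tweet(s.getId, s.getUser.getScreenName, s.getText)).toDF()
  // Append every batch to the same output directory instead of one folder per batch
  df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> "/tmp/spark_testing"))
}

With this, all batches accumulate as appended JSON part files under the single /tmp/spark_testing directory.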
