I have Spark output in S3 folders and I want to move all the S3 files from that output folder to another location, but while moving I want to rename the files.
For example, I have files in S3 folders like below.
Now I want to rename all the files and put them into another directory, with file names like below:
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.1.2017-10-18-0439.Full.txt
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.2.2017-10-18-0439.Full.txt
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.3.2017-10-18-0439.Full.txt
Here Fundamental.FinancialStatement is constant in all the files, and 2017-10-18-0439 is the current date-time.
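For reference, the date-time part of the name (2017-10-18-0439) is a year-month-day-hourMinute stamp, so a target name matching this pattern could be assembled as in the sketch below (the helper name and its parameters are made up for illustration).
import java.text.SimpleDateFormat
import java.util.Date
// Sketch only: builds a name matching the pattern shown above.
// "partition" (e.g. "Japan.1971-BAL") and "partIndex" are illustrative parameters.
def targetFileName(partition: String, partIndex: Int): String = {
  val prefix = "Fundamental.FinancialStatement.FinancialStatementLineItems"
  val stamp  = new SimpleDateFormat("yyyy-MM-dd-HHmm").format(new Date()) // e.g. 2017-10-18-0439
  s"$prefix.$partition.$partIndex.$stamp.Full.txt"
}
// targetFileName("Japan.1971-BAL", 1) gives
// Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.1.2017-10-18-0439.Full.txt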
This is what I have tried so far, but I am not able to get the folder names and loop through all the files.
import org.apache.hadoop.fs._
val src = new Path("s3://trfsmallfffile/Segments/output")
val dest = new Path("s3://trfsmallfffile/Segments/Finaloutput")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = src.getFileSystem(conf)
//val file = fs.globStatus(new Path("src/DataPartition=Japan/part*.gz"))(0).getPath.getName
//println(file)
val status = fs.listStatus(src)
status.foreach(filename => {
val a = filename.getPath.getName.toString()
println("file name"+a)
//println(filename)
})
This gives me the output below:
file nameDataPartition=Japan
file nameDataPartition=SelfSourcedPrivate
file nameDataPartition=SelfSourcedPublic
file name_SUCCESS
This gives me the folder details, not the files inside the folders.
Reference taken from here: Stack Overflow Reference
2 Answers
#1
You are getting directories because you have a sub-directory level in S3.
Use /*/* to go into the sub-directories.
Try this
import org.apache.hadoop.fs._
val src = new Path("s3://trfsmallfffile/Segments/Output")
val dest = new Path("s3://trfsmallfffile/Segments/FinalOutput")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = src.getFileSystem(conf)
// Glob two levels down so we get the part files inside each DataPartition=* folder
val files = fs.globStatus(new Path("s3://trfsmallfffile/Segments/Output/*/*"))
for (urlStatus <- files) {
  // Extract the partition value, e.g. "Japan" from ".../DataPartition=Japan/part-..."
  val partitionName = urlStatus.getPath.toString.split("=")(1).split("/")(0)
  val finalPrefix = "Fundamental.FinancialLineItem.Segments."
  val finalFileName = finalPrefix + partitionName + ".txt"
  // Note: no trailing space in the target path
  fs.rename(urlStatus.getPath, new Path(dest, finalFileName))
}
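If a partition writes more than one part file, the loop above maps all of them to the same target name, so only one survives. Below is a sketch that keeps a per-partition index and a date-time stamp; it reuses files, fs and dest from above, and the prefix and the "yyyy-MM-dd-HHmm" stamp are assumptions taken from the desired names in the question.
import java.text.SimpleDateFormat
import java.util.Date
val stamp = new SimpleDateFormat("yyyy-MM-dd-HHmm").format(new Date())
// Group the globbed part files by their DataPartition value and number them 1..n
for {
  (partitionName, statuses) <- files.groupBy(_.getPath.toString.split("=")(1).split("/")(0))
  (status, idx)             <- statuses.zipWithIndex
} {
  val finalFileName =
    s"Fundamental.FinancialStatement.FinancialStatementLineItems.$partitionName.${idx + 1}.$stamp.Full.txt"
  fs.rename(status.getPath, new Path(dest, finalFileName))
}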
#2
This has worked for me in the past:
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val path = "s3://<bucket>/<directory>"
val fs = FileSystem.get(new java.net.URI(path), spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(path))
listStatus provides all the files in the S3 directory.
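If the files sit inside partition sub-folders (as in the question), listStatus on the top-level directory only returns those folders. A recursive walk can be done with listFiles; a minimal sketch, assuming the same path and fs as above:
// listFiles(path, recursive = true) returns a RemoteIterator over the leaf files
val it = fs.listFiles(new Path(path), true)
while (it.hasNext) {
  val status = it.next()
  println(status.getPath)
}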