here I streaming the data from streaming directory and the write it to a output location. I am also trying to implement the process of moving hdfs files from a input folder to the streaming directory. This move happens one time before the streaming context starts. But I want this move to get executed every time for each Batch of Dstream. is that even possible?
在这里,我从流式目录流式传输数据并将其写入输出位置。我也在尝试实现将hdfs文件从输入文件夹移动到流式目录的过程。此流程在流式上下文开始之前发生一次。但是我希望每次为每批Dstream执行此操作。甚至可能吗?
val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat](streaming_directory, (t:Path)=> true , true).map { case (x, y) => (y.toString) }
streamed_rdd.foreachRDD( rdd => {
rdd.map(x =>x.split("\t")).map(x => x(3)).foreachPartition { partitionOfRecords =>
val connection: Connection = connectionFactory.createConnection()
connection.setClientID("Email_send_module_client_id")
println("connection started with active mq")
val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
println("created session")
val dest = session.createQueue("dwEmailsQueue2")
println("destination queue name = dwEmailsQueue2")
val prod_queue = session.createProducer(dest)
connection.start()
partitionOfRecords.foreach { record =>
val rec_to_send: TextMessage = session.createTextMessage(record)
println("started creating a text message")
prod_queue.send(rec_to_send)
println("sent the record")
}
connection.close()
}
}
)
**val LIST = scala.collection.mutable.MutableList[String]()
val files_to_move = scala.collection.mutable.MutableList[String]()
val cmd = "hdfs dfs -ls -d "+load_directory+"/*"
println(cmd)
val system_time = System.currentTimeMillis
println(system_time)
val output = cmd.!!
output.split("\n").foreach(x => x.split(" ").foreach(x => if (x.startsWith("/user/hdpprod/")) LIST += x))
LIST.foreach(x => if (x.toString.split("/").last.split("_").last.toLong < system_time) files_to_move += x)
println("files to move" +files_to_move)
var mv_cmd :String = "hdfs dfs -mv "
for (file <- files_to_move){
mv_cmd += file+" "
}
mv_cmd += streaming_directory
println(mv_cmd)
val mv_output = mv_cmd.!!
println("moved the data to the folder")**
if (streamed_rdd.count().toString == "0") {
println("no data in the streamed list")
} else {
println("saving the Dstream at "+System.currentTimeMillis())
streamed_rdd.transform(rdd => {rdd.map(x => (check_time_to_send+"\t"+check_time_to_send_utc+"\t"+x))}).saveAsTextFiles("/user/hdpprod/temp/spark_streaming_output_sent/sent")
}
ssc.start()
ssc.awaitTermination()
}
}
1 个解决方案
#1
0
I tried doing same stuff in java implementation as below. you can call this method from foreachPartion on rdd
我尝试在java实现中执行相同的操作,如下所示。你可以在rdd上从foreachPartion调用这个方法
public static void moveFiles(final String moveFilePath,
final JavaRDD rdd) {
for (final Partition partition : rdd.partitions()) {
final UnionPartition unionPartition = (UnionPartition) partition;
final NewHadoopPartition newHadoopPartition = (NewHadoopPartition)
unionPartition.parentPartition();
final String fPath = newHadoopPartition.serializableHadoopSplit()
.value().toString();
final String[] filespaths = fPath.split(":");
if ((filespaths != null) && (filespaths.length > 0)) {
for (final String filepath : filespaths) {
if ((filepath != null) && filepath.contains("/")) {
final File file = new File(filepath);
if (file.exists() && file.isFile()) {
try {
File destFile = new File(moveFilePath + "/" +
file.getName());
if (destFile.exists()) {
destFile = new File(moveFilePath + "/" +
file.getName() + "_");
}
java.nio.file.Files.move((file
.toPath()), destFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
logger.error(
"Exception while moving file",
e);
}
}
}
}
}
}
}
#1
0
I tried doing same stuff in java implementation as below. you can call this method from foreachPartion on rdd
我尝试在java实现中执行相同的操作,如下所示。你可以在rdd上从foreachPartion调用这个方法
public static void moveFiles(final String moveFilePath,
final JavaRDD rdd) {
for (final Partition partition : rdd.partitions()) {
final UnionPartition unionPartition = (UnionPartition) partition;
final NewHadoopPartition newHadoopPartition = (NewHadoopPartition)
unionPartition.parentPartition();
final String fPath = newHadoopPartition.serializableHadoopSplit()
.value().toString();
final String[] filespaths = fPath.split(":");
if ((filespaths != null) && (filespaths.length > 0)) {
for (final String filepath : filespaths) {
if ((filepath != null) && filepath.contains("/")) {
final File file = new File(filepath);
if (file.exists() && file.isFile()) {
try {
File destFile = new File(moveFilePath + "/" +
file.getName());
if (destFile.exists()) {
destFile = new File(moveFilePath + "/" +
file.getName() + "_");
}
java.nio.file.Files.move((file
.toPath()), destFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
logger.error(
"Exception while moving file",
e);
}
}
}
}
}
}
}