Compared with Hadoop, Spark is more flexible and convenient for data processing. Recently, however, I ran into a small annoyance: Spark's save functions (such as saveAsTextFile) always create a new directory and save the data as part files under it. What if we want to add a single file to an existing directory rather than create a whole new directory? My first attempt was to squeeze the RDD into one partition before saving:
rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")
Even with the data reduced to a single partition, Spark still creates a new directory named test.txt and writes all of the data into the part-00000 file inside it.
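In other words, test.txt here is a directory, not a file. On HDFS the output typically ends up looking something like this (the _SUCCESS marker is what Spark normally writes alongside the part files; exact contents may vary with your setup):

test/test.txt/              <- a directory, despite the ".txt" name
test/test.txt/part-00000    <- the actual data
test/test.txt/_SUCCESS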
Question: how can we get Spark to write an RDD to a single file rather than a directory?
The way Spark's save mode works means that saving will always create a new directory. If we want to add the data to an existing directory as one standalone file, we have to fall back on Hadoop's HDFS API. The example below shows how to use Hadoop's FileSystem to save Spark data as a single file under an existing directory:
package com.ys.penspark.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;

/**
 * @ClassName: HdfsOperate
 * @Description: writes lines to a single HDFS file through a shared output stream
 * @Author: Administrator
 * @Date: 2017/6/28
 */
public class HdfsOperate implements Serializable {

    private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);
    private static Configuration conf = new Configuration();
    private static BufferedWriter writer = null;

    // Create a new file at the target HDFS path and open an output stream to it
    public static void openHdfsFile(String path) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));
        if (null != writer) {
            logger.info("[HdfsOperate]>> initialize writer succeed!");
        }
    }

    // Write one line of data to the HDFS file
    public static void writeString(String line) {
        try {
            writer.write(line + "\n");
        } catch (Exception e) {
            logger.error("[HdfsOperate]>> writer a line error:", e);
        }
    }

    // Close the HDFS output stream
    public static void closeHdfsFile() {
        try {
            if (null != writer) {
                writer.close();
                logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");
            } else {
                logger.error("[HdfsOperate]>> closeHdfsFile writer is null");
            }
        } catch (Exception e) {
            logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);
        }
    }
}
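For clarity, this is how the helper is meant to be driven on its own, independent of Spark. A minimal sketch, assuming a reachable Hadoop configuration on the classpath; the path /tmp/penspark/demo.txt and the sample lines are only illustrative:

package com.ys.penspark.util;

public class HdfsOperateDemo {
    public static void main(String[] args) throws Exception {
        // open a single output file at the target HDFS path (illustrative path)
        HdfsOperate.openHdfsFile("/tmp/penspark/demo.txt");
        // write lines one by one; each call appends a trailing newline
        HdfsOperate.writeString("name,age");
        HdfsOperate.writeString("zs,44");
        // close the stream so the data is flushed to HDFS
        HdfsOperate.closeHdfsFile();
    }
}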
The idea is to first repartition the Spark RDD, then fetch each partition with collectPartitions and write its rows line by line into the HDFS file:
package com.ys.penspark.util;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: FeatureExtractor
 * @Description: converts a Dataset<Row> to CSV lines and writes them to a single HDFS file
 * @Author: mashiwei
 * @Date: 2017/6/28
 */
public class FeatureExtractor implements Serializable {

    private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    public void extractFeature(Dataset<Row> s, int repartitionNum, String out) throws Exception {
        // Build the header line from the schema's field names
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i <= s.schema().fieldNames().length - 1; i++) {
            sb.append(s.schema().fieldNames()[i]);
            if (i == s.schema().fieldNames().length - 1) {
                break;
            }
            sb.append(",");
        }
        s.show();
        JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);

        // Target HDFS file location
        // String destinationPath = "/kettle/penspark/data.txt";

        // Create the HDFS file and open the output stream
        HdfsOperate.openHdfsFile(out);
        HdfsOperate.writeString(sb.toString());

        // Read the RDD partition by partition and save it to HDFS.
        // Fetching everything at once with collect() may fail on large data
        // because the whole List<String> might not fit in the driver's memory.
        for (int i = 0; i < repartitionNum; i++) {
            int[] index = new int[1];
            index[0] = i;
            List<String>[] featureList = rddx.collectPartitions(index);
            if (featureList.length != 1) {
                logger.error("[FeatureExtractor]>> featureList.length is not 1!");
            }
            for (String str : featureList[0]) {
                // Write one line to the HDFS file
                logger.info("-----" + str);
                HdfsOperate.writeString(str);
            }
        }
        // Close the HDFS output stream
        HdfsOperate.closeHdfsFile();
    }

    class ExtractFeatureMap implements Function<Row, String> {
        @Override
        public String call(Row line) throws Exception {
            try {
                // Join all columns of the row with commas
                StringBuffer sb = new StringBuffer();
                int len = line.length();
                for (int i = 0; i <= len - 1; i++) {
                    sb.append(line.get(i).toString());
                    if (i == len - 1) {
                        break;
                    }
                    sb.append(",");
                }
                return sb.toString();
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>> GetTokenAndKeywordFeature error:", e);
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Build the schema for the CSV input
        Map<String, String> options = new HashMap<String, String>();
        LinkedList<StructField> obj = new LinkedList<StructField>();
        StructField structField = new StructField("name", DataTypes.StringType, true, Metadata.empty());
        StructField structField1 = new StructField("age", DataTypes.StringType, true, Metadata.empty());
        obj.add(structField);
        obj.add(structField1);
        StructType Schemafinal = new StructType(obj.toArray(new StructField[obj.size()]));

        SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");
        options.put("delimiter", ",");
        options.put("header", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        @SuppressWarnings("deprecation")
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession
                .builder()
                .appName("Pentaho Logic as Spark")
                .config("spark.some.config.option", "some-value")
                .config("spark.sql.warehouse.dir", "file:///C:/tmp/")
                .getOrCreate();
        Dataset<Row> tempdf = spark.read()
                .format("com.databricks.spark.csv")
                .options(options)
                .schema(Schemafinal)
                .option("header", true)
                .load("file:///" + "C:\\Users\\Administrator\\Desktop\\测试\\功能开发\\excel.txt");
        tempdf.show();
        FeatureExtractor fx = new FeatureExtractor();
        try {
            fx.extractFeature(tempdf, 2, "/kettle/tempData.txt");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
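The key point in extractFeature is the per-partition loop: collectPartitions(index) brings only one partition's rows back to the driver at a time, so the driver never has to hold the whole RDD in memory the way a single collect() would, while the one BufferedWriter opened in HdfsOperate ensures everything still lands in a single file on HDFS.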
Data:
name,age
zs, 44
li, 22
ww, 18