Spark 把RDD数据保存到hdfs单个文件中,而不是目录

时间:2024-03-09 11:15:07

相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录),下面两种写法都做不到:

 

// Both variants reduce the RDD to a single partition before saving; saveAsTextFile
// still treats "test/test.txt" as a directory and writes part-00000 inside it.
rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")

 

把分区设置成1个,结果Spark仍然新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中。

问题:如何让Spark将RDD结果输出到一个文件而不是目录中呢?

Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:

 

package com.ys.penspark.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/**
 * Static helper that writes text lines into one single HDFS file through the Hadoop
 * {@link FileSystem} API.
 *
 * <p>Intended call sequence: {@link #openHdfsFile(String)} once, {@link #writeString(String)}
 * for every line, then {@link #closeHdfsFile()}. The writer lives in one shared static field
 * and no synchronization is performed, so only one file can be open at a time.
 *
 * @author Administrator
 */
public class HdfsOperate implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(HdfsOperate.class);
    private static final Configuration conf = new Configuration();
    private static BufferedWriter writer = null;

    /**
     * Creates a new file at the target HDFS location and opens a buffered writer on it.
     *
     * <p>If a file opened by an earlier call is still open, it is closed first so that its
     * stream is not leaked. Text is always encoded as UTF-8 instead of the platform default
     * charset, so the file content does not depend on the machine the driver runs on.
     *
     * @param path location of the file to create; its URI selects the file system
     * @throws Exception if the file system cannot be obtained or the file cannot be created
     */
    public static void openHdfsFile(String path) throws Exception {
        if (writer != null) {
            closeHdfsFile();
        }
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        writer = new BufferedWriter(
                new OutputStreamWriter(fs.create(new Path(path)), StandardCharsets.UTF_8));
        logger.info("[HdfsOperate]>> initialize writer succeed!");
    }

    /**
     * Appends one line, followed by a {@code '\n'}, to the currently open file.
     *
     * <p>This method is best-effort: failures are logged and not propagated to the caller.
     *
     * @param line text to write, without a trailing line break
     */
    public static void writeString(String line) {
        if (writer == null) {
            // Report the real cause instead of relying on a caught NullPointerException.
            logger.error("[HdfsOperate]>> writer a line error: no file is open");
            return;
        }
        try {
            writer.write(line + "\n");
        } catch (IOException e) {
            logger.error("[HdfsOperate]>> writer a line error:", e);
        }
    }

    /**
     * Flushes and closes the currently open file.
     *
     * <p>The writer reference is always cleared afterwards, even when closing fails, so a
     * later {@link #writeString(String)} cannot write to an already closed stream.
     */
    public static void closeHdfsFile() {
        if (writer == null) {
            logger.error("[HdfsOperate]>> closeHdfsFile writer is null");
            return;
        }
        try {
            writer.close();
            logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");
        } catch (IOException e) {
            // Pass the exception itself so the stack trace is kept in the log.
            logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:", e);
        } finally {
            writer = null;
        }
    }

}

 

  先将Spark的RDD重新分区,再用collectPartitions逐个分区取回数据,并按行写入HDFS文件中:

package com.ys.penspark.util;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Writes the rows of a Spark {@link Dataset} into one single HDFS file (instead of the
 * directory of part files produced by {@code saveAsTextFile}), using {@link HdfsOperate}.
 *
 * @author mashiwei
 */
public class FeatureExtractor implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    /**
     * Writes the data set as comma-separated text to a single file.
     *
     * <p>The first line is the comma-joined list of column names. The rows are then pulled
     * to the driver one partition at a time with {@code collectPartitions}; collecting the
     * whole RDD at once with {@code collect()} could exceed the driver's memory.
     *
     * @param s              data set to export
     * @param repartitionNum number of partitions the data is split into before it is
     *                       fetched partition by partition
     * @param out            location of the output file
     * @throws Exception if the output file cannot be opened or a Spark job fails
     */
    public void extractFeature(Dataset<Row> s, int repartitionNum, String out) throws Exception {
        String header = joinWithComma(s.schema().fieldNames());

        s.show();

        JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);

        // Create the target file and open the output stream.
        HdfsOperate.openHdfsFile(out);
        try {
            // Every collectPartitions call below runs its own Spark job; caching keeps the
            // mapped and repartitioned data so it is not recomputed for each partition.
            rddx.cache();
            HdfsOperate.writeString(header);
            for (int i = 0; i < repartitionNum; i++) {
                List<String>[] featureList = rddx.collectPartitions(new int[] {i});
                if (featureList.length != 1) {
                    logger.error("[FeatureExtractor]>> featureList.length is not 1!");
                    if (featureList.length == 0) {
                        // Nothing to write; avoid indexing into an empty result.
                        continue;
                    }
                }
                for (String str : featureList[0]) {
                    logger.info("-----{}", str);
                    HdfsOperate.writeString(str);
                }
            }
        } finally {
            // Close the stream even when a Spark job fails, so the file handle is not leaked.
            HdfsOperate.closeHdfsFile();
            rddx.unpersist();
        }
    }

    /** Joins the given values with {@code ","}; an empty array yields an empty string. */
    private static String joinWithComma(String[] values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(values[i]);
        }
        return sb.toString();
    }

    /**
     * Converts one {@link Row} into a comma-separated line.
     *
     * <p>Declared {@code static} so that the function shipped to the executors does not
     * carry a reference to an enclosing {@link FeatureExtractor} instance.
     */
    static class ExtractFeatureMap implements Function<Row, String> {

        /**
         * Joins the string form of every column value with {@code ","}. A {@code null}
         * value is written as an empty field instead of failing the whole row.
         *
         * @param line row to convert
         * @return the comma-separated line, or {@code null} if the conversion failed
         */
        @Override
        public String call(Row line) throws Exception {
            try {
                StringBuilder sb = new StringBuilder();
                int len = line.length();
                for (int i = 0; i < len; i++) {
                    if (i > 0) {
                        sb.append(",");
                    }
                    Object value = line.get(i);
                    if (value != null) {
                        sb.append(value.toString());
                    }
                }
                return sb.toString();
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);
            }

            return null;
        }
    }

    /**
     * Demo entry point: reads a local CSV file with the columns {@code name} and {@code age}
     * and exports it to {@code /kettle/tempData.txt} using two partitions.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StructType schema = new StructType(new StructField[] {
                new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                new StructField("age", DataTypes.StringType, true, Metadata.empty())
        });

        Map<String, String> options = new HashMap<String, String>();
        options.put("delimiter", ",");
        options.put("header", "true");

        // The context is created first because it carries the master setting; the session
        // builder below does not set one itself.
        SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            SparkSession spark = SparkSession
                    .builder()
                    .appName("Pentaho Logic as Spark")
                    .config("spark.some.config.option", "some-value")
                    .config("spark.sql.warehouse.dir", "file:///C:/tmp/")
                    .getOrCreate();

            Dataset<Row> tempdf = spark.read()
                    .format("com.databricks.spark.csv")
                    .options(options)
                    .schema(schema)
                    .load("file:///"+"C:\\Users\\Administrator\\Desktop\\测试\\功能开发\\excel.txt");
            tempdf.show();

            FeatureExtractor fx = new FeatureExtractor();
            try {
                fx.extractFeature(tempdf, 2, "/kettle/tempData.txt");
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>> extractFeature failed:", e);
            }
        } finally {
            // Release the Spark context once the demo is done.
            sc.stop();
        }
    }
}

  数据

name,age
zs, 44
li, 22
ww, 18