【文件属性】:
文件名称:spark sftp 2.11
文件大小:40KB
文件格式:JAR
更新时间:2021-07-19 09:43:21
spark sftp
spark 读取 linux sftp 上的文本文件。原 jar 只支持 json、csv 等格式,此版本增加了对 bcp、txt 文件的支持。
下面是例子:
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkDataFrame");
JavaSparkContext javacontext = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(javacontext);
Dataset df = sqlContext.read().
format("com.springml.spark.sftp").
option("host", "192.168.1.3").
option("username", "root").
option("password", "111111").
option("fileType", "bcp").
load("/sparktest/sparkfile0.bcp");
/*List list = df.collectAsList();
for(Row row:list){
String[] words = new String(row.getString(0).getBytes(),0,row.getString(0).length(),"UTF-8").split(" ",-1);
for(int i=0;i rowRdd = df.javaRDD();
JavaRDD words_bcp= rowRdd.map(new Function() {
@Override
public Row call(Row row) throws Exception {
// TODO Auto-generated method stub
String line = row.getString(0);
String[] words = new String(line.getBytes(),0,line.getBytes().length,"utf-8").split(" ",-1);
return RowFactory.create(words);
}
});
List list = words_bcp.collect();
for(Row row:list){
System.out.println("row1=="+row.getString(0));
}
df.write().format("com.springml.spark.sftp").
option("host", "192.168.1.3").
option("username", "root").
option("password", "111111").
option("fileType", "bcp").
save("/sparktest/luozhao.bcp");
df.show();
javacontext.close();
}