Spark学习之数据读取与保存总结(一)

时间:2022-10-22 22:20:21

一、动机

  我们已经学了很多在 Spark 中对已分发的数据执行的操作。到目前为止,所展示的示例都是从本地集合或者普通文件中进行数据读取和保存的。但有时候,数据量可能大到无法放在一台机器中,这时就需要探索别的数据读取和保存的方法了。

  Spark 及其生态系统提供了很多可选方案。本章会介绍以下三类常见的数据源。

  • 文件格式与文件系统:对于存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的数据,Spark 可以访问很多种不同的文件格式,包括文本文件、JSON、SequenceFile,以及 protocol buffer。我们会展示几种常见格式的用法,以及 Spark 针对不同文件系统的配置和压缩选项。

  • Spark SQL中的结构化数据源:后面会学习 Spark SQL 模块,它针对包括 JSON 和 Apache Hive 在内的结构化数据源,为我们提供了一套更加简洁高效的 API。此处会粗略地介绍一下如何使用 SparkSQL。

  • 数据库与键值存储:概述 Spark 自带的库和一些第三方库,它们可以用来连接 Cassandra、HBase、Elasticsearch 以及 JDBC 源。

二、文件格式

  Spark 对很多种文件格式的读取和保存方式都很简单。从诸如文本文件的非结构化的文件,到诸如 JSON 格式的半结构化的文件,再到诸如 SequenceFile 这样的结构化的文件,Spark都可以支持(见表)。Spark 会根据文件扩展名选择对应的处理方式。这一过程是封装好的,对用户透明。

  Spark学习之数据读取与保存总结(一)

1、文本文件

  在 Spark 中读写文本文件很容易。当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。( SparkContext.wholeTextFiles() 方法)也可以将多个完整的文本文件一次性读取为一个 pair RDD,其中键是文件名,值是文件内容。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf object Test {
def main(args: Array[String]): Unit = {
// Scala 中读取一个文本文件
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 设置日志显示级别
val input = sc.textFile("words.txt")
input.foreach(println)
} }

  Spark学习之数据读取与保存总结(一)

2、保存文本文件

  输出文本文件也相当简单。saveAsTextFile(outputFile) 方法接收一个路径,并将RDD 中的内容都输入到路径对应的文件中。Spark 将传入的路径作为目录对待,会在那个目录下输出多个文件。这样,Spark 就可以从多个节点上并行输出了。在这个方法中,我们不能控制数据的哪一部分输出到哪个文件中,不过有些输出格式支持控制。

3、读取JSON

  JSON 是一种使用较广的半结构化数据格式。这里有两种方式解析JSON数据,一种是通过Scala自带的JSON包(import scala.util.parsing.json.JSON)。后面还会展示使用Spark SQL读取JSON数据。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON object Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("JSONTest").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 设置日志显示级别
val inputFile = "pandainfo.json"//读取json文件
val jsonStr = sc.textFile(inputFile);
val result = jsonStr.map(s => JSON.parseFull(s));//逐个JSON字符串解析
result.foreach(
{
r => r match {
case Some(map:Map[String,Any]) => println(map)
case None => println("parsing failed!")
case other => println("unknown data structure" + other)
}
}
);
} }

  Spark学习之数据读取与保存总结(一)

  Spark学习之数据读取与保存总结(一)

  第二种方法是通过json4s来解析JSON文件。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.json4s.jackson.Serialization
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats object Test {
def main(args: Array[String]): Unit = {
// 第二种方法解析json文件
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 设置日志显示级别
implicit val formats = Serialization.formats(ShortTypeHints(List()))
val input = sc.textFile("pandainfo.json")
input.collect().foreach(x=>{
var c = parse(x).extract[Panda]
println(c.name+","+c.lovesPandas)
})
case class Panda(name:String,lovesPandas:Boolean)
} }

  Spark学习之数据读取与保存总结(一)

4、保存JSON

  写出 JSON 文件比读取它要简单得多。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.json4s.jackson.Serialization
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats object Test {
def main(args: Array[String]): Unit = {
// 第二种方法解析json文件
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 设置日志显示级别
implicit val formats = Serialization.formats(ShortTypeHints(List()))
val input = sc.textFile("pandainfo.json")
input.collect().foreach(x=>{
var c = parse(x).extract[Panda]
println(c.name+","+c.lovesPandas)
})
case class Panda(name:String,lovesPandas:Boolean) // 保存json
val datasave = input.map{ myrecord =>
implicit val formats = DefaultFormats
val jsonObj = parse(myrecord)
jsonObj.extract[Panda]
}
datasave.saveAsTextFile("savejson")
} }

  Spark学习之数据读取与保存总结(一)

5、逗号分隔值与制表符分隔值

  逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开(在制表符分隔值文件,即 TSV 文件中用制表符隔开)。记录通常是一行一条,不过也不总是这样,有时也可以跨行。读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf import au.com.bytecode.opencsv.CSVReader
import java.io.StringReader
import java.io.StringWriter
import au.com.bytecode.opencsv.CSVWriter object Test {
def main(args: Array[String]): Unit = { // 在Scala中使用textFile()读取CSV
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 设置日志显示级别
val inputFile = "favourite_animals.csv"//读取csv文件
val input = sc.textFile(inputFile)
val result = input.map{
line => val reader = new CSVReader(new StringReader(line))
reader.readNext()
}
// result.foreach(println)
for(res <- result)
for(r <- res)
println(r)
} }

  Spark学习之数据读取与保存总结(一)

  Spark学习之数据读取与保存总结(一)

6、SequenceFile

  SequenceFile 是由没有相对关系结构的键值对文件组成的常用 Hadoop 格式。SequenceFile文件有同步标记,Spark 可以用它来定位到文件中的某个点,然后再与记录的边界对齐。这可以让 Spark 使用多个节点高效地并行读取 SequenceFile 文件。SequenceFile 也是Hadoop MapReduce 作业中常用的输入输出格式,所以如果你在使用一个已有的 Hadoop 系统,数据很有可能是以 SequenceFile 的格式供你使用的。

  由于 Hadoop 使用了一套自定义的序列化框架,因此 SequenceFile 是由实现 Hadoop 的 Writable接口的元素组成。下表 列出了一些常见的数据类型以及它们对应的 Writable 类。标准的经验法则是尝试在类名的后面加上 Writable 这个词,然后检查它是否是 org.apache.hadoop.io.Writable 已知的子类。如果你无法为要写出的数据找到对应的 Writable 类型(比如自定义的 case class),你可以通过重载 org.apache.hadoop.io.Writable 中的 readfields 和 write 来实现自己的 Writable 类。

  Spark学习之数据读取与保存总结(一)

  读取SequenceFile:Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile(path,keyClass, valueClass, minPartitions) 。前面提到过,SequenceFile 使用 Writable 类,因此 keyClass 和 valueClass 参数都必须使用正确的 Writable 类。

  保存SequenceFile:在 Scala 中将数据写出到 SequenceFile 的做法也很类似。可以直接调用 saveSequenceFile(path) 保存你的 PairRDD ,它会帮你写出数据。

7、对象文件

  对象文件看起来就像是对 SequenceFile 的简单封装,它允许存储只包含值的 RDD。和SequenceFile 不一样的是,对象文件是使用 Java 序列化写出的。要保存对象文件,只需在 RDD 上调用 saveAsObjectFile 就行了。读回对象文件也相当简单:用 SparkContext 中的 objectFile() 函数接收一个路径,返回对应的 RDD。

Spark学习之数据读取与保存总结(一)的更多相关文章

  1. Spark学习之数据读取与保存(4)

    Spark学习之数据读取与保存(4) 1. 文件格式 Spark对很多种文件格式的读取和保存方式都很简单. 如文本文件的非结构化的文件,如JSON的半结构化文件,如SequenceFile结构化文件. ...

  2. Spark学习之数据读取与保存总结&lpar;二&rpar;

    8.Hadoop输入输出格式 除了 Spark 封装的格式之外,也可以与任何 Hadoop 支持的格式交互.Spark 支持新旧两套Hadoop 文件 API,提供了很大的灵活性. 要使用新版的 Ha ...

  3. Spark学习笔记——数据读取和保存

    spark所支持的文件格式 1.文本文件 在 Spark 中读写文本文件很容易. 当我们将一个文本文件读取为 RDD 时,输入的每一行 都会成为 RDD 的 一个元素. 也可以将多个完整的文本文件一次 ...

  4. 【原】Learning Spark &lpar;Python版&rpar; 学习笔记&lpar;二&rpar;----键值对、数据读取与保存、共享特性

    本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =.以后还是要按时完成任务.废话不多说,第四章-第六章主要讲了三个内容:键值对.数据读取与保存与Spark的两个共享特性(累加器和广播变量). ...

  5. Spark学习笔记4:数据读取与保存

    Spark对很多种文件格式的读取和保存方式都很简单.Spark会根据文件扩展名选择对应的处理方式. Spark支持的一些常见文件格式如下: 文本文件 使用文件路径作为参数调用SparkContext中 ...

  6. Spark基础:(四)Spark 数据读取与保存

    1.文件格式 Spark对很多种文件格式的读取和保存方式都很简单. (1)文本文件 读取: 将一个文本文件读取为一个RDD时,输入的每一行都将成为RDD的一个元素. val input=sc.text ...

  7. Spark&lpar;十二&rpar;【SparkSql中数据读取和保存】

    一. 读取和保存说明 SparkSQL提供了通用的保存数据和数据加载的方式,还提供了专用的方式 读取:通用和专用 保存 保存有四种模式: 默认: error : 输出目录存在就报错 append: 向 ...

  8. 【Spark机器学习速成宝典】基础篇03数据读取与保存(Python版)

    目录 保存为文本文件:saveAsTextFile 保存为json:saveAsTextFile 保存为SequenceFile:saveAsSequenceFile 读取hive 保存为文本文件:s ...

  9. matlab各格式数据读取与保存函数

    数据处理及matlab的初学者,可能最一开始接触的就是数据的读取与保存: %matlab数据保存与读入 function datepro clear all; %产生随机数据 mat = rand(, ...

随机推荐

  1. HDU 4405 Aeroplane chess 概率DP 难度&colon;0

    http://acm.hdu.edu.cn/showproblem.php?pid=4405 明显,有飞机的时候不需要考虑骰子,一定是乘飞机更优 设E[i]为分数为i时还需要走的步数期望,j为某个可能 ...

  2. 每个项目单独配置 git 用户

    git多账号登陆问题 设置git全局设置: git config --global user.name "your_name"  git config --global user. ...

  3. Nginx 安装与部署配置以及Nginx和uWSGI开机自启

    下载 官方网站:https://nginx.org/en/download.html Windows下安装 安装 下载后解压(切记不能含有中文路径!!),文件结构如图(我解压的路径就有中文,记得拷贝放 ...

  4. 教你用Windows自带工具给优盘&sol;移动硬盘添加密码

    教你用Windows自带工具给优盘/移动硬盘添加密码 本文中优盘,移动硬盘和分区操作方式一样,为方便描述,下文将只说优盘 优盘成了很多人每天都会用到的工具,有时候自己优盘会存着一些不希望别人看到的文件 ...

  5. ucml JS调用其它页面上的服务端方法

    var params = { _bpoName: "BPO_KH_ED" + "Service", //BPO的名字(拥有那个服务端函数的BPO) _metho ...

  6. tomcat 拒绝服务

    一 尝试重新下载二进制安装包安装包 wget http://mirror.bit.edu.cn/apache/tomcat/tomcat-9/v9.0.16/bin/apache-tomcat-9.0 ...

  7. 用c语言如何在数字前自动补0

    一: #include <stdio.h>int main(){ long a=3,b=4,c=15; printf("......."a,b,c);return 0; ...

  8. BZOJ 2333&colon; &lbrack;SCOI2011&rsqb;棘手的操作 可并堆 左偏树 set

    https://www.lydsy.com/JudgeOnline/problem.php?id=2333 需要两个结构分别维护每个连通块的最大值和所有连通块最大值中的最大值,可以用两个可并堆实现,也 ...

  9. VC的常用调试方法

    前言 VS是非常强大的IDE,所以掌握VSVC的常用方法,将会使得我们找出问题解决问题事半功倍. 目录 VSVC的常用调试方法 前言 1. Watch窗口查看伪变量 2. 查看指针指向的一序列值 3. ...

  10. 用Python处理邮件,全文完

    http://www.chinaunix.net/old_jh/55/575710.html