Introduction
Spark provides a unified runtime for big data. HDFS, Hadoop's filesystem, is the most widely used storage platform for Spark, as it provides cost-effective storage for unstructured and semi-structured data on commodity hardware. Spark is not limited to HDFS, though, and can work with any Hadoop-supported storage.
Hadoop-supported storage means storage that can work with Hadoop's InputFormat and OutputFormat interfaces. InputFormat is responsible for creating InputSplits from the input data and dividing them further into records; OutputFormat is responsible for writing the output back to storage.
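As a quick illustration of these two interfaces, the following spark-shell sketch reads and writes through explicit Hadoop formats; it is only a sketch, and the HDFS paths (including the wordcount-out output directory) are assumptions for illustration:
scala> import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
scala> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
// TextInputFormat splits the input and presents each record as (byte offset, line)
scala> val lines = sc.newAPIHadoopFile("hdfs://localhost:9000/user/hduser/words", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// count words, then let TextOutputFormat write the (word, count) records back to storage
scala> val counts = lines.map(_._2.toString).flatMap(_.split("\\W+")).map((_, 1)).reduceByKey(_ + _)
scala> counts.map { case (w, c) => (new Text(w), new IntWritable(c)) }.saveAsNewAPIHadoopFile("hdfs://localhost:9000/user/hduser/wordcount-out", classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])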
Loading data from the local filesystem
Though the local filesystem is not a good fit for storing big data, due to disk size limits and its lack of a distributed nature, you can technically load data into a distributed system from the local filesystem. The file or directory you access, however, has to be available on every node.
1. create the words directory
mkdir words
2. get into the words directory
cd words
3. create the sh.txt file
echo "to be or not to be" > sh.txt
4. start the spark shell
spark-shell
5. load the words directory as RDD
scala> val words = sc.textFile("file:///home/hduser/words")
6. count the number of lines
scala> words.count
7. divide the line (or lines) into multiple words
scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
8. convert word to (word,1)
scala> val wordsMap = wordsFlatMap.map( w => (w,1))
9. add the number of occurrences for each word
scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
10. print the RDD
scala> wordCount.collect.foreach(println)
11. doing all in one step
scala> sc.textFile("file:///home/hduser/words").flatMap(_.split("\\W+")).map( w => (w,1)).reduceByKey( (a,b) => (a+b)).collect.foreach(println)
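As an optional extra step (a sketch, not part of the recipe above), the word counts can be written back to the local filesystem with saveAsTextFile; remember that on a real cluster each worker writes its own partitions, so the output path below, which is only an example, must be writable on every node:
// writes one part-NNNNN file per partition under the given directory
scala> wordCount.saveAsTextFile("file:///home/hduser/wordcount-output")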
Loading data from HDFS
HDFS is the most widely used big data storage system. One of the reasons for its wide adoption is schema-on-read: HDFS does not put any restriction on data when it is being written, so any and all kinds of data are welcome and can be stored in raw form. This makes it an ideal store for raw unstructured and semi-structured data.
1. create the words directory
mkdir words
2. get into the words directory
cd words
3. create the sh.txt file and upload the words directory to HDFS
echo "to be or not to be" > sh.txt
cd ..
hdfs dfs -put words /user/hduser/words
4. start the spark shell
spark-shell
5. load the words directory as RDD
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
6. count the number of lines
scala> words.count
7. divide the line (or lines) into multiple words
scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
8. convert word to (word,1)
scala> val wordsMap = wordsFlatMap.map( w => (w,1))
9. add the number of occurrences for each word
scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
10. print the RDD
scala> wordCount.collect.foreach(println)
11. doing all in one step
scala> sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+")).map( w => (w,1)).reduceByKey( (a,b) => (a+b)).collect.foreach(println)
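A related loading method worth knowing here is sc.wholeTextFiles, which is handy when a directory contains many small files and you want to keep track of which file each record came from; this is a sketch using the same HDFS path as the recipe, not an additional step from it:
// each element is a (file path, entire file content) pair
scala> val files = sc.wholeTextFiles("hdfs://localhost:9000/user/hduser/words")
scala> files.keys.collect.foreach(println)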
Loading data from HDFS using a custom InputFormat
Sometimes you need to load data in a specific format for which TextInputFormat is not a good fit. Spark provides two methods for this purpose:
1. sparkContext.hadoopFile: This supports the old MapReduce API
2. sparkContext.newAPIHadoopFile: This supports the new MapReduce API
These two methods provide support for all of Hadoop's built-in InputFormat implementations as well as any custom InputFormat.
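For comparison, here is a sketch of the old-API route with sparkContext.hadoopFile, using the mapred flavor of KeyValueTextInputFormat and the same currency path that the recipe below builds up (so the path is an assumption until step 3 has been completed):
scala> import org.apache.hadoop.io.Text
// the old MapReduce API takes its InputFormat from the org.apache.hadoop.mapred package
scala> val oldApiRDD = sc.hadoopFile("hdfs://localhost:9000/user/hduser/currency", classOf[org.apache.hadoop.mapred.KeyValueTextInputFormat], classOf[Text], classOf[Text])
scala> oldApiRDD.map(t => (t._1.toString, t._2.toString)).collect.foreach(println)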
1. create the currency directory
mkdir currency
2. get into the currency directory
cd currency
3. create the na.txt file (separate the country name and the currency with a tab, which is the default key-value delimiter for KeyValueTextInputFormat) and upload the currency folder to HDFS
vi na.txt
United States of America	US Dollar
Canada	Canadian Dollar
Mexico	Peso
cd ..
hdfs dfs -put currency /user/hduser/currency
4. start the spark shell and import statements
spark-shell
scala> import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
5. load the currency directory as RDD and convert it from tuple of (Text, Text) to tuple of (String, String)
scala> val currencyFile = sc.newAPIHadoopFile("hdfs://localhost:9000/user/hduser/currency", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])
scala> val currencyRDD = currencyFile.map(t => (t._1.toString, t._2.toString))
6. count the number of elements in the RDD
scala> currencyRDD.count
7. print the RDD
scala> currencyRDD.collect.foreach(println)
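Since the currency data is small and key-value shaped, a natural follow-up (a sketch, not part of the recipe) is to pull it to the driver as a lookup map with collectAsMap:
// collectAsMap brings the whole pair RDD to the driver as a scala.collection.Map
scala> val currencyMap = currencyRDD.collectAsMap
scala> currencyMap.get("Canada")    // Some(Canadian Dollar) if the fields in na.txt are tab-separated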
Loading data from Amazon S3
Amazon Simple Storage Service (S3) provides developers and IT teams with a secure, durable, and scalable storage platform. The biggest advantage of Amazon S3 is that there is no up-front IT investment, and companies can add capacity (just by clicking a button) as they need it.
Though Amazon S3 can be used with any compute platform, it integrates really well with Amazon's cloud services such as Amazon Elastic Compute Cloud (EC2) and Amazon Elastic Block Store (EBS). For this reason, companies that use Amazon Web Services (AWS) are likely to have a significant amount of data already stored on Amazon S3.
1. go to http://aws.amazon.com and log in with username and password
2. navigate to Storage & Content Delivery | S3 | Create Bucket
3. enter the bucket name - for example, com.infoobjects.wordcount
4. select Region, click on Create
5. click on Create Folder and enter words as the folder name
6. create sh.txt file on the local system
echo "to be or not to be" > sh.txt
7. navigate to Words | Upload | Add Files and choose sh.txt from the dialog box
8. click on Start Upload
9. select sh.txt and click on Properties
10. set AWS_ACCESS_KEY and AWS_SECRET_ACCESS_KEY as environment variables
11. open the spark shell and load the words directory from S3 into the words RDD
scala> val words = sc.textFile("s3n://com.infoobjects.wordcount/words")
Loading data from Apache Cassandra
Apache Cassandra is a NoSQL database with a masterless ring cluster structure. While HDFS is a good fit for streaming data access, it does not work well with random access. For example, HDFS will work well when your average file size is 100 MB and you want to read the whole file. If you frequently access the nth line in a file or some other part as a record, HDFS would be too slow.
Relational databases have traditionally provided a solution to that, offering low-latency random access, but they do not work well with big data. NoSQL databases such as Cassandra fill the gap by providing relational database-like access, but in a distributed architecture on commodity servers.
1. create a keyspace named people in Cassandra using the CQL shell
cqlsh> CREATE KEYSPACE people WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
2. create a column family (from CQL 3.0 onwards, it is also called a table) named person
cqlsh> create columnfamily person(id int primary key, first_name varchar, last_name varchar);
3. insert a few records in the column family
cqlsh> insert into person(id,first_name,last_name) values(1,'Barack','Obama');
cqlsh> insert into person(id,first_name,last_name) values(2,'Joe','Smith');
4. add the Cassandra connector dependency to SBT (in build.sbt)
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0"
5. alternatively, add the Cassandra connector dependency to Maven
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
6. start the spark shell with the spark.cassandra.connection.host property set (SparkConf values must be in place before the SparkContext is created, so pass the property at launch; the connector classes also need to be on the shell's classpath, which the uber JAR recipe at the end of this section takes care of)
spark-shell --conf spark.cassandra.connection.host=localhost
7. verify that the property has been picked up
scala> sc.getConf.get("spark.cassandra.connection.host")
8. import Cassandra-specific libraries
scala> import com.datastax.spark.connector._
9. load the person column family as an RDD
scala> val personRDD = sc.cassandraTable("people", "person")
10. count the number of lines
scala> personRDD.count
11. print the RDD
scala> personRDD.collect.foreach(println)
12. retrieve the first row
scala> val firstRow = personRDD.first
13. get the column names
scala> firstRow.columnNames
14. access Cassandra through Spark SQL
scala> val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)
15. load the person data as SchemaRDD
scala> val p = cc.sql("select * from people.person")
16. print the person data
scala> p.collect.foreach(println)
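The connector can also write RDDs back to Cassandra with saveToCassandra; the following sketch assumes the people.person table created above, the import from step 8, and a hypothetical extra row used purely as example data:
// each tuple maps, in order, to the columns listed in SomeColumns
scala> val newPeople = sc.parallelize(Seq((3, "Jane", "Doe")))
scala> newPeople.saveToCassandra("people", "person", SomeColumns("id", "first_name", "last_name"))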
Creating uber JARs with the sbt-assembly plugin
1. create the uber directory
mkdir uber
2. get into the uber directory
cd uber
3. open the SBT prompt
sbt
4. give the project the name sc-uber, save the session, and exit
> set name := "sc-uber"
> session save
> exit
5. add the spark-cassandra-connector dependency and the assembly merge strategy to build.sbt
vi build.sbt
name := "sc-uber"
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector" % "1.1.0"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => (xs map {_.toLowerCase}) match {
case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
case _ => MergeStrategy.discard
}
case _ => MergeStrategy.first
}
6. create plugins.sbt in the project folder
vi plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
7. build the uber JAR
sbt assembly
The uber JAR is now created in target/scala-2.10/sc-uber-assembly-0.1-SNAPSHOT.jar.
8. rename the JAR file and move it to a convenient location, for example a thirdparty directory
mkdir -p thirdparty
mv target/scala-2.10/sc-uber-assembly-0.1-SNAPSHOT.jar thirdparty/sc-uber.jar
9. load the spark shell with the uber JAR
spark-shell --jars thirdparty/sc-uber.jar
10. call spark-submit with the --jars option to submit Scala code to a cluster (a complete invocation is sketched after this step)
spark-submit --jars thirdparty/sc-uber.jar
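For reference, a complete invocation would look like the following sketch; the --class value and the trailing application JAR are hypothetical placeholders for your own application:
# com.example.WordCount and my-app.jar are placeholders, not part of the recipe
spark-submit --class com.example.WordCount --jars thirdparty/sc-uber.jar my-app.jar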