DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。
- DataSource是程序的数据源输入。
- Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap、filter等操作。
- DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。
一、DataSet API之DataSource
针对DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。
- 基于集合。fromCollection(Collection),主要是为了方便测试使用。它的用法和DataStreamAPI中的用法一样,我们已经用过很多次了。
- 基于文件。readTextFile(path),读取hdfs中的数据文件。这个前面我们也使用过了。
二、DataSet API之Transformation
1. mapPartition
mapPartition算子和spark中的用法一样,mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。
scala代码:
package com.imooc.scala.batch
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
object BatchMapPartitionScala {
def main(args: Array[String]): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//生成数据源数据
val text: DataSet[String] = env.fromCollection(Array("hello you", "hello me"))
//每次处理一个分区数据
text.mapPartition(it => {
val res: ListBuffer[String] = ListBuffer[String]()
it.foreach(line => {
val words: Array[String] = line.split(" ")
for(word <- words){
res.append(word)
}
})
res
//关闭数据库连接
}).print()
//注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
//env.execute("BatchMapPartitionScala")
}
}
Java代码:
package com.imooc.java.batch;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Iterator;
public class BatchMapPartitionJava {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 生成数据源数据
DataSource<String> text = env.fromCollection(Arrays.asList("hello you", "hello me"));
//每次处理一个分区的数据
text.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
//可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
Iterator<String> it = iterable.iterator();
while(it.hasNext()){
String line = it.next();
String[] words = line.split(" ");
for (String word: words){
collector.collect(word);
}
}
}
}).print();
}
}
2. join : 内连接,可以连接两份数据集
scala代码
package com.imooc.java.batch
import org.apache.flink.api.scala.ExecutionEnvironment
object BatchJoinScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//初始化第一份数据 Tuple2<用户id,用户姓名>
val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")))
//初始化第二份数据 Tuple2<用户id,用户所在城市>
val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
//对两份数据进行join操作
//注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
//where:指定左边数据集中参与比较的元素角标,equalTo指定右边数据集中参与比较的元素角标
text1.join(text2).where(0).equalTo(0) {
(first, second) => (first._1, first._2, second._2)
}.print()
}
}
3. cross : 获取两个数据集的笛卡尔积
scala代码
package com.imooc.scala.batch
import org.apache.flink.api.scala.ExecutionEnvironment
object BatchCrossScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//初始化第一份数据
val text1 = env.fromCollection(Array(1, 2))
//初始化第二份数据
val text2 = env.fromCollection(Array(3, 4))
//执行cross操作
text1.cross(text2).print()
}
}
4. union:返回两个数据集的总和,数据类型需要一致
和DataStreamAPI中的union操作功能一样
5. first-n :获取集合中的前N个元素
6. groupBy :分组
7. sortGroup:分组内排序
8. 实例:获取分组排序后每组的前N个元素
scala代码
package com.imooc.scala.batch
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
object BatchFirstNScala {
def main(args: Array[String]): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val data: ListBuffer[(Int, String)] = ListBuffer[Tuple2[Int, String]]()
data.append((2,"zs"))
data.append((4,"ls"))
data.append((3,"ww"))
data.append((1,"aw"))
data.append((1,"xw"))
data.append((1,"mw"))
import org.apache.flink.api.scala._
val text: DataSet[(Int, String)] = env.fromCollection(data)
//获取前三条数据
// text.first(3).print()
//根据数据中的第一列进行分组,获取每组的前2个元素
// text.groupBy(0).first(2).print()
//根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
//分组排序取TopN
text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
}
}
Java代码
package com.imooc.java.batch;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
public class BatchFirstNJava {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<Integer,String>(2,"zs"));
data.add(new Tuple2<Integer,String>(4,"ls"));
data.add(new Tuple2<Integer,String>(3,"ww"));
data.add(new Tuple2<Integer,String>(1,"aw"));
data.add(new Tuple2<Integer,String>(1,"xw"));
data.add(new Tuple2<Integer,String>(1,"mw"));
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
//获取前3条数据,按照数据插入的顺序
// text.first(3).print();
// text.groupBy(0).first(2).print();
//根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
}
}
三、DataSet API之DataSink
Flink针对DataSet提供了一些已经实现好的数据目的地
其中最常见的是向HDFS中写入数据
(1)writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
(2)writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法
(3)还有一个是print:打印每个元素的toString()方法的值,这个print是测试的时候使用的。