Batch Examples
The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.
The full source code of these and more examples can be found in the flink-examples-batch or flink-examples-streaming module of the Flink source repository.
- Running an example
- Word Count
- Page Rank
- Connected Components
- Relational Query
Running an example
In order to run a Flink example, we assume you have a running Flink instance available. The Quickstart and Setup sections describe in detail how to start Flink.
The easiest way is to run the ./bin/start-local.sh script, which starts a local JobManager.
Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.
To run the WordCount example, issue the following command:
./bin/flink run ./examples/batch/WordCount.jar
The other examples can be started in a similar way.
Note that many examples run without passing any arguments, because they use built-in data. To run WordCount with real data, you have to pass the path to the data:
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
Note that non-local file systems require a schema prefix, such as hdfs://.
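For example, an invocation that reads from and writes to HDFS could look as follows (the paths are placeholders):
./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///path/to/some/text/data --output hdfs:///path/to/result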
Word Count
WordCount is the "Hello World" of big data processing systems. It computes the frequency of words in a text collection.
The algorithm works in two steps: first, the texts are split into individual words; second, the words are grouped and counted.
Java:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
Scala:

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

The linked WordCount example program implements the algorithm described above with the input parameters --input <path> --output <path>. Any text file will do as test data.
Page Rank
The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, every page distributes its current rank over all its neighbors and computes its new rank as a sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of web pages to rank search results.
In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.
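Expressed as a formula, with dampening factor d and N pages in total, each iteration recomputes the rank of a page p from the ranks of all pages q that link to p:

rank'(p) = (1 - d) / N + d * Σ rank(q) / outDegree(q),  summed over all pages q that link to p

This is what the rank-distribution join, the sum aggregation, and the Dampener function in the code below implement; the iteration also terminates early once no rank changes by more than EPSILON.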
Java:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
        .types(Long.class, Double.class);

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition
        .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class JoinVertexWithEdgesMatch
        implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, Tuple2<Long, Double>> {

    @Override
    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                     Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        double rankToDistribute = rank / ((double) neighbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
        implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}
Scala:

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
    (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
          for (targetId <- adjacent.targetIds) {
            out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
          }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")
The PageRank program implements the above example. It requires the following parameters to run:
--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
Input files are plain text files and must be formatted as follows:
- Pages are represented by a (long) ID and separated by newline characters.
- For example, "1\n2\n12\n42\n63\n" gives five pages with the IDs 1, 2, 12, 42, and 63.
- Links are represented as pairs of page IDs separated by a space character. Links are separated by newline characters.
- For example, "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links: (1)->(2), (2)->(12), (1)->(12), and (42)->(63).
For this simple implementation it is required that each page has at least one incoming and one outgoing link. A page can point to itself.
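Putting the pieces together, a run of the packaged example could look as follows (paths and values are placeholders; the jar location assumes the examples/batch directory of the binary distribution):
./bin/flink run ./examples/batch/PageRank.jar --pages /path/to/pages --links /path/to/links --output /path/to/result --numPages 5 --iterations 10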
Connected Components
The Connected Components algorithm identifies the connected parts of a larger graph by assigning the same component ID to all vertices in the same connected part. Like PageRank, Connected Components is an iterative algorithm. In each step, every vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID of a neighbor if it is smaller than its own component ID.
This implementation uses a delta iteration: vertices that did not change their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
Java:
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
        implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
        implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
        implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}
Scala:

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edge itself and an inverted version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

The ConnectedComponents program implements the above example. It requires the following parameters to run:
--vertices <path> --edges <path> --output <path> --iterations <n>
Input files are plain text files and must be formatted as follows:
- Vertices are represented by IDs and separated by newline characters.
- For example, "1\n2\n12\n42\n63\n" gives five vertices: (1), (2), (12), (42), and (63).
- Edges are represented as pairs of vertex IDs separated by a space character. Edges are separated by newline characters.
- For example, "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) links: (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
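Analogously to the examples above, a run of the packaged example could look as follows (paths are placeholders; the jar location assumes the examples/batch directory of the binary distribution):
./bin/flink run ./examples/batch/ConnectedComponents.jar --vertices /path/to/vertices --edges /path/to/edges --output /path/to/result --iterations 10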
Relational Query
The Relational Query example assumes two tables, one with orders and the other with lineitems, as specified by the TPC-H decision support benchmark. TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.
The example implements the following SQL query.
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
    FROM orders, lineitem
WHERE l_orderkey = o_orderkey
    AND o_orderstatus = "F"
    AND YEAR(o_orderdate) > 1993
    AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;
The Flink program, which implements the above query, looks as follows:
Java:
// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if (!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if (Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if (!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0, 4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0, 1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0, 1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);
The Relational Query program implements the above query. It requires the following parameters to run:
--orders <path> --lineitem <path> --output <path>
The orders and lineitem files can be generated using the data generator tool (DBGEN) of the TPC-H benchmark suite. Take the following steps to generate arbitrarily large input files for the provided Flink programs:
- Download and unpack DBGEN
- Copy makefile.suite to makefile and perform the following changes:
DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc
- Build DBGEN using make
- Generate the lineitem and orders relations using dbgen. A scale factor (-s) of 1 results in a generated data set of about 1 GB in size.
./dbgen -T o -s 1
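dbgen writes the generated relations as orders.tbl and lineitem.tbl into its working directory. Assuming a jar built from the flink-examples-batch module (the exact jar name and location depend on your build or distribution), the query could then be run along these lines:
./bin/flink run /path/to/relational-query.jar --orders /path/to/orders.tbl --lineitem /path/to/lineitem.tbl --output /path/to/result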