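The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.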
- Running an example
- Word Count
- Page Rank
- Connected Components
- Relational Query
Running an example
The easiest way is to run the ./bin/ script, which starts a JobManager locally.
./bin/flink run ./examples/batch/WordCount.jar
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
Word Count
WordCount is the "Hello World" of big data processing systems. It computes the frequency of words in a text collection.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

The WordCount example implements the program above with the input parameters: --input <path> --output <path>. Any text file can serve as test data.
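To make the two steps of the program concrete (tokenize each line, then group and count), here is a minimal plain-Java sketch that does not use Flink at all. The class name `WordCountSketch` and the method `countWords` are hypothetical and exist only for illustration; the tokenization rule (lower-case, split on `\W+`, skip empty tokens) is taken from the `Tokenizer` shown above.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class, not part of Flink.
final class WordCountSketch {

    // Same rule as the Tokenizer above: lower-case the line,
    // split on runs of non-word characters, skip empty tokens.
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    // equivalent of emitting (word, 1) and summing per word
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                countWords(Arrays.asList("To be, or not to be", "that is the question"));
        // print one "word count" pair per line
        counts.forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

In the Flink version the same logic is distributed: `flatMap` runs the tokenizer in parallel, and `groupBy(0).sum(1)` performs the per-word summation across the cluster.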
Page Rank
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
        .types(Long.class, Double.class);

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition
        .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class JoinVertexWithEdgesMatch
        implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, Tuple2<Long, Double>> {

    @Override
    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                     Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        // split the page's rank evenly among its outgoing links
        double rankToDistribute = rank / ((double) neighbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
        implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        // keep only pages whose rank still changed by more than EPSILON
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}
// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// read the links as (source-id, target-id) pairs; the adjacency lists are built below
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
    (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
          for (targetId <- adjacent.targetIds) {
            out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
          }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")
--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
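- Pages are represented by a (long) ID and separated by newline characters.
- For example, "1\n2\n12\n42\n63\n" gives five pages with IDs 1, 2, 12, 42, and 63.
- Links are represented as pairs of page IDs separated by a space character. Links are separated by newline characters.
- For example, "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).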
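As a rough illustration of how the link format feeds into one rank update, the following plain-Java sketch (no Flink involved) parses links in the format described above and performs a single iteration step: distribute each page's rank evenly over its outgoing links, sum the contributions per target page, then apply the dampening factor. The class `PageRankSketch`, its method names, the small four-link graph, and the value 0.85 are all assumptions chosen for the example; only the update rule `rank * dampening + (1 - dampening) / numPages` is taken from the `Dampener` in the program.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of Flink.
final class PageRankSketch {

    // Parses newline-separated "source target" pairs into source -> list of targets.
    static Map<Long, List<Long>> parseLinks(String text) {
        Map<Long, List<Long>> adjacency = new HashMap<>();
        for (String line : text.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] ids = line.trim().split(" ");
            adjacency.computeIfAbsent(Long.parseLong(ids[0]), k -> new ArrayList<>())
                     .add(Long.parseLong(ids[1]));
        }
        return adjacency;
    }

    // One iteration step: distribute ranks, sum per target page, apply dampening.
    static Map<Long, Double> step(Map<Long, Double> ranks, Map<Long, List<Long>> adjacency,
                                  double dampening, long numPages) {
        Map<Long, Double> sums = new HashMap<>();
        for (Map.Entry<Long, List<Long>> entry : adjacency.entrySet()) {
            Double rank = ranks.get(entry.getKey());
            if (rank == null) {
                continue; // like an inner join: pages without a rank contribute nothing
            }
            double share = rank / entry.getValue().size();
            for (Long target : entry.getValue()) {
                sums.merge(target, share, Double::sum);
            }
        }
        double randomJump = (1 - dampening) / numPages;
        Map<Long, Double> next = new HashMap<>();
        for (Map.Entry<Long, Double> entry : sums.entrySet()) {
            next.put(entry.getKey(), entry.getValue() * dampening + randomJump);
        }
        return next;
    }

    public static void main(String[] args) {
        // assumed sample graph in which every page has an incoming and an outgoing link
        Map<Long, List<Long>> adjacency = parseLinks("1 2\n1 3\n2 3\n3 1\n");
        Map<Long, Double> ranks = new HashMap<>();
        for (long page = 1; page <= 3; page++) {
            ranks.put(page, 1.0 / 3);
        }
        System.out.println(step(ranks, adjacency, 0.85, 3));
    }
}
```

Note that in this sketch, as in the join-based program, a page that receives no rank from any link does not appear in the result of the step.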
Connected Components
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
        implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        // emit the edge itself and its inverted version
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
        implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
        implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        // forward the candidate only if its component ID is smaller than the current one
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edge itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

The ConnectedComponents program implements the above example. It requires the following parameters to run:
--vertices <path> --edges <path> --output <path> --iterations <n>
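- Vertices are represented as IDs and separated by newline characters.
- For example, "1\n2\n12\n42\n63\n" gives five vertices (1), (2), (12), (42), and (63).
- Edges are represented as pairs of vertex IDs separated by a space character. Edges are separated by newline characters.
- For example, "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).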
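To show what result the program should produce for the sample input above, here is a small plain-Java sketch (no Flink involved) that propagates the minimum component ID across undirected edges until nothing changes. It reaches the same fixpoint as the delta iteration but is a simple sequential loop, not the workset/solution-set mechanism; the class `ConnectedComponentsSketch` and its method are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class, not part of Flink.
final class ConnectedComponentsSketch {

    // Assumes every edge endpoint is listed in 'vertices'.
    static Map<Long, Long> components(long[] vertices, long[][] edges) {
        // initial component ID of each vertex is its own ID
        Map<Long, Long> component = new TreeMap<>();
        for (long v : vertices) {
            component.put(v, v);
        }
        boolean changed = true;
        while (changed) {
            changed = false;
            for (long[] edge : edges) {
                long a = component.get(edge[0]);
                long b = component.get(edge[1]);
                long min = Math.min(a, b);
                // edges are undirected: both endpoints adopt the smaller component ID
                if (a != min) {
                    component.put(edge[0], min);
                    changed = true;
                }
                if (b != min) {
                    component.put(edge[1], min);
                    changed = true;
                }
            }
        }
        return component;
    }

    public static void main(String[] args) {
        long[] vertices = {1, 2, 12, 42, 63};
        long[][] edges = {{1, 2}, {2, 12}, {1, 12}, {42, 63}};
        // one "vertex component" pair per line, like the CSV output of the program
        components(vertices, edges).forEach((v, c) -> System.out.println(v + " " + c));
    }
}
```

For the sample data, vertices 1, 2, and 12 end up in component 1, and vertices 42 and 63 end up in component 42.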
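Relational Query
The Relational Query example assumes two tables, one with orders and the other with lineitems, as specified by the TPC-H decision support benchmark. TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.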
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
    FROM orders, lineitem
WHERE l_orderkey = o_orderkey
    AND o_orderstatus = "F"
    AND YEAR(o_orderdate) > 1993
    AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;
// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// filtered orders: (orderkey, shippriority)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if (!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if (Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if (!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0,1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);
The Relational Query program implements the above query. It requires the following parameters to run:
--orders <path> --lineitem <path> --output <path>
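The filter, join, and aggregate stages of the query can be traced on a few in-memory rows. The plain-Java sketch below (no Flink involved) applies the three predicates from the SQL query to the orders, joins the survivors with the line items on the order key, and sums the extended price per (orderkey, shippriority) group. The class `RelationalQuerySketch`, its nested row types, and the sample rows are hypothetical; the filter constants "F", 1993, and "5" come from the SQL query above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of Flink.
final class RelationalQuerySketch {

    static final class Order {
        final int orderKey; final String status; final String date;
        final String priority; final int shipPriority;
        Order(int orderKey, String status, String date, String priority, int shipPriority) {
            this.orderKey = orderKey; this.status = status; this.date = date;
            this.priority = priority; this.shipPriority = shipPriority;
        }
    }

    static final class LineItem {
        final int orderKey; final double extendedPrice;
        LineItem(int orderKey, double extendedPrice) {
            this.orderKey = orderKey; this.extendedPrice = extendedPrice;
        }
    }

    // Returns revenue per (orderkey, shippriority) group.
    static Map<List<Integer>, Double> revenue(List<Order> orders, List<LineItem> lineItems) {
        // filter orders and keep only (orderkey -> shippriority)
        Map<Integer, Integer> shipPriorityByOrder = new HashMap<>();
        for (Order o : orders) {
            if (o.status.equals("F")
                    && Integer.parseInt(o.date.substring(0, 4)) > 1993
                    && o.priority.startsWith("5")) {
                shipPriorityByOrder.put(o.orderKey, o.shipPriority);
            }
        }
        // join with line items on orderkey, then sum extendedprice per group
        Map<List<Integer>, Double> sums = new HashMap<>();
        for (LineItem item : lineItems) {
            Integer shipPriority = shipPriorityByOrder.get(item.orderKey);
            if (shipPriority != null) {
                sums.merge(Arrays.asList(item.orderKey, shipPriority),
                           item.extendedPrice, Double::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order(1, "F", "1994-05-01", "5-LOW", 0),
                new Order(2, "O", "1995-01-01", "5-LOW", 0));
        List<LineItem> items = Arrays.asList(
                new LineItem(1, 100.0), new LineItem(1, 50.5), new LineItem(2, 10.0));
        System.out.println(revenue(orders, items));
    }
}
```

Building the lookup map from the filtered orders first mirrors the idea behind `joinWithHuge` in the program: the filtered orders are the small side of the join, and the line items are streamed past them.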
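The orders and lineitem files can be generated using the data generator tool (DBGEN) of the TPC-H benchmark suite. Take the following steps to generate arbitrarily large input files for the provided Flink programs:
- Download and unpack DBGEN
- Make a copy of makefile.suite called makefile and perform the following changes
- Build DBGEN using make
- Generate the lineitem and orders relations using dbgen. A scale factor (-s) of 1 results in a generated data set of about 1 GB.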
./dbgen -T o -s 1