Getting Started with Flink Batch Processing: the WordCount Example

Time: 2025-04-13 07:25:43

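Flink treats batch processing as a special case of streaming, so its underlying engine is a streaming engine on top of which both stream processing and batch processing are implemented. This article starts with a batch example.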

 

Word Count

 

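WordCount is the introductory example for big data processing systems. It computes how often each word occurs in a collection of text. The algorithm has two steps:

Step 1: split the text into individual words

Step 2: group identical words and count them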
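
The two steps can be sketched with plain Scala collections, without any Flink dependency. This is only an illustration of the algorithm, not Flink code: the names `WordCountSketch`, `tokenize`, and `countWords` are made up for this sketch, and lowercasing the words is an assumption.

```scala
object WordCountSketch {

  // Step 1: split every line into lowercase words, dropping empty tokens.
  def tokenize(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty)

  // Step 2: group identical words and count the size of each group.
  def countWords(lines: Seq[String]): Map[String, Int] =
    tokenize(lines)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val lines = Seq("to be or not to be", "That is the question")
    // Sort by word so the printed order is stable.
    countWords(lines).toSeq.sortBy(_._1).foreach(println)
  }
}
```

The Flink program follows the same shape, except that the grouping and counting are carried out by the engine across a distributed data set.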

 

  • Scala
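package org.apache.flink.examples.scala.wordcount

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram
 * over text files.
 *
 * The input is a plain text file with lines separated by newline characters.
 *
 * Usage:
 * {{{
 *   WordCount --input <path> --output <path>
 * }}}
 *
 * If no parameters are provided, the program is run with default data from
 * [[org.apache.flink.examples.java.wordcount.util.WordCountData]]
 *
 * This example shows how to:
 *
 *   - write a simple Flink program.
 *   - use Tuple data types.
 *   - write and use user-defined functions.
 *
 */
object WordCount {

  def main(args: Array[String]): Unit = {

    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // in Scala, `if` is an expression, so its result can be assigned directly to a val
    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(WordCountData.WORDS)
      }

    // step 1: split each line into lowercase words and drop empty tokens
    // step 2: pair each word with 1, group by the word (field 0), sum the counts (field 1)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    // a sort could be added here as well

    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      counts.print()
    }

  }
}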
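
The comment in the listing notes that a sort could be added. As a minimal sketch of what such a step would achieve, the plain-Scala snippet below (no Flink dependency) orders (word, count) pairs by descending count and breaks ties alphabetically. The names `SortCountsSketch` and `sortByCount` are hypothetical, and the sample pairs are only illustrative values. In the DataSet API itself, `sortPartition` is the operator that would normally play this role, though it sorts within each partition rather than globally.

```scala
object SortCountsSketch {

  // Order by count (largest first); ties are broken alphabetically by word.
  def sortByCount(counts: Seq[(String, Int)]): Seq[(String, Int)] =
    counts.sortBy { case (word, count) => (-count, word) }

  def main(args: Array[String]): Unit = {
    // Illustrative sample data, not produced by running Flink here.
    val counts = Seq(("a", 22), ("above", 1), ("accepting", 3), ("add", 2), ("additional", 5))
    sortByCount(counts).foreach(println)
  }
}
```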

Any text file can be used as test data.

 

Running an example

To run a Flink example, you need a running Flink instance. The “Quickstart” and “Setup” sections of the Flink documentation describe various ways of starting Flink.

The easiest way is to run the ./bin/start-cluster.sh script, which by default starts a local cluster with one JobManager and one TaskManager.

Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.

To run the WordCount example, issue the following command:

./bin/flink run ./examples/batch/WordCount.jar

The other examples can be started in a similar way.

Note that many examples run without any arguments by falling back to built-in data. To run WordCount with real data, you have to pass the path to the data:

./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

Note that non-local file systems require a scheme prefix, such as hdfs://.
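
To make the note about prefixes concrete, the sketch below uses `java.net.URI` from the JDK to show which part of a path is the prefix in question. It does not reproduce how Flink resolves file systems. The object and method names are hypothetical, and the host `namenode:9000` and the paths are placeholders.

```scala
import java.net.URI

object SchemeSketch {

  // Returns the URI scheme ("hdfs", "file", ...) or None when the path has no prefix.
  def schemeOf(path: String): Option[String] =
    Option(new URI(path).getScheme)

  def main(args: Array[String]): Unit = {
    val paths = Seq(
      "hdfs://namenode:9000/path/to/some/text/data", // placeholder host and port
      "file:///path/to/result",
      "/path/to/result" // no prefix at all
    )
    paths.foreach { p =>
      println(s"$p -> ${schemeOf(p).getOrElse("(no scheme)")}")
    }
  }
}
```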

 

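For example, you can count the words in the LICENSE file in the Flink directory. If no output path is given, the result is printed directly to standard output:

./bin/flink run ./examples/batch/WordCount.jar --input ./LICENSE
Output: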
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
Starting execution of program
Printing result to stdout. Use --output to specify output path.
(0,3)
(1,2)
(2,4)
(2004,1)
(3,1)
(4,1)
(5,1)
(50,1)
(6,1)
(7,1)
(8,1)
(9,2)
(a,22)
(above,1)
(acceptance,1)
(accepting,3)
(act,1)
(acting,1)
(acts,1)
(add,2)
(addendum,1)
(additional,5)
(additions,1)
(advised,1)
...