flink入门

时间:2024-01-10 14:01:20

wordCount

POM文件需要导入的依赖:

<!--
  Flink dependencies for the word-count examples below.
  Every Flink artifact uses the same ${flink.version}: mixing Flink versions on
  one classpath (the Scala artifacts used to be hard-coded to 1.7.1) can cause
  binary-incompatibility errors at runtime.
  NOTE(review): flink.version must be defined in this POM's <properties>
  section, which is not shown here; set it to 1.7.1 to keep the previously
  pinned Scala artifact version.
-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>

  

离线代码:

java版本:

package flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Batch word count over a small in-memory data set.
 *
 * <p>Each input line is tokenised by {@link LineSplitter}; the resulting
 * (word, 1) tuples are grouped by word and their counts summed, then printed.
 */
public class WordExample {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build a data set from two literal lines of text.
        final DataSet<String> lines = env.fromElements(
                "flink test",
                "I think I hear them. Stand, ho! Who's there?");

        // Split each line into words, group on tuple field 0 (the word)
        // and sum tuple field 1 (the per-occurrence count).
        final DataSet<Tuple2<String, Integer>> wordCount = lines
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        wordCount.print();
    }
}
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Splits a line of text into lower-cased words and emits one
 * {@code (word, 1)} tuple per word.
 *
 * <p>Tokenisation matches the Scala batch example in this tutorial: the line
 * is lower-cased and split on runs of non-word characters. Empty tokens
 * (produced by blank lines or leading punctuation) are skipped so that the
 * empty string is never counted as a word.
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    /**
     * @param o         the input line
     * @param collector receives one {@code (word, 1)} tuple per non-empty word
     */
    @Override
    public void flatMap(String o, Collector<Tuple2<String, Integer>> collector) throws Exception {
        for (String word : o.toLowerCase().split("\\W+")) {
            // split() yields a leading "" when the line starts with a delimiter,
            // and a single "" for an empty line.
            if (!word.isEmpty()) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

scala版本:

package flink

import org.apache.flink.api.scala._

/** Batch word count over two literal lines of text. */
object WordCountExample {

  /**
   * Lower-cases each line, splits it on runs of non-word characters, drops
   * empty tokens, then counts the occurrences of every word and prints them.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    // Each definition must sit on its own line: Scala does not accept several
    // `val` definitions on one line without separators.
    val counts = text
      .flatMap(_.toLowerCase().split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0) // tuple field 0: the word
      .sum(1)     // tuple field 1: the count

    counts.print()
  }
}

流式:

 java版本:

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Streaming word count: reads newline-delimited text from a socket on
 * localhost and prints per-word counts over a sliding time window.
 *
 * <p>Usage: {@code WordCount --port <port>}
 */
public class WordCount {

    /**
     * @param args must contain {@code --port <port>}; if the port is missing or
     *             not an integer, a usage message is printed to stderr and the
     *             program returns without starting a job
     */
    public static void main(String[] args) throws Exception {
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            // Usage errors belong on stderr; the message names this class.
            System.err.println("No port specified. Please run 'WordCount --port <port>'");
            return;
        }

        // Get the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get input data by connecting to the socket.
        DataStream<String> text = env.socketTextStream("localhost", port, '\n');

        // Parse the data, group it by word, window it, and aggregate the counts.
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) {
                        for (String word : s.split("\\s")) {
                            // Blank lines and repeated whitespace yield "" tokens;
                            // skip them so "" is not counted as a word.
                            if (!word.isEmpty()) {
                                collector.collect(new WordWithCount(word, 1L));
                            }
                        }
                    }
                })
                .keyBy("word")
                // 10-second windows, evaluated every 5 seconds.
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount wordWithCount, WordWithCount t1) throws Exception {
                        return new WordWithCount(wordWithCount.word, wordWithCount.count + t1.count);
                    }
                });

        // Print the result with a single thread, rather than in parallel.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}
package flink;

public class WordWithCount {
public String word;
public long count; public WordWithCount() {
} public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
} @Override
public String toString() {
return word + ":" + count;
}
}

  scala版本

package flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Streaming word count: reads newline-delimited text from a socket on
 * localhost and prints per-word counts over a sliding time window.
 *
 * NOTE(review): the object name is misspelled ("Sokcet"); it is kept as-is
 * because renaming would change the main class that callers launch.
 */
object SokcetWindowWordCount {

  /** A word together with the number of times it has been seen. */
  case class WordWithCount(word: String, count: Long)

  /**
   * @param args must contain `--port <port>`; if the port is missing or not an
   *             integer, a usage message is printed to stderr and no job is
   *             started
   */
  def main(args: Array[String]): Unit = {
    // the port to connect to
    val port: Option[Int] =
      try Some(ParameterTool.fromArgs(args).getInt("port"))
      catch {
        case _: Exception =>
          // Derive the program name from the object so the hint always
          // matches the class that actually has to be launched.
          val program = getClass.getSimpleName.stripSuffix("$")
          System.err.println(s"No port specified. Please run '$program --port <port>'")
          None
      }

    port.foreach(runJob)
  }

  /** Builds and executes the windowed word-count job against localhost:`port`. */
  private def runJob(port: Int): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCount = text
      // blank lines and repeated whitespace yield "" tokens; drop them
      .flatMap { w => w.split("\\s").filter(_.nonEmpty) }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      // 10-second windows, evaluated every 5 seconds
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCount.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }
}

  运行,传参:

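先在终端使用 nc 命令监听 9999 端口(例如 `nc -lk 9999`),再启动程序并传入参数 `--port 9999`;之后在 nc 终端中输入的每一行文本都会被发送给 Flink 程序。注意必须先启动 nc,否则程序连接 socket 时会失败。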

flink入门

flink入门

  运行结果:

flink入门

  注意事项:

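    注意不要导错包:Java 程序应导入 Java API 的类(如 org.apache.flink.api.java.*、org.apache.flink.streaming.api.environment.StreamExecutionEnvironment),Scala 程序应导入 Scala API 的类(如 org.apache.flink.api.scala._、org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)。两者类名相同但所在包不同,如果导错,程序将无法编译或无法运行。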