###########################################Experiment Steps######################################
① Start the cluster
$FLINK_HOME/bin/
② Open a socket
nc -lk 9999
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 muamua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
③ Compile command:
mvn scala:compile package
④ Submit the Flink job, which creates a second socket
flink run -c WordCount target/scala-module-dependency-sample-1. --port 9999
The job's socket then connects to the nc listener opened in step ②, and the two sockets communicate over TCP.
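The input used in step ② is one sample line repeated many times. Rather than typing it by hand, the lines can be generated and pasted into the nc session. A minimal sketch, assuming a POSIX-compatible shell; the function name sample_lines is hypothetical and not part of the original notes.

```shell
# Print the sample input line N times (default 10), one copy per line.
# sample_lines is a hypothetical helper name used only for illustration.
sample_lines() {
  local n="${1:-10}"
  local i=0
  while [ "$i" -lt "$n" ]; do
    echo "mua mua2 mua3 mua2 mua"
    i=$((i + 1))
  done
}
```

Every generated line ends with a newline. This matters because the job splits its socket input on '\n', so a line is only counted once its newline has been sent.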
###########################Experiment Results###################################################
The output is written to:
$FLINK_HOME/log/
3> WordWithCount(mua3,2)
4> WordWithCount(mua2,4)
4> WordWithCount(mua2,14)
3> WordWithCount(mua3,7)
3> WordWithCount(mua3,12)
4> WordWithCount(mua2,24)
3> WordWithCount(muamua,1)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,24)
3> WordWithCount(mua3,12)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,24)
3> WordWithCount(mua3,12)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,20)
3> WordWithCount(mua3,10)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,10)
3> WordWithCount(mua3,5)
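The prefix before ">" (3, 4) is the index of the parallel subtask that printed the record, and each WordWithCount is the total for one word within one 5-second sliding window. The counts can be sanity-checked by tallying the input by hand. A minimal sketch, assuming standard tr, sort, uniq and awk are available; count_words is a hypothetical helper name, and this plain tally does not reproduce Flink's windowing.

```shell
# Tally whitespace-separated words read from stdin.
# Prints one "word count" pair per line, sorted by word.
# count_words is a hypothetical helper name, not part of Flink.
count_words() {
  tr -s ' ' '\n' | LC_ALL=C sort | uniq -c | awk '{print $2, $1}'
}
```

Feeding it the first eleven input lines from step ② (ten regular lines plus the one containing "muamua") gives mua2 24, mua3 12 and muamua 1. These are the largest values in the log above, which suggests those windows covered all eleven lines. The later, smaller values (20/10, then 10/5) are consistent with the window sliding past the earlier lines. The twelfth line does not seem to have been counted, possibly because its newline had not been sent yet.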
#####################################Appendix - Project Structure##################################
├──
├── src
│   └── main
│       └── scala
│           └──
└── 运行方法.txt
#########################Appendix - Code#######################################
Complete Maven POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sample</groupId>
<artifactId>scala-module-dependency-sample</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.2</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
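import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCount {
  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Get the data: read newline-delimited text from the socket.
    // Host and port are hard-coded here; the --port argument is not read.
    val textDataStream = env.socketTextStream("Desktop", 9999, '\n')
    // Split each line into words and pair every word with a count of 1
    val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_, 1))
    // Group by the "word" field; window size = 5 s, slide = 1 s
    val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))
    // Sum the "count" field within each window and print the result
    windowDstram.sum("count").print()
    // Start the executor and run the program
    env.execute("Socket Window WordCount")
  }
}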
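####################################Appendix - Troubleshooting######################################################
If no output can be found after running the experiment in Flink, the possible causes are:
① A previous job was not cancelled and interferes with the next one. Run flink list to find it, then remove it with flink cancel.
② The YARN queue's resources are exhausted (Flink on YARN mode).
③ The output is on one of the nodes listed in the YARN web UI, or under $FLINK_HOME/log on one of the nodes.
④ Do not delete the flink-<username>- file under $FLINK_HOME/log. It is created only when the cluster starts, not when a new job is submitted. If it is deleted, the results stay invisible until the Flink cluster is restarted.
⑤ Port 6123 was still in use before the cluster was started, so jobs cannot be submitted after the restart: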
Caused by: : Could not start actor system on any port in port range 6123
Note: before submitting a job, first make sure that running grep -ri error under $FLINK_HOME/log reports no errors.
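The log check above can be wrapped in a small function so that it runs before every submission. A minimal sketch, assuming a shell that supports local (such as bash) and a grep that supports -r; logs_are_clean is a hypothetical helper name, not part of the Flink CLI.

```shell
# Succeed (exit status 0) only if no file under the given directory
# contains the word "error", matched case-insensitively like `grep -ri error`.
# logs_are_clean is a hypothetical helper name used only for illustration.
logs_are_clean() {
  local log_dir="$1"
  if [ ! -d "$log_dir" ]; then
    echo "no such directory: $log_dir" >&2
    return 2
  fi
  if grep -riq error "$log_dir"; then
    return 1   # at least one match: the logs are not clean
  fi
  return 0
}
```

For example, logs_are_clean "$FLINK_HOME/log" can be called before flink run, and the submission skipped when it returns a non-zero status. The match is deliberately broad, so any log line containing "error" counts as a hit.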