Flink DataSet Development: A Simple Introduction

Date: 2023-03-08 21:33:23
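Development workflow

1. Obtain an execution environment.
2. Load or create the initial data.
3. Specify the transformations to apply to this data.
4. Specify where to put the results of the computation.
5. Trigger program execution.

Example: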

import org.apache.flink.api.scala._

object DataSet_WordCount {
  def main(args: Array[String]): Unit = {
    // Initialize the environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Load/create the initial data
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    // Specify the transformations on the data
    val split_words = text.flatMap(line => line.toLowerCase().split("\\W+"))
    val filter_words = split_words.filter(x => x.nonEmpty)
    val map_words = filter_words.map(x => (x, 1))
    val groupBy_words = map_words.groupBy(0)
    val sum_words = groupBy_words.sum(1)
    // Specify where to put the results
    // sum_words.setParallelism(1) // gather the results into a single output
    sum_words.writeAsText(args(0)) // e.g. "/Users/niutao/Desktop/flink.txt"
    // Trigger program execution
    env.execute("DataSet wordCount")
  }
}
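
To check what the pipeline should produce before submitting anything to a cluster, the same chain of transformations can be sketched on plain Scala collections. This is only an illustration, not part of the Flink job: the object name LocalWordCount and the helper wordCount are hypothetical, and the standard-library groupBy stands in for Flink's groupBy(0).sum(1).

```scala
// Hypothetical local check: mirrors the flatMap / filter / map / groupBy / sum
// steps of the Flink job using only the Scala standard library.
object LocalWordCount {

  // Count the words in the given lines, splitting on non-word characters.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.toLowerCase.split("\\W+")) // split each line into words
      .filter(word => word.nonEmpty)                   // drop empty tokens
      .map(word => (word, 1))                          // pair each word with a count of 1
      .groupBy(pair => pair._1)                        // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum counts per word

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?"))
    // Sort by word so the printed order is stable
    counts.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because the split pattern is "\\W+", the apostrophe in "Who's" separates it into "who" and "s", so both appear as words in the result. For the two input lines above, "who", "s", "there" and "i" each get a count of 2 and the remaining words a count of 1.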

Package the program and submit it to YARN

Add the Maven build plugins:

<build>
  <sourceDirectory>src/main/java</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>2.5.1</version>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
      </configuration>
    </plugin>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <!--<arg>-make:transitive</arg>-->
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <!--
                  zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                  -->
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.nt.DEMO.WordCount</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Upload the jar with the rz command, then run the program:

bin/flink run -m yarn-cluster -yn 2 /home/elasticsearch/flinkjar/itcast_learn_flink-1.0-SNAPSHOT.jar com.nt.DEMO.WordCount

Note that flink run passes everything after the jar path to main as program arguments, so in this command the trailing com.nt.DEMO.WordCount arrives as args(0), which the program uses as its output path. The entry class itself is taken from the mainClass set in the shade plugin's manifest.

The submitted application can be observed on YARN's web UI on port 8088.

The output of the run can be found in the /opt/cdh/flink-1.3.2/flinkJAR directory.