1. Preparation
Prepare the clusters:
- ZooKeeper cluster
- Hadoop cluster
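Prepare the Flink distribution
Download page: https://flink.apache.org/downloads.html
Starting with Flink 1.8, the binary distribution no longer bundles Hadoop, so you have to download the matching Hadoop jar separately.
Before 1.8: the download page offered Flink binaries prebuilt for specific Hadoop versions.
From 1.8 on: download the Flink binary without Hadoop, then download the pre-bundled Hadoop component that matches your Hadoop version and put it in Flink's lib directory.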
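Before going further, it can help to confirm that lib contains exactly one pre-bundled Hadoop jar, since two different Hadoop versions on the classpath are likely to conflict. This is a minimal sketch; the helper name `check_shaded_hadoop` is hypothetical, and the lib path you pass (for example `$FLINK_HOME/lib`) is an assumption about your layout.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed only if exactly one flink-shaded-hadoop jar is in lib/.
check_shaded_hadoop() {
  local lib="$1" jars count
  # List candidate jars; an empty or missing directory yields an empty string
  jars=$(ls "$lib" 2>/dev/null | grep '^flink-shaded-hadoop' || true)
  # Count non-empty lines (grep -c prints 0 when there are none)
  count=$(printf '%s' "$jars" | grep -c . || true)
  if [ "$count" -ne 1 ]; then
    echo "expected 1 flink-shaded-hadoop jar in $lib, found $count" >&2
    return 1
  fi
  echo "$jars"
}
```

Usage: `check_shaded_hadoop "$FLINK_HOME/lib"` prints the jar name, or an error on stderr with a non-zero exit status.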
Configure the Hadoop environment variables
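A sketch of what these variables typically look like, for example in /etc/profile or ~/.bashrc. The install path /opt/module/hadoop is a hypothetical example; Flink's YARN client reads the YARN configuration from the directory in HADOOP_CONF_DIR (or YARN_CONF_DIR).

```shell
# Hypothetical install location; replace with your own Hadoop directory
export HADOOP_HOME=/opt/module/hadoop
# Directory containing core-site.xml, yarn-site.xml, etc.
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
# Make the hadoop and yarn commands available on the PATH
export PATH="$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin"
```

After editing the profile, run `source /etc/profile` (or open a new shell) so the variables take effect.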
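Edit the Flink configuration file (conf/flink-conf.yaml)
Set the JobManager address:
jobmanager.rpc.address: bigdata-03
Configure the remaining options to suit your environment.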
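If you prefer to script this edit, the sketch below sets a key in flink-conf.yaml, replacing an existing line or appending a new one. `set_flink_conf` is a hypothetical helper, not part of Flink; it assumes GNU `sed -i` and values without `|` or `&`.

```shell
# Hypothetical helper: set "key: value" in a flink-conf.yaml file
set_flink_conf() {
  local key="$1" value="$2" file="$3"
  if grep -q "^${key}:" "$file"; then
    # Key already present: rewrite that line in place (GNU sed syntax)
    sed -i "s|^${key}:.*|${key}: ${value}|" "$file"
  else
    # Key missing: append it at the end of the file
    echo "${key}: ${value}" >> "$file"
  fi
}
```

Usage: `set_flink_conf jobmanager.rpc.address bigdata-03 "$FLINK_HOME/conf/flink-conf.yaml"`.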
Configure conf/masters and conf/slaves
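A minimal sketch for writing both files. conf/masters holds the JobManager as host:webui-port (8081 is Flink's default web UI port) and conf/slaves lists one TaskManager host per line. The helper name is hypothetical, and bigdata-01/bigdata-02 in the usage line are hypothetical hostnames. These files are read by bin/start-cluster.sh in standalone mode; on YARN, the ResourceManager decides where containers run.

```shell
# Hypothetical helper: write_flink_hosts CONF_DIR MASTER[:PORT] WORKER...
write_flink_hosts() {
  local conf_dir="$1" master="$2"
  shift 2
  # JobManager entry, e.g. bigdata-03:8081
  echo "$master" > "$conf_dir/masters"
  # One TaskManager host per line
  printf '%s\n' "$@" > "$conf_dir/slaves"
}
```

Usage: `write_flink_hosts "$FLINK_HOME/conf" bigdata-03:8081 bigdata-01 bigdata-02 bigdata-03`.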
2. Flink on YARN
Session Cluster mode
(1) Start the Hadoop cluster (omitted)
(2) To customize the session, list the available options with:
bin/yarn-session.sh --help
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>   use value for given property
     -d,--detached   If present, runs the job in detached mode
     -h,--help   Help for the Yarn session CLI.
     -id,--applicationId <arg>   Attach to running YARN session
     -j,--jar <arg>   Path to Flink jar file
     -jm,--jobManagerMemory <arg>   Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>   Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>   Specify YARN node label for the YARN application
     -nm,--name <arg>   Set a custom name for the application on YARN
     -q,--query   Display available YARN resources (memory, cores)
     -qu,--queue <arg>   Specify YARN queue.
     -s,--slots <arg>   Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
     -st,--streaming   Start Flink in streaming mode
     -t,--ship <arg>   Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached   If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
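yarn-session parameters:
- -n: number of TaskManagers
- -d: run in detached mode
- -id: attach to a running YARN session by its application ID
- -j: path to the Flink jar file
- -jm: memory of the JobManager container (default unit: MB)
- -nl: YARN node label for the application
- -nm: custom application name on YARN
- -q: display the available YARN resources (memory, cores)
- -qu: YARN queue to submit to
- -s: number of slots per TaskManager
- -st: start Flink in streaming mode
- -tm: memory of each TaskManager container (default unit: MB)
- -z: namespace under which the ZooKeeper sub-paths are created in high-availability mode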
(3) Start the YARN session
./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
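Where:
- -n (--container): number of TaskManagers.
- -s (--slots): number of slots per TaskManager. By default each slot maps to one core (vcore) and each TaskManager has 1 slot; starting a few more TaskManagers than strictly needed gives some redundancy.
- -jm: JobManager memory (MB).
- -tm: memory per TaskManager (MB).
- -nm: YARN application name (shown in the YARN UI).
- -d: run in the background (detached mode).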
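With the -n semantics described above, the slots a session offers are TaskManagers × slots per TaskManager, so `-n 2 -s 2` gives 4 slots. The sketch below wraps the start command and reports that number first. The helper name `start_session`, the `DRY_RUN` switch and the `FLINK_HOME` default are hypothetical additions for illustration.

```shell
# Hypothetical wrapper: start_session TMS SLOTS JM_MB TM_MB NAME
start_session() {
  local tms="$1" slots="$2" jm_mb="$3" tm_mb="$4" name="$5"
  local cmd=("${FLINK_HOME:-.}/bin/yarn-session.sh" -n "$tms" -s "$slots"
             -jm "$jm_mb" -tm "$tm_mb" -nm "$name" -d)
  # Slots available to jobs = TaskManagers x slots per TaskManager
  echo "total slots: $(( tms * slots ))"
  if [ "${DRY_RUN:-0}" = 1 ]; then
    # Only print the command that would be run
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

Usage: `DRY_RUN=1 start_session 2 2 1024 1024 test` prints the slot count and the command without starting anything.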
(4) Run a program:
./flink run -c com.duoduo.WordCount wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777
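`flink run` without `-m` attaches to the running session. As far as I know, Flink 1.x finds it through a properties file written by yarn-session.sh, by default /tmp/.yarn-properties-<user>, with an `applicationID=...` line; treat both the location and the key as assumptions for your version. The sketch reads that ID so you can check which session a job will go to; `PROPS_DIR` is a hypothetical override.

```shell
# Hypothetical helper: print the application ID of the current YARN session
current_session_app_id() {
  local props
  props="${PROPS_DIR:-/tmp}/.yarn-properties-$(id -un)"
  if [ ! -f "$props" ]; then
    echo "no YARN session properties file at $props" >&2
    return 1
  fi
  # Print the value of the applicationID=... line
  sed -n 's/^applicationID=//p' "$props"
}
```

Usage: `current_session_app_id && ./flink run -c com.duoduo.WordCount ...` submits only when a session file is present.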
(5) Check the application in the YARN web UI
(6) Stop the YARN session
yarn application --kill application_1577588252906_0001
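Instead of copying the application ID from the UI, you can look it up by the name given with -nm. This sketch filters the output of `yarn application -list` on stdin; it assumes the default tab-separated listing with the ID in the first column and the name in the second, which may differ between Hadoop versions. `app_id_by_name` is a hypothetical helper.

```shell
# Hypothetical helper: read `yarn application -list` on stdin, print IDs whose name matches
app_id_by_name() {
  local name="$1"
  awk -F'\t' -v name="$name" '
    {
      id = $1; n = $2
      # Columns are padded with spaces; trim them before comparing
      gsub(/^[ \t]+|[ \t]+$/, "", id)
      gsub(/^[ \t]+|[ \t]+$/, "", n)
      if (id ~ /^application_/ && n == name) print id
    }'
}
```

Usage (GNU xargs): `yarn application -list | app_id_by_name test | xargs -r -n1 yarn application -kill`.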
Per-Job Cluster mode
(1) Start the Hadoop cluster (omitted)
(2) Submit the job directly, without starting a YARN session
./flink run -m yarn-cluster -c com.atguigu.WordCount WordCount-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777
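In per-job mode the resources go on the `flink run` command itself: -yjm, -ytm, -ys and -ynm are the `flink run` counterparts of yarn-session's -jm, -tm, -s and -nm. A sketch, where the helper name, the fixed 1024 MB / 2-slot values and the `DRY_RUN` switch are hypothetical choices for illustration.

```shell
# Hypothetical wrapper: run_per_job MAIN_CLASS JAR [program args...]
run_per_job() {
  local main_class="$1" jar="$2"
  shift 2
  local cmd=("${FLINK_HOME:-.}/bin/flink" run -m yarn-cluster
             -yjm 1024 -ytm 1024 -ys 2 -ynm "$main_class"
             -c "$main_class" "$jar" "$@")
  if [ "${DRY_RUN:-0}" = 1 ]; then
    # Only print the command that would be run
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

Usage: `run_per_job com.atguigu.WordCount WordCount-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777`.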
3. Problems encountered
3.1 java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
2020-05-21 16:09:11,255 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while running the Flink session.
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:71)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.FeaturesAndProperties
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more
Solution:
My Hadoop version is 2.7.3. With flink-shaded-hadoop-2-uber-2.7.7-10.0.jar in lib, the same error persisted; switching to the next version up, flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, fixed it.
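The jar swap above can be scripted so that the old uber jar is removed before the new one is copied in, leaving only one Hadoop version on Flink's classpath. A minimal sketch: `swap_hadoop_uber_jar` is a hypothetical helper, and it assumes the new jar sits outside lib (for example in a download directory), since it deletes every flink-shaded-hadoop-*.jar in lib first.

```shell
# Hypothetical helper: swap_hadoop_uber_jar NEW_JAR LIB_DIR
swap_hadoop_uber_jar() {
  local new_jar="$1" lib="$2"
  [ -f "$new_jar" ] || { echo "jar not found: $new_jar" >&2; return 1; }
  # Remove any existing shaded Hadoop jars (new_jar must not live in lib)
  rm -f "$lib"/flink-shaded-hadoop-*.jar
  cp "$new_jar" "$lib/" || return 1
  # Show what is now on Flink's classpath
  ls "$lib" | grep '^flink-shaded-hadoop'
}
```

Usage: `swap_hadoop_uber_jar ~/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar "$FLINK_HOME/lib"`.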
3.2 The following error appears in the startup log
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
    at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 5 more
For background, see the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html
The fix is to add the following to your environment variables:
export HADOOP_CLASSPATH=`hadoop classpath`
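A slightly more defensive variant of the export above: if `hadoop` is not on the PATH, the backtick expression quietly produces an empty HADOOP_CLASSPATH and the same error comes back. This sketch (the helper name is hypothetical) fails loudly in that case instead.

```shell
# Hypothetical helper: export HADOOP_CLASSPATH only if `hadoop classpath` can run
set_hadoop_classpath() {
  if ! command -v hadoop >/dev/null 2>&1; then
    echo "hadoop not found on PATH; check HADOOP_HOME/bin" >&2
    return 1
  fi
  HADOOP_CLASSPATH="$(hadoop classpath)"
  export HADOOP_CLASSPATH
}
```

Put the call in /etc/profile or ~/.bashrc after the PATH line so every new shell picks it up.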
3.3 Couldn't deploy Yarn session cluster
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:548)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 10 exceeds the maximum number of virtual cores 7 available in the YarnCluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
    at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:292)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:373)
    ... 7 more
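Solution:
The YARN cluster offers at most 7 virtual cores per node, but I had set 10 slots per TaskManager. Unless yarn.containers.vcores is set in the Flink configuration, Flink requests one vcore per slot, so each TaskManager asked for 10 vcores and the resources were insufficient.
Option 1: give YARN more vcores per node (yarn.nodemanager.resource.cpu-vcores in yarn-site.xml), or set yarn.containers.vcores in the Flink configuration to at most 7.
Option 2: reduce the number of slots per TaskManager, e.g. taskmanager.numberOfTaskSlots: 5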
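The rule in the error message can be checked before starting a session: the requested vcores per TaskManager are yarn.containers.vcores if set, otherwise the slot count, and they must not exceed the largest vcore count a YARN node offers. A sketch of that arithmetic; `check_vcores` is a hypothetical helper and you supply the numbers yourself.

```shell
# Hypothetical helper: check_vcores SLOTS MAX_NODE_VCORES [YARN_CONTAINERS_VCORES]
check_vcores() {
  local slots="$1" max_node_vcores="$2" container_vcores="${3:-}"
  # Without yarn.containers.vcores, Flink requests one vcore per slot
  local requested="${container_vcores:-$slots}"
  if [ "$requested" -gt "$max_node_vcores" ]; then
    echo "requested $requested vcores > $max_node_vcores available: lower slots or raise vcores"
    return 1
  fi
  echo "ok: requesting $requested of $max_node_vcores vcores per node"
}
```

With the numbers from this error, `check_vcores 10 7` fails, while `check_vcores 5 7` (option 2) and `check_vcores 10 7 5` (option 1 via yarn.containers.vcores: 5) pass.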
References:
- https://www.jianshu.com/p/1b05202c4fb6
- https://blog.csdn.net/joseph25/article/details/88878350