1. Let's start with the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/yarn_setup.html
2. As the screenshot above shows, I'll skip the first mode; for production we generally recommend the second or the third mode.
3. Check the run command's options:
./bin/flink run --help
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the
JAR file does not specify the class in
its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-py,--python <pythonFile> Python script with the program entry
point. The dependent resources can be
configured with the `--pyFiles`
option.
-pyarch,--pyArchives <arg> Add python archive files for job. The
archive files will be extracted to the
working directory of python UDF
worker. Currently only zip-format is
supported. For each archive file, a
target directory be specified. If the
target directory name is specified,
the archive file will be extracted to
a name can directory with the
specified name. Otherwise, the archive
file will be extracted to a directory
with the same name of the archive
file. The files uploaded via this
option are accessible via relative
path. '#' could be used as the
separator of the archive file path and
the target directory name. Comma (',')
could be used as the separator to
specify multiple archive files. This
option can be used to upload the
virtual environment, the data files
used in Python UDF (e.g.: --pyArchives
file:///tmp/py37.zip,file:///tmp/data.
zip#data --pyExecutable
py37.zip/py37/bin/python). The data
files could be accessed in Python UDF,
e.g.: f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg> Specify the path of the python
interpreter used to execute the python
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
Apache Beam (version == 2.19.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
the above requirements.
-pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
These files will be added to the
PYTHONPATH of both the local client
and the remote python UDF worker. The
standard python resource file suffixes
such as .py/.egg/.zip or directory are
all supported. Comma (',') could be
used as the separator to specify
multiple files (e.g.: --pyFiles
file:///tmp/myresource.zip,hdfs:///$na
menode_address/myresource2.zip).
-pym,--pyModule <pythonModule> Python module with the program entry
point. This option must be used in
conjunction with `--pyFiles`.
-pyreq,--pyRequirements <arg> Specify a requirements.txt file which
defines the third-party dependencies.
These dependencies will be installed
and added to the PYTHONPATH of the
python UDF worker. A directory which
contains the installation packages of
these dependencies could be specified
optionally. Use '#' as the separator
if the optional parameter exists
(e.g.: --pyRequirements
file:///tmp/requirements.txt#file:///t
mp/cached_dir).
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
-sae,--shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
Options for Generic CLI mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "collection", "remote",
"local", "kubernetes-session", "yarn-per-job",
"yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. The currently available targets are:
"collection", "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session",
"yarn-application" and "kubernetes-application".Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one
specified in the configuration.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
The Flink configuration parameters are documented here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#yarn
4. What I want to cover here is Application mode. The key difference from the cluster modes above: in cluster mode, once we submit the job, YARN automatically ships the JARs under Flink's lib directory, whereas in Application mode we upload the dependency JARs to HDFS ourselves, and the user JAR to run is also placed at a specified HDFS path.
The first demonstration:
1) Submitting with the command-line script
The command first:
export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*;./bin/flink run-application -t yarn-application -p 3 \
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=1048m \
-Dtaskmanager.memory.process.size=1096m \
-Dyarn.application.name="MyFlinkWordCount2" \
-Dtaskmanager.numberOfTaskSlots=3 \
-Dyarn.containers.vcores=1 \
-Dyarn.provided.lib.dirs="hdfs://dev-ct6-dc-master01:8020/flink/libs" \
hdfs://dev-ct6-dc-master01:8020/flink/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345
A note on the parameters: the first time you see them you may well be confused — "where are these on the website?" Don't panic; the official docs state it in one sentence.
Put simply, every parameter you would set in the cluster-mode command is passed here as -D<configuration key>=<value>; for example, -yjm 1048m and -ytm 1096m from yarn-cluster mode become -Djobmanager.memory.process.size=1048m and -Dtaskmanager.memory.process.size=1096m. If you are not yet familiar with cluster mode, follow along with me.
Open the documentation page: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#yarn
This is the YARN configuration reference.
Press Ctrl+F and search for 'jobmanager.memory.process.size' to see the detailed description of that key.
Search for another key the same way.
So once you can write a cluster-mode submission, you can write an application-mode one: it is just a matter of prefixing each configuration key with "-D".
2) Invoking the script from Java
Here the Java code simply calls the Linux script over SSH, so I'll go straight to the code; there is not much to explain.
Required library: JSch (the Maven dependency is sketched after the code).
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.log4j.Logger;

public class ConnectionSSH {
    private static final Logger logger = Logger.getLogger(ConnectionSSH.class);

    public static void main(String[] args) throws JSchException, IOException {
        JSch jsch = new JSch();
        // authenticate with a private key instead of a password
        String pubKeyPath = "C:\\Users\\Administrator\\Desktop\\id_rsa";
        jsch.addIdentity(pubKeyPath);
        String username = "OpsUser";
        String host = "192.168.6.31";
        // prepare the connection (SSH port 50022)
        Session session = jsch.getSession(username, host, 50022);
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        // String command = "cd /wyyt/software/flink;export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*;./bin/flink run -m yarn-cluster -yD yarn.containers.vcores=2 ./examples/batch/WordCount.jar";
        String command = "export HADOOP_CONF_DIR=/etc/hadoop/conf;cd /wyyt/software/flink;sh /wyyt/software/flink/test2.sh 22223545";

        // Channel channel = session.openChannel("shell");
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand(command);
        // channel.setInputStream(System.in);
        // channel.setOutputStream(System.out);
        // InputStream in = channel.getInputStream();
        BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
        channel.connect();

        // print the remote command's output
        String msg;
        while ((msg = in.readLine()) != null) {
            System.out.println(msg);
        }
        channel.disconnect();
        session.disconnect();
    }
}
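The post does not show the JSch coordinates; a sketch of the Maven dependency I would use (the version is my assumption):

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.55</version>
</dependency>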
Or:
package shell;

import java.io.IOException;

/**
 * @program: flink-zeppelin
 * @description:
 * @author: Mr.Wang
 * @create: 2020-09-09 11:41
 **/
public class CcxTest {

    public static void main(String[] args) {
        try {
            testDemo1();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void testDemo1() throws IOException {
        System.out.println("--------------------------------------");
        // TODO normally an IP address, user and password are required
        // String commandStr = "cd /root;cat aa.txt;rm -rf aa.txt";
        // String commandStr = "cd /root;rm -rf aa.txt";
        String commandStr = "cd /wyyt/software/flink;./bin/flink run examples/streaming/WordCount.jar;";
        Boolean result = ConnectLinuxCommand.connectLinux("192.168.12.188", "root", "root", commandStr);
        System.out.println("Result: " + result);
        System.out.println("--------------------------------------");

        // TODO connecting inside the intranet without a password did not work; use another class
        // String commandStr = "cd /wyyt/software/flink;cat aa.txt;";
        // Boolean result = ConnectLinuxCommand.connectLinuxWithoutPwd("192.168.6.31", "OpsUser", commandStr);
        // System.out.println("Result: " + result);
        // System.out.println("--------------------------------------");
    }

    public static void testDemo2() throws IOException {
        try {
            ConnectLinuxCommand.scpGet("192.168.42.201", "root", "123456", "/home/aa.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void testDemo3() {
        try {
            ConnectLinuxCommand.scpPut("192.168.42.201", "root", "123456", "d:/aa/aa.txt", "/home/bb");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package shell;

/**
 * @program: flink-zeppelin
 * @author: Mr.Wang
 * @create: 2020-09-09 11:40
 **/
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;

/**
 * Description: connect to a Linux server and run shell commands on it.
 * Author: cuicx
 * Version: V1.0
 * Created: 2018-07-04, last modified: 2018-07-04
 */
public class ConnectLinuxCommand {

    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);
    private static String DEFAULTCHARTSET = "UTF-8";
    private static Connection conn;

    /**
     * Log in to the remote Linux server with username and password.
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword()); // authenticate
            if (flag) {
                logger.error("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
            // flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword());
            if (flag) {
                logger.error("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Log in to the remote Linux server with a private key file.
     *
     * @param keyFile     file containing the user's DSA or RSA private key in OpenSSH PEM format
     *                    (the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" header must be present)
     * @param keyfilePass passphrase of the key file, or null if the key is not encrypted
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        // e.g. File keyfile = new File("C:\\temp\\private");
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.error("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Log in to the remote Linux server with a private key held in a char array.
     *
     * @param keys    char[] containing the user's DSA or RSA private key in OpenSSH format
     *                (may contain newlines; the BEGIN/END markers must be present)
     * @param keyPass passphrase of the key, or null if the key is not encrypted
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.error("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Run a shell script or command remotely and return its output once it has finished.
     */
    public static String execute(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd);             // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            // if stdout is empty, the command probably failed, so fall back to stderr
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHARTSET);
            }
            conn.close();
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Run a shell script or command remotely; returns the output on success,
     * or an empty string (never null) on failure.
     */
    public static String executeSuccess(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession();
            session.execCommand(cmd);
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            conn.close();
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Read the command output from the given stream and return it as plain text.
     */
    public static String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuffer buffer = new StringBuffer();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line + "\n");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     * Connect to a Linux server with username/password and run a command.
     */
    public static boolean connectLinux(String ip, String userName, String password, String commandStr) {
        logger.error("ConnectLinuxCommand scpGet===" + "ip:" + ip + " userName:" + userName + " commandStr:" + commandStr);
        String returnStr = "";
        boolean result = true;

        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = execute(commandStr);
                System.out.println(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
        logger.error("ConnectLinuxCommand scpGet===" + "ip:" + ip + " userName:" + userName + " commandStr:" + commandStr);
        String returnStr = "";
        boolean result = true;

        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = execute(commandStr);
                System.out.println(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    /**
     * Copy a file from the remote server into a local directory.
     *
     * @param password   password on the remote server
     * @param remoteFile file path on the remote server
     * @param localDir   local target directory
     */
    public static void scpGet(String ip, String userName, String password, String remoteFile, String localDir) throws IOException {
        logger.error("ConnectLinuxCommand scpGet===" + "ip:" + ip + " userName:" + userName + " remoteFile:" + remoteFile + " localDir:" + localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }

    /**
     * Copy a local file to a directory on the remote machine.
     */
    public static void scpPut(String ip, String userName, String password, String localFile, String remoteDir) throws IOException {
        logger.error("ConnectLinuxCommand scpPut===" + "ip:" + ip + " userName:" + userName + " localFile:" + localFile + " remoteDir:" + remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}
Dependency:

<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>
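Both classes above also use a RemoteConnect class that the post never shows. A minimal sketch, with the fields inferred from the setIp/setUserName/setPassword calls:

package shell;

// Minimal connection-info holder assumed from the getters/setters used in ConnectLinuxCommand.
public class RemoteConnect {
    private String ip;
    private String userName;
    private String password;

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}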
3) Submitting the job to YARN from code
Whatever the script submits is ultimately executed by code anyway, so anything the script can do, code can do directly. Here is the code:
package yarn;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

/**
 * @author zhangjun (original author; see his public account [大数据技术与应用实战] for more hands-on content)
 * <p>
 * Submit a Flink job to a YARN cluster in application mode through the API.
 */
public class SubmitJobApplicationMode {

    public static void main(String[] args) {
        // local Flink configuration directory, used to load the Flink configuration
        String configurationDirectory = "G:/flink_working_tools/yarn-clientconfig/yarn-conf";
        // HDFS directory holding the Flink cluster JARs (the lib directory)
        String flinkLibs = "hdfs://dev-ct6-dc-master01:8020/flink/libs";
        // user JAR
        String userJarPath = "hdfs://dev-ct6-dc-master01:8020/flink/SocketWindowWordCount.jar";
        String flinkDistJar = "hdfs://dev-ct6-dc-master01:8020/flink/flink-yarn_2.11-1.11.0.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
        entries.addResource(new Path("G:/flink_working_tools/yarn-clientconfig/yarn-conf/yarn-site.xml"));
        entries.addResource(new Path("G:/flink_working_tools/yarn-clientconfig/yarn-conf/hdfs-site.xml"));
        entries.addResource(new Path("G:/flink_working_tools/yarn-clientconfig/yarn-conf/core-site.xml"));
        YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        YarnClusterInformationRetriever clusterInformationRetriever =
                YarnClientYarnClusterInformationRetriever.create(yarnClient);

        // load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(remoteLib.toString()));
        flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);

        // use application mode
        flinkConfiguration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        // YARN application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "TestApplication");
        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));

        ClusterSpecification clusterSpecification =
                new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
        // program arguments and main class of the user JAR (null = read from the JAR manifest)
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true);

        ClusterClientProvider<ApplicationId> clusterClientProvider = null;
        try {
            clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);
        } catch (ClusterDeploymentException e) {
            e.printStackTrace();
        }

        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        ApplicationId applicationId = clusterClient.getClusterId();
        System.out.println(applicationId);
    }
}
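One small addition I would make (not in the original code): once the application ID has been printed, close the descriptor and clients so the submitter process exits cleanly. A minimal sketch, appended at the end of main:

// Optional cleanup after the deployment above (my addition, not in the original post);
// ClusterClient and YarnClusterDescriptor both implement AutoCloseable.
clusterClient.close();
yarnClusterDescriptor.close();
yarnClient.stop();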
Key points to explain:
1) The dependency. I am on Flink 1.11.0; adjust the version for your own setup. You will also need to copy this JAR out of your local Maven repository, because it is used again in a moment (it is the flink-yarn_2.11-1.11.0.jar that gets uploaded to HDFS):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
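Besides flink-yarn, the imports in the code above also come from the Flink client and Hadoop client libraries. The following coordinates are my assumption of what is typically needed on Flink 1.11 (flink-yarn pulls part of this in transitively, so check your own dependency tree):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>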
2) As usual, these three Hadoop config files are required (yarn-site.xml, hdfs-site.xml, core-site.xml, as added to the Configuration in the code above).
3) Just as with the command-line submission, we need to set the configuration parameters.
If you drill into the source, they essentially live in the following three classes — the YARN options plus the JobManager and TaskManager options:
YarnConfigOptions
TaskManagerOptions
JobManagerOptions
For example:
Say I want to set a JobManager parameter; the configuration page gives the property as:
jobmanager.memory.process.size
Open the JobManagerOptions class and search for jobmanager.memory.process.size; the matching constant is TOTAL_PROCESS_MEMORY.
The corresponding code is then:
flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));
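The other -D options from the command-line example map onto ConfigOption constants in the same way. A short sketch using the same values as the script (constant names are from the Flink 1.11 source, so verify them against your version):

// parallelism.default
flinkConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, 3);
// taskmanager.numberOfTaskSlots
flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
// yarn.containers.vcores
flinkConfiguration.set(YarnConfigOptions.VCORES, 1);
// yarn.application.name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "MyFlinkWordCount2");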
Later there are further options for the logging format, savepoints, and so on.
4) Upload the dependency JARs (Flink's lib directory) to HDFS, together with the flink-yarn dist JAR and the user JAR referenced in the code; a sketch of the commands follows.
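A rough sketch of those uploads, using the paths from the code above (the local /wyyt/software/flink path and the HDFS layout are this post's own setup, not anything Flink mandates):

hadoop fs -mkdir -p hdfs://dev-ct6-dc-master01:8020/flink/libs
# everything under the local Flink lib directory -> yarn.provided.lib.dirs
hadoop fs -put /wyyt/software/flink/lib/* hdfs://dev-ct6-dc-master01:8020/flink/libs/
# the flink-yarn dist JAR copied out of the Maven repository (see key point 1)
hadoop fs -put flink-yarn_2.11-1.11.0.jar hdfs://dev-ct6-dc-master01:8020/flink/
# the user JAR to run
hadoop fs -put SocketWindowWordCount.jar hdfs://dev-ct6-dc-master01:8020/flink/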
5) Then run it. If the deployment fails, for example with:
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 8 exceeds the maximum number of virtual cores 4 available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:294)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:407)
Or it may complain about HADOOP_CLASSPATH and the like. Either way it means a parameter is not set correctly: add the Hadoop dependency/classpath (see the export in the script above) and set the YARN parameters, such as yarn.containers.vcores, appropriately.
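For the vcores error shown above specifically: the vcores requested per container default to the number of task slots, so in the API version the fix is roughly this (my sketch, mirroring -Dyarn.containers.vcores=1 from the earlier script):

// Cap the vcores requested per container so the request stays within YARN's limit;
// alternatively, keep taskmanager.numberOfTaskSlots at or below the cluster's per-container vcore maximum.
flinkConfiguration.set(YarnConfigOptions.VCORES, 1);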