目前采用的Flink的版本是1.4.2,运行在yarn上,总是时不时的报错“Invalid AMRMToken from appattempt”,导致AM挂掉。
简而言之,就是AM和RM沟通的过程中,突然AM提供的Token不被认可,导致拒绝连接,进而AM挂掉。
后来发现早期版本的yarn-client是存在问题的,至少在2.6.0还是存在问题,在2.7.0之后解决了。
具体的错误描述可参见:https://issues.apache.org/jira/browse/YARN-3103。
但由于Flink为了减少依赖,将相关的依赖直接转换成自己的类,去看了下这部分代码,果然是有问题。
更新了flink-shaded-hadoop项目中flink-shaded-hadoop2-uber项目引用的hadoop的包的版本,原先引用的是2.4.0,改完2.7.2,重新打包,部署。
正确的代码是:
private void updateAMRMToken(org.apache.hadoop.yarn.api.records.Token token) throws IOException { org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = new org.apache.hadoop.security.token.Token(token.getIdentifier().array(), token.getPassword().array(), new Text(token.getKind()), new Text(token.getService())); UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); currentUGI.addToken(amrmToken); amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); }
在构建完了token之后,才会更新服务。
错误的代码是:
private void updateAMRMToken(org.apache.hadoop.yarn.api.records.Token token) throws IOException { org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = new org.apache.hadoop.security.token.Token(token.getIdentifier().array(), token.getPassword().array(), new Text(token.getKind()), new Text(token.getService())); amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); if (UserGroupInformation.isSecurityEnabled()) { currentUGI = UserGroupInformation.getLoginUser(); } currentUGI.addToken(amrmToken); }
在增加token之前,先设置了service,导致上下文发生变化,就导致了多个不同的token的产生,后续如果选择了特定的token,就会报错。