Flink on yarn的问题:Invalid AMRMToken

时间:2022-07-20 04:51:43

目前采用的Flink的版本是1.4.2,运行在yarn上,总是时不时的报错“Invalid AMRMToken from appattempt”,导致AM挂掉。

 

简而言之,就是AM和RM沟通的过程中,突然AM提供的Token不被认可,导致拒绝连接,进而AM挂掉。

 

后来发现早期版本的yarn-client是存在问题的,至少在2.6.0还是存在问题,在2.7.0之后解决了。

 

具体的错误描述可参见:https://issues.apache.org/jira/browse/YARN-3103。

 

但由于Flink为了减少依赖,将相关的依赖直接转换成自己的类,去看了下这部分代码,果然是有问题。

 

更新了flink-shaded-hadoop项目中flink-shaded-hadoop2-uber项目引用的hadoop的包的版本,原先引用的是2.4.0,改完2.7.2,重新打包,部署。

 

正确的代码是:

private void updateAMRMToken(org.apache.hadoop.yarn.api.records.Token token)
    throws IOException
  {
    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = new org.apache.hadoop.security.token.Token(token.getIdentifier().array(), token.getPassword().array(), new Text(token.getKind()), new Text(token.getService()));
    
    UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
    currentUGI.addToken(amrmToken);
    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
  }

在构建完了token之后,才会更新服务。

 

错误的代码是:

private void updateAMRMToken(org.apache.hadoop.yarn.api.records.Token token)
    throws IOException
  {
    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = new org.apache.hadoop.security.token.Token(token.getIdentifier().array(), token.getPassword().array(), new Text(token.getKind()), new Text(token.getService()));
    
    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
    UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
    if (UserGroupInformation.isSecurityEnabled()) {
      currentUGI = UserGroupInformation.getLoginUser();
    }
    currentUGI.addToken(amrmToken);
  }

在增加token之前,先设置了service,导致上下文发生变化,就导致了多个不同的token的产生,后续如果选择了特定的token,就会报错。