kafka-connect-hdfs turned out to talk to Hadoop HDFS through a single NameNode. Scary... so I switched it to HA right away

Posted: 2021-12-07 06:42:53

2017-08-16 11:57:28,237 WARN [org.apache.hadoop.hdfs.LeaseRenewer][458] - <Failed to renew lease for [DFSClient_NONMAPREDUCE_-1756242047_26] for 30 seconds. Will retry shortly ...>
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
    at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88)
    at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1826)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1404)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewLease(FSNamesystem.java:4968)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewLease(NameNodeRpcServer.java:875)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.renewLease(AuthorizationProviderProxyClientProtocol.java:357)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewLease(ClientNamenodeProtocolServerSideTranslatorPB.java:633)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at org.apache.hadoop.ipc.Client.call(Client.java:1468)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy50.renewLease(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy51.renewLease(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:879)
    at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417)
    at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442)
    at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
    at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298)
    at java.lang.Thread.run(Thread.java:745)
The URL in the warning explains it clearly enough: https://s.apache.org/sbnn-error
3.17. What does the message "Operation category READ/WRITE is not supported in state standby" mean?

In an HA-enabled cluster, DFS clients cannot know in advance which namenode is active at a given time. So when a client contacts a namenode and it happens to be the standby, the READ or WRITE operation will be refused and this message is logged. The client will then automatically contact the other namenode and try the operation again. As long as there is one active and one standby namenode in the cluster, this message can be safely ignored.

If an application is configured to contact only one namenode always, this message indicates that the application is failing to perform any read/write operation. In such situations, the application would need to be modified to use the HA configuration for the cluster. The jira HDFS-3447 deals with lowering the severity of this message (and similar ones) to DEBUG so as to reduce noise in the logs, but is unresolved as of July 2015.
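
To make "use the HA configuration for the cluster" concrete, here is a minimal Java sketch of the client-side properties an HA-aware DFS client needs. The nameservice name nameservice1, the NameNode ids nn1/nn2 and the host names are placeholders; in a real deployment these values normally come from core-site.xml/hdfs-site.xml on the classpath rather than being set in code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Point the default filesystem at the logical nameservice, not a single NameNode host.
    conf.set("fs.defaultFS", "hdfs://nameservice1");
    conf.set("dfs.nameservices", "nameservice1");

    // The two NameNodes behind the nameservice (placeholder ids and hosts).
    conf.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.nameservice1.nn1", "namenode1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.nameservice1.nn2", "namenode2.example.com:8020");

    // Client-side proxy that retries against the other NameNode when it hits a StandbyException.
    conf.set("dfs.client.failover.proxy.provider.nameservice1",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    try (FileSystem fs = FileSystem.newInstance(conf)) {
      System.out.println(fs.exists(new Path("/")));
    }
  }
}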

The change goes into HdfsStorage.class, the class in kafka-connect-hdfs that performs the HDFS operations:

/**
 * Copyright 2015 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package io.confluent.connect.hdfs.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.net.URI;

import io.confluent.connect.hdfs.wal.FSWAL;
import io.confluent.connect.hdfs.wal.WAL;

public class HdfsStorage implements Storage {

  private final FileSystem fs;
  private final Configuration conf;
  private final String url;

  public HdfsStorage(Configuration conf,  String url) throws IOException {
    // fs = FileSystem.newInstance(URI.create(url), conf); // original: pins the client to the single NameNode in the URL
    fs = FileSystem.newInstance(conf); // modified: let the Configuration resolve the HA nameservice
    this.conf = conf;
    this.url = url;
  }

  @Override
  public FileStatus[] listStatus(String path, PathFilter filter) throws IOException {
    return fs.listStatus(new Path(path), filter);
  }

  @Override
  public FileStatus[] listStatus(String path) throws IOException {
    return fs.listStatus(new Path(path));
  }

  @Override
  public void append(String filename, Object object) throws IOException {

  }

  @Override
  public boolean mkdirs(String filename) throws IOException {
    return fs.mkdirs(new Path(filename));
  }

  @Override
  public boolean exists(String filename) throws IOException {
    return fs.exists(new Path(filename));
  }

  @Override
  public void commit(String tempFile, String committedFile) throws IOException {
    renameFile(tempFile, committedFile);
  }


  @Override
  public void delete(String filename) throws IOException {
    fs.delete(new Path(filename), true);
  }

  @Override
  public void close() throws IOException {
    if (fs != null) {
      fs.close();
    }
  }

  @Override
  public WAL wal(String topicsDir, TopicPartition topicPart) {
    return new FSWAL(topicsDir, topicPart, this);
  }

  @Override
  public Configuration conf() {
    return conf;
  }

  @Override
  public String url() {
    return url;
  }

  private void renameFile(String sourcePath, String targetPath) throws IOException {
    if (sourcePath.equals(targetPath)) {
      return;
    }
    final Path srcPath = new Path(sourcePath);
    final Path dstPath = new Path(targetPath);
    if (fs.exists(srcPath)) {
      fs.rename(srcPath, dstPath);
    }
  }
}
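
A minimal usage sketch of the modified class, assuming the HA properties shown earlier are available on the classpath via core-site.xml/hdfs-site.xml (the path /topics is only an example):

import org.apache.hadoop.conf.Configuration;

import io.confluent.connect.hdfs.storage.HdfsStorage;

public class HdfsStorageHaSketch {
  public static void main(String[] args) throws Exception {
    // Picks up core-site.xml/hdfs-site.xml (with the HA nameservice) from the classpath.
    Configuration conf = new Configuration();

    // The url is kept only for the parts that still need it (e.g. the Hive integration);
    // the FileSystem itself is now resolved from the Configuration.
    HdfsStorage storage = new HdfsStorage(conf, "hdfs://nameservice1");
    try {
      System.out.println(storage.exists("/topics"));
    } finally {
      storage.close();
    }
  }
}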

Of course, the corresponding url configuration then has to be changed to something like hdfs://nameservice/*, because that is what HA needs. The original requirement no longer applies; it read:

  // HDFS Group
  public static final String HDFS_URL_CONFIG = "hdfs.url";
  private static final String HDFS_URL_DOC =
      "The HDFS connection URL. This configuration has the format of hdfs:://hostname:port and "
      + "specifies the HDFS to export data to.";
  private static final String HDFS_URL_DISPLAY = "HDFS URL";
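
For reference, a connector configuration along these lines might look as follows. This is a sketch only: the connector name, topic, nameservice and paths are placeholders, and hadoop.conf.dir must point at a directory containing the HA-aware core-site.xml/hdfs-site.xml.

name=hdfs-sink-ha
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=test_topic
flush.size=1000

# Point at the logical nameservice, not a single NameNode host.
hdfs.url=hdfs://nameservice1

# Directory with core-site.xml/hdfs-site.xml that define the HA nameservice.
hadoop.conf.dir=/etc/hadoop/conf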

Even though the url is no longer used when instantiating the storage, it is still needed for loading data into Hive.

    url = connectorConfig.getString(HdfsSinkConnectorConfig.HDFS_URL_CONFIG);
    topicsDir = connectorConfig.getString(HdfsSinkConnectorConfig.TOPICS_DIR_CONFIG);
    String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);

    @SuppressWarnings("unchecked")
    Class<? extends Storage> storageClass = (Class<? extends Storage>) Class
        .forName(connectorConfig.getString(HdfsSinkConnectorConfig.STORAGE_CLASS_CONFIG));
    storage = StorageFactory.createStorage(storageClass, conf, url);
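
The Configuration handed to StorageFactory.createStorage is what must carry the HA settings. Below is a hypothetical helper sketching how it could be assembled from the connector's hadoop.conf.dir setting; the actual wiring inside the connector may differ.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: hadoopConfDir would come from the connector's hadoop.conf.dir setting.
class HadoopConfLoader {
  static Configuration load(String hadoopConfDir) {
    Configuration conf = new Configuration();
    if (hadoopConfDir != null && !hadoopConfDir.isEmpty()) {
      // Pull in the HA nameservice definition so FileSystem.newInstance(conf) can fail over.
      conf.addResource(new Path(hadoopConfDir, "core-site.xml"));
      conf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
    }
    return conf;
  }
}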
