Understanding Flink in Depth (9): JobManager (3), HA Analysis

Date: 2024-11-01 07:11:05

Core HA classes and interfaces

HighAvailabilityServices

HighAvailabilityServices is the core interface of Flink's HA services. It is responsible for the following:

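1. Leader election and leader retrieval: it defines the leader election and leader retrieval services for the highly available components (Dispatcher, ResourceManager, etc.).

2. Checkpoint metadata persistence: checkpoint metadata is written to durable storage so that state can be restored after a restart or a failure.

3. Registering the latest completed checkpoint: the most recent completed checkpoint is recorded so the job can quickly be restored to that state when needed.

4. BLOB store persistence: large binary objects (BLOBs) are written to durable storage so they are not lost when the system restarts.

5. A registry that marks job status: it keeps track of each job's status (e.g., running, finished, failed) for state management and monitoring.

6. Naming of RPC endpoints: remote procedure call (RPC) endpoints get unique names so that components can find and call each other in the distributed system.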

[Figure: UML class diagram of HighAvailabilityServices]

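It is implemented by two abstract base classes:

AbstractHaServices

Base class for HA services that actually provide high availability (the ZooKeeper- and Kubernetes-based implementations extend it).

AbstractNonHaServices

Base class for HA services without high availability (StandaloneHaServices, used when HA is disabled, extends it).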
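The practical difference between the two branches is where leader information comes from. The following hypothetical sketch (SharedLeaderStore, NonHaBase, HaBase, StandaloneLike and ZooKeeperLike are all made-up names) models it: the non-HA side returns addresses fixed at startup, while the HA side reads them from a shared store standing in for ZooKeeper or Kubernetes, so a newly elected leader becomes visible after a failover.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the external coordination store
// (ZooKeeper or Kubernetes ConfigMaps in a real deployment).
class SharedLeaderStore {
    private final Map<String, String> leaders = new ConcurrentHashMap<>();

    void publish(String componentId, String address) {
        leaders.put(componentId, address);
    }

    Optional<String> read(String componentId) {
        return Optional.ofNullable(leaders.get(componentId));
    }
}

interface LeaderLookup {
    Optional<String> leaderOf(String componentId);
}

// Plays the role of the AbstractNonHaServices branch: addresses are fixed at startup.
abstract class NonHaBase implements LeaderLookup {
    private final Map<String, String> fixed;

    protected NonHaBase(Map<String, String> fixed) {
        this.fixed = Map.copyOf(fixed);
    }

    @Override
    public Optional<String> leaderOf(String componentId) {
        return Optional.ofNullable(fixed.get(componentId));
    }
}

// Plays the role of the AbstractHaServices branch: addresses are read from the shared
// store, so whoever currently leads is what callers see.
abstract class HaBase implements LeaderLookup {
    private final SharedLeaderStore store;

    protected HaBase(SharedLeaderStore store) {
        this.store = store;
    }

    @Override
    public Optional<String> leaderOf(String componentId) {
        return store.read(componentId);
    }
}

class StandaloneLike extends NonHaBase {
    StandaloneLike(Map<String, String> fixed) {
        super(fixed);
    }
}

class ZooKeeperLike extends HaBase {
    ZooKeeperLike(SharedLeaderStore store) {
        super(store);
    }
}
```

After the store is updated to a new leader, ZooKeeperLike reports the new address while StandaloneLike keeps returning the original one.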

HA service initialization flow
ClusterEntrypoint#initializeServices

ClusterEntrypoint#initializeServices initializes the services the cluster needs before it starts, including the high-availability services:

haServices = createHaServices(configuration, ioExecutor, rpcSystem);
HighAvailabilityServicesUtils.createHighAvailabilityServices

createHaServices in turn calls the utility method HighAvailabilityServicesUtils.createHighAvailabilityServices to create the HA services:

public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution,
        RpcSystemUtils rpcSystemUtils,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    // Read the HA mode. It defaults to NONE, which disables HA; the other options are
    // ZOOKEEPER, KUBERNETES, or a custom factory class.
    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
    // Return the HighAvailabilityServices implementation that matches the mode
    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
            // ... (omitted)

            return new StandaloneHaServices(
                    resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
        case ZOOKEEPER:
            return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
        case KUBERNETES:
            return createCustomHAServices(
                    "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
                    configuration,
                    executor);

        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);

        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}

Steps:

1. Read the HA mode from the configuration.

2. Create the HighAvailabilityServices implementation that corresponds to that mode.
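The two steps can be imitated without Flink on the classpath. This is a simplified sketch, not Flink's code: HaMode and HaModeSelector are hypothetical names, the configuration is a plain Map, and the key "high-availability.type" is assumed (recent Flink versions use it; older ones used "high-availability"). It also assumes that any value which is not a known mode name is treated as the name of a custom factory class, which is what the FACTORY_CLASS branch above suggests.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical mirror of the HA modes handled by the switch above.
enum HaMode { NONE, ZOOKEEPER, KUBERNETES, FACTORY_CLASS }

final class HaModeSelector {
    // Assumed configuration key (see lead-in).
    static final String HA_TYPE_KEY = "high-availability.type";

    // Step 1: derive the mode from configuration.
    static HaMode fromConfig(Map<String, String> conf) {
        String value = conf.get(HA_TYPE_KEY);
        if (value == null) {
            return HaMode.NONE; // HA disabled by default
        }
        try {
            return HaMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Assumption: anything else names a custom factory class.
            return HaMode.FACTORY_CLASS;
        }
    }

    // Step 2: pick what would be created for that mode (described, not built).
    static String describe(Map<String, String> conf) {
        switch (fromConfig(conf)) {
            case NONE:
                return "StandaloneHaServices";
            case ZOOKEEPER:
                return "ZooKeeper-based HA services";
            case KUBERNETES:
                return "KubernetesHaServicesFactory";
            case FACTORY_CLASS:
                return "custom factory: " + conf.get(HA_TYPE_KEY);
            default:
                throw new IllegalStateException("unreachable");
        }
    }
}
```

For example, describe(Map.of("high-availability.type", "kubernetes")) yields "KubernetesHaServicesFactory", and an empty map falls back to the standalone case.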

Leader retrieval

LeaderRetrievalListener
public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    void handleError(Exception exception);
}

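The LeaderRetrievalListener interface is notified when the leader address changes: the LeaderRetrievalService holds an instance of the listener and calls its notifyLeaderAddress method. Its methods:

1. notifyLeaderAddress: called once a leader has been elected, both the first time and whenever the leader changes.

2. handleError: called when an error occurs.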
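A listener is usually just a small cache of "who leads right now". The sketch below re-declares the interface locally (without @Nullable) so it compiles without Flink; CachingLeaderListener and LeaderInfo are hypothetical names. Treating a null address or session ID as "no leader known" is an assumption of this sketch, motivated by the @Nullable parameters in the interface.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Local copy of the listener interface (minus @Nullable) so this sketch compiles without Flink.
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    void handleError(Exception exception);
}

// Hypothetical listener that caches the most recent leader, e.g. so a client can
// (re)connect to whichever JobManager currently leads.
final class CachingLeaderListener implements LeaderRetrievalListener {
    // One notification: where the leader lives and which session it belongs to.
    record LeaderInfo(String address, UUID sessionId) {}

    private final AtomicReference<LeaderInfo> current = new AtomicReference<>();
    private final AtomicReference<Exception> lastError = new AtomicReference<>();

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        // Assumption: a null address or session id means "no leader known right now".
        if (leaderAddress == null || leaderSessionID == null) {
            current.set(null);
        } else {
            current.set(new LeaderInfo(leaderAddress, leaderSessionID));
        }
    }

    @Override
    public void handleError(Exception exception) {
        // Keep the error; a real component would usually escalate it.
        lastError.set(exception);
    }

    Optional<LeaderInfo> currentLeader() {
        return Optional.ofNullable(current.get());
    }

    Optional<Exception> lastError() {
        return Optional.ofNullable(lastError.get());
    }
}
```

AtomicReference is used because notifications may arrive on a different thread from the one reading the cached leader.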

LeaderRetrievalService
public interface LeaderRetrievalService {

    /**
     * Starts the leader retrieval service with the given listener to listen for new leaders. This
     * method can only be called once.
     *
     * @param listener The leader retrieval listener which will be notified about new leaders.
     * @throws Exception
     */
    void start(LeaderRetrievalListener listener) throws Exception;

    /**
     * Stops the leader retrieval service.
     *
     * @throws Exception
     */
    void stop() throws Exception;
}

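LeaderRetrievalService is an important Flink interface: it is notified when the leader changes and then calls back the registered LeaderRetrievalListener. Its methods:

1. start: starts listening for the leader; this method can only be called once.

2. stop: stops listening.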
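The start-once / stop contract can be shown with an in-memory service. This is a hypothetical sketch: InMemoryLeaderRetrievalService and its onLeaderChanged hook are made up (real implementations watch ZooKeeper or Kubernetes), and replaying an already-known leader to the listener on start is a design choice of the sketch.

```java
import java.util.UUID;

// Local copies of the two interfaces shown above (no Flink dependency).
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    void handleError(Exception exception);
}

interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;

    void stop() throws Exception;
}

// Hypothetical in-memory retrieval service.
final class InMemoryLeaderRetrievalService implements LeaderRetrievalService {
    private LeaderRetrievalListener listener;
    private boolean running;
    private String address;
    private UUID sessionId;

    @Override
    public synchronized void start(LeaderRetrievalListener listener) {
        // start() may only be called once, even after stop().
        if (this.listener != null) {
            throw new IllegalStateException("start() may only be called once");
        }
        this.listener = listener;
        this.running = true;
        if (address != null) {
            // Replay the leader that was already known before start().
            listener.notifyLeaderAddress(address, sessionId);
        }
    }

    @Override
    public synchronized void stop() {
        running = false;
    }

    // Hypothetical hook: whatever detects a leader change calls this.
    synchronized void onLeaderChanged(String newAddress, UUID newSessionId) {
        address = newAddress;
        sessionId = newSessionId;
        if (running) {
            listener.notifyLeaderAddress(newAddress, newSessionId);
        }
    }
}
```

Changes that happen after stop() are still recorded but no longer forwarded to the listener.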

Leader election

LeaderElectionService
public interface LeaderElectionService {

    /**
     * Creates a new {@link LeaderElection} instance that is registered to this {@code
     * LeaderElectionService} instance.
     *
     * @param componentId a unique identifier that refers to the stored leader information that the
     *     newly created {@link LeaderElection} manages.
     */
    LeaderElection createLeaderElection(String componentId);
}

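1. The leader election service interface; it allows a single leader to be elected among a group of contenders.

2. createLeaderElection: creates a DefaultLeaderElection registered with this service for the given componentId; the concrete election operations are carried out through that DefaultLeaderElection.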
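The componentId parameter identifies the stored leader information that the new election handle manages. The hypothetical sketch below illustrates just that keying: ToyLeaderElectionService, ComponentElection and publishLeaderAddress are made-up names (in Flink the address is published through confirmLeadership), and rejecting a second handle for the same componentId is an assumption of the sketch.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy service: every componentId gets its own election handle, and
// whatever a handle publishes is stored under that componentId.
final class ToyLeaderElectionService {
    private final Map<String, ComponentElection> elections = new ConcurrentHashMap<>();
    private final Map<String, String> publishedAddresses = new ConcurrentHashMap<>();

    ComponentElection createLeaderElection(String componentId) {
        ComponentElection election = new ComponentElection(componentId);
        // Assumption for this sketch: one handle per componentId at a time.
        if (elections.putIfAbsent(componentId, election) != null) {
            throw new IllegalStateException("Already registered: " + componentId);
        }
        return election;
    }

    // What a retriever interested in this component would see.
    Optional<String> leaderAddress(String componentId) {
        return Optional.ofNullable(publishedAddresses.get(componentId));
    }

    // Handle bound to a single componentId (plays the role of DefaultLeaderElection).
    final class ComponentElection implements AutoCloseable {
        private final String componentId;

        private ComponentElection(String componentId) {
            this.componentId = componentId;
        }

        void publishLeaderAddress(String address) {
            publishedAddresses.put(componentId, address);
        }

        @Override
        public void close() {
            elections.remove(componentId, this);
            publishedAddresses.remove(componentId);
        }
    }
}
```

Closing one component's handle removes only that component's leader information; the others are unaffected.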

LeaderElection
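public interface LeaderElection extends AutoCloseable {

    /** Registers the passed {@link LeaderContender} with the leader election process. */
    void startLeaderElection(LeaderContender contender) throws Exception;

    /**
     * Confirms that the {@link LeaderContender} has accepted the leadership identified by the
     * given leader session id. It also publishes the leader address under which the leader is
     * reachable.
     */
    void confirmLeadership(UUID leaderSessionID, String leaderAddress);

    /**
     * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
     * given leader session ID acquired.
     */
    boolean hasLeadership(UUID leaderSessionId);

    /**
     * Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the
     * underlying leader election.
     */
    void close() throws Exception;
}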

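1. LeaderElection acts as a proxy between LeaderElectionService and LeaderContender.

2. startLeaderElection: takes a LeaderContender as its argument and registers it with the leader election process.

3. confirmLeadership: called after the contender has been granted leadership; it confirms the leadership for the given session ID and publishes the leader address.

4. hasLeadership: checks whether the contender holds leadership under the given leader session ID.

5. close: shuts down the election by deregistering the contender from the underlying leader election.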
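The role of the leader session ID in confirmLeadership and hasLeadership is easiest to see in a single-process toy. In this hypothetical sketch, ToyLeaderElection, onGranted(), onRevoked() and confirmedAddress() are made-up names, with the two on* hooks standing in for what the ZooKeeper or Kubernetes driver would trigger. LeaderContender is re-declared locally. Ignoring confirmations for an outdated session is the design choice being illustrated.

```java
import java.util.Objects;
import java.util.UUID;

// Local stand-in for Flink's LeaderContender (no Flink dependency).
interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    void handleError(Exception exception);
}

// Hypothetical single-process LeaderElection proxy.
final class ToyLeaderElection implements AutoCloseable {
    private LeaderContender contender;
    private UUID issuedSessionId;    // session handed to the contender, null if not leader
    private String confirmedAddress; // address published via confirmLeadership

    synchronized void startLeaderElection(LeaderContender contender) {
        if (this.contender != null) {
            throw new IllegalStateException("Leader election already started");
        }
        this.contender = Objects.requireNonNull(contender);
    }

    // Driver hook: this process won the election, so issue a fresh session id.
    synchronized void onGranted() {
        if (contender == null) {
            return; // nobody to hand leadership to
        }
        issuedSessionId = UUID.randomUUID();
        contender.grantLeadership(issuedSessionId);
    }

    // Driver hook: leadership was lost.
    synchronized void onRevoked() {
        boolean wasLeader = issuedSessionId != null;
        issuedSessionId = null;
        confirmedAddress = null;
        if (wasLeader && contender != null) {
            contender.revokeLeadership();
        }
    }

    synchronized void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        // Ignore confirmations that refer to an outdated session.
        if (leaderSessionID != null && leaderSessionID.equals(issuedSessionId)) {
            confirmedAddress = leaderAddress;
        }
    }

    synchronized boolean hasLeadership(UUID leaderSessionId) {
        return issuedSessionId != null && issuedSessionId.equals(leaderSessionId);
    }

    synchronized String confirmedAddress() {
        return confirmedAddress;
    }

    @Override
    public synchronized void close() {
        onRevoked();      // revokes leadership if it is still held
        contender = null; // deregister the contender
    }
}
```

Because Java monitors are reentrant, a contender may call confirmLeadership from inside grantLeadership on the same thread without deadlocking.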

LeaderContender

The contender: a component must implement this interface to take part in leader election (for example WebMonitorEndpoint and ResourceManagerServiceImpl).
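What a contender typically does can be sketched as follows. The callback names grantLeadership, revokeLeadership and handleError are assumed to match Flink's LeaderContender; ExampleComponent and accepts() are hypothetical, and LeaderElection is trimmed to the single call used here. In this sketch the session ID also acts as a fencing token, so requests tagged with an old session are rejected.

```java
import java.util.UUID;

// Local stand-ins (no Flink dependency). LeaderElection is trimmed to the one call used here.
interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    void handleError(Exception exception);
}

interface LeaderElection {
    void confirmLeadership(UUID leaderSessionID, String leaderAddress);
}

// Hypothetical component taking part in leader election.
final class ExampleComponent implements LeaderContender {
    private final LeaderElection election;
    private final String address;
    private volatile UUID currentSession; // null while this instance is not the leader

    ExampleComponent(LeaderElection election, String address) {
        this.election = election;
        this.address = address;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        // Adopt the new session first, then confirm, so the address is only published
        // once this component is ready to act as leader.
        currentSession = leaderSessionID;
        election.confirmLeadership(leaderSessionID, address);
    }

    @Override
    public void revokeLeadership() {
        currentSession = null;
    }

    @Override
    public void handleError(Exception exception) {
        // A real component would typically escalate to a fatal error handler;
        // this sketch simply steps down.
        currentSession = null;
    }

    // Accept a request only if it carries the current leader session id.
    boolean accepts(UUID requestSession) {
        UUID session = currentSession;
        return session != null && session.equals(requestSession);
    }
}
```

Setting the session before calling confirmLeadership keeps retrievers from seeing the new address before the component is able to serve requests for that session.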