点击这里查看 Flink 1.13 源码解析 目录汇总
点击查看相关章节Flink 1.13 源码解析——启动脚本解析
点击查看相关章节Flink 1.13 源码解析前导——Akka通信模型
点击查看相关章节Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动
点击查看相关章节Flink 1.13 源码解析——TaskManager启动流程概览
点击查看相关章节Flink 1.13 源码解析——TaskManager启动流程 之 与ResourceManager的注册交互
目录
一、前言
二、TaskExecutor的构建
2.1、TaskManager基础服务的初始化
2.1.1、BlobCacheService的初始化
2.2、TaskExecutor的构造过程
2.2.3、TaskSlotTable详解
2.2.3、TaskExecutor的初始化
总结:
一、前言
在之前的章节中我们分析了Flink主节点(JobManager)的启动流程,在接下来这几章里,我们来从源码入手分析一下Flink从节点的启动流程,TaskManager的启动流程中,有很多步骤和主节点的启动是相同的,他没有主节点中那么多的组件,但是启动的步骤要比主节点繁杂很多,在这一章我们首先来了解TaskManager的初始化流程。
二、TaskExecutor的构建
在之前Flink启动脚本分析章节(点此查看 Flink 1.13 源码解析——启动脚本解析)中我们得知,standalone模式下Flink从节点的启动类为,所以我们直接来看这个类的main方法:
// --------------------------------------------------------------------------------------------
// Static entry point
// --------------------------------------------------------------------------------------------
public static void main(String[] args) throws Exception {
// startup checks and logging
(LOG, "TaskManager", args);
(LOG);
(LOG);
long maxOpenFileHandles = ();
if (maxOpenFileHandles != -1L) {
("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
} else {
("Cannot determine the maximum number of open file descriptors");
}
// TODO 启动
runTaskManagerProcessSecurely(args);
}
在main方法中前几行代码做了一些参数、配置校验的工作,我们直接来看runTaskManagerProcessSecurely方法:
public static void runTaskManagerProcessSecurely(String[] args) {
Configuration configuration = null;
try {
// TODO 解析args和文件得到配置信息
configuration = loadConfiguration(args);
} catch (FlinkParseException fpe) {
("Could not load the configuration.", fpe);
(FAILURE_EXIT_CODE);
}
// TODO 启动
runTaskManagerProcessSecurely(checkNotNull(configuration));
}
该方法依然是我们熟悉的从命令以及文件解析配置,然后将解析后的配置传递给runTaskManagerProcessSecurely方法,我们点进来继续看:
public static void runTaskManagerProcessSecurely(Configuration configuration) {
(configuration);
// TODO 启动插件管理器
final PluginManager pluginManager =
(configuration);
(configuration, pluginManager);
int exitCode;
Throwable throwable = null;
try {
(new SecurityConfiguration(configuration));
exitCode =
()
// TODO 启动TaskManager
.runSecured(() -> runTaskManager(configuration, pluginManager));
} catch (Throwable t) {
throwable = (t, );
exitCode = FAILURE_EXIT_CODE;
}
if (throwable != null) {
("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
} else {
("Terminating TaskManagerRunner with exit code {}.", exitCode);
}
(exitCode);
}
在该方法里,启动了一个插件管理器,并且执行了一个runTaskManager的方法,通过名字我们不难看出,离TaskManager的构建越来越近了。我们点进runTaskManager方法:
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
throws Exception {
final TaskManagerRunner taskManagerRunner;
try {
// TODO 构建一个TaskManagerRunner
taskManagerRunner =
new TaskManagerRunner(
configuration,
pluginManager,
// TODO 真正创建TaskExecutor的地方
TaskManagerRunner::createTaskExecutorService);
// TODO 启动TaskManagerRunner
();
} catch (Exception exception) {
throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
}
try {
return ().get().getExitCode();
} catch (Throwable t) {
throw new FlinkException(
"Unexpected failure during runtime of TaskManagerRunner.",
(t));
}
}
在这个方法里做了两件事:
1、构建了一个TaskManagerRunner
2、启动TaskManagerRunner
实际上,TaskManager启动的所有准备工作,都是在这个TaskManagerRunner中完成的。我们继续进来这个TaskManagerRunner的构造方法来看:
2.1、TaskManager基础服务的初始化
public TaskManagerRunner(
Configuration configuration,
PluginManager pluginManager,
TaskExecutorServiceFactory taskExecutorServiceFactory)
throws Exception {
= checkNotNull(configuration);
timeout = (configuration);
// TODO TaskManager 内部线程池,用来处理从节点内部各个组件的Io的线程池
// TODO 线程池大小为当前节点的cpu核心数
=
(
(),
new ExecutorThreadFactory("taskmanager-future"));
// TODO 高可用服务
highAvailabilityServices =
(
configuration,
executor,
.NO_ADDRESS_RESOLUTION);
// TODO 1.12 新功能 JMX服务,提供监控信息
((JMXServerOptions.JMX_SERVER_PORT));
// TODO 启动RPC服务,内部为Akka模型的ActorSystem
rpcService = createRpcService(configuration, highAvailabilityServices);
// TODO 为TaskManager生成了一个ResourceID
=
getTaskManagerResourceID(
configuration, (), ());
// TODO 初始化心跳服务,主要是初始化心跳间隔和心跳超时参数配置
HeartbeatServices heartbeatServices = (configuration);
metricRegistry =
new MetricRegistryImpl(
(configuration),
(configuration, pluginManager));
final RpcService metricQueryServiceRpcService =
(configuration, ());
(metricQueryServiceRpcService, resourceId);
// TODO 在主节点启动的时候,事实上已经启动了有个BolbServer,
// TODO 从节点启动的时候,会启动一个BlobCacheService,做文件缓存的服务
blobCacheService =
new BlobCacheService(
configuration, (), null);
final ExternalResourceInfoProvider externalResourceInfoProvider =
(
configuration, pluginManager);
// TODO 创建得到一个TaskExecutorService,内部封装了TaskExecutor,同时TaskExecutor的构建也在内部完成
taskExecutorService =
(
,
,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
externalResourceInfoProvider,
this);
= new CompletableFuture<>();
= false;
handleUnexpectedTaskExecutorServiceTermination();
(
LOG, configuration, (ignored -> {}));
}
不难看出,这里所做的工作和JobManager启动时一样,是一些基础服务的构建和启动,在这里一共做了以下这些工作:
1、初始化了一个TaskManager内部线程池,用来处理从节点内部各个组件的IO,该线程池的大小为当前节点CPU的核心数。
2、构建了一个高可用服务。
3、初始化JMX服务,用于提供监控信息。
4、启动RPC服务,内部为Akka模型的ActorSystem(点此查看Flink 1.13 源码解析前导——Akka通信模型)
4、为TaskManager生成了一个ResourceID。
5、初始化心跳服务,根据配置文件获取心跳间隔时间参数以及心跳超时参数
6、初始化metric服务
7、启动BlobCacheService服务,做文件缓存的服务。
8、构建了一个TaskExecutorService,内部封装了TaskExecutor。
2.1.1、BlobCacheService的初始化
在以上这些基础环境的初始化中,我们首先来看下BlobCacheService服务的初始化,点进BlobCacheService的构造方法:
public BlobCacheService(
final Configuration blobClientConfig,
final BlobView blobView,
@Nullable final InetSocketAddress serverAddress)
throws IOException {
/*
TODO 初始化了两个文件服务:
1. 持久化Blob缓存服务
2. 临时Blob缓存服务
在这两个服务的内部都会在启动的时候启动一个定时服务
就是把过期的某个Job的对应资源都删除掉
*/
this(
// TODO 持久化
new PermanentBlobCache(blobClientConfig, blobView, serverAddress),
// TODO 缓存
new TransientBlobCache(blobClientConfig, serverAddress));
}
在这个构造方法里,主要做了两件事:
1、初始化了一个持久化Blob缓存服务
2、初始化了一个临时Blob缓存服务
在这两个服务的内部,都会在启动的时候启动一个定时服务,就是将过期的某个Job的对应资源都删除掉。
我们以持久化Blob缓存服务为例,点进PermanentBlobCache对象的构造方法
public PermanentBlobCache(
final Configuration blobClientConfig,
final BlobView blobView,
@Nullable final InetSocketAddress serverAddress)
throws IOException {
super(
blobClientConfig,
blobView,
(),
serverAddress);
// Initializing the clean up task
= new Timer(true);
// TODO 配置过期时间为1小时
= (BlobServerOptions.CLEANUP_INTERVAL) * 1000;
// TODO 启动定时任务,每1小时清理一次
(
new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval);
}
可以看到,在下面首先配置了一个过期时间,为1小时,接着启动了一个定时服务,每1小时执行一次PermanentBlobCleanupTask,我们继续来看PermanentBlobCleanupTask的run方法
class PermanentBlobCleanupTask extends TimerTask {
/** Cleans up BLOBs which are not referenced anymore. */
@Override
public void run() {
// TODO 通过引用计数的方式获取所有Job引用的文件
synchronized (jobRefCounters) {
Iterator<<JobID, RefCount>> entryIter =
().iterator();
final long currentTimeMillis = ();
// TODO 遍历所有文件
while (()) {
<JobID, RefCount> entry = ();
RefCount ref = ();
// TODO 判断是否过期
if ( <= 0
&& > 0
&& currentTimeMillis >= ) {
JobID jobId = ();
final File localFile =
new File(
(
(), jobId));
/*
* NOTE: normally it is not required to acquire the write lock to delete the job's
* storage directory since there should be no one accessing it with the ref
* counter being 0 - acquire it just in case, to always be on the safe side
*/
().lock();
boolean success = false;
try {
// TODO 删除该资源文件夹
(localFile);
success = true;
} catch (Throwable t) {
(
"Failed to locally delete job directory "
+ (),
t);
} finally {
().unlock();
}
// let's only remove this directory from cleanup if the cleanup was
// successful
// (does not need the write lock)
if (success) {
();
}
}
}
}
}
}
我们可以看到有以下操作:
1、首先在方法里通过引用计数的方式,获取所有job引用的资源文件。
2、遍历这些文件,并判断是否过期。
3、如果过期则删除该资源文件夹。
在临时缓存blob服务中也是一样的工作:
public TransientBlobCache(
final Configuration blobClientConfig, @Nullable final InetSocketAddress serverAddress)
throws IOException {
super(
blobClientConfig,
new VoidBlobStore(),
(),
serverAddress);
// Initializing the clean up task
= new Timer(true);
// TODO 1小时
= (BlobServerOptions.CLEANUP_INTERVAL) * 1000;
(
// TODO 定时服务
new TransientBlobCleanupTask(
blobExpiryTimes, (), storageDir, log),
cleanupInterval,
cleanupInterval);
}
首先获取超时时间为1小时,接着启动了一个定时服务,每1小时清理一次。
接下来到了重要环节,TaskExecutor的初始化
2.2、TaskExecutor的构造过程
我们点进方法里:
public static TaskExecutorService createTaskExecutorService(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// TODO 创建TaskExecutor
final TaskExecutor taskExecutor =
startTaskManager(
configuration,
resourceID,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
localCommunicationOnly,
externalResourceInfoProvider,
fatalErrorHandler);
/*
TODO 封装了一下TaskExecutor
TaskExecutor是TaskExecutorToServiceAdapter的成员变量
TaskExecutorToServiceAdapter是TaskManagerRunner的成员变量
*/
return (taskExecutor);
}
可以看到在这里真正初始化了一个TaskExecutor,并将TaskExecutor封装了一下,我们首先来看TaskExecutor的初始化,我们进入startTaskManager方法:
在该方法内部依然是初始化了一些基础服务:
首先是初始化资源配置,获取硬件资源配置:
// TODO 初始化资源配置,获取硬件资源配置
final TaskExecutorResourceSpec taskExecutorResourceSpec =
(configuration);
接着获取配置:
// TODO 获取配置(args和flink-conf)
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
(
configuration,
resourceID,
externalAddress,
localCommunicationOnly,
taskExecutorResourceSpec);
在这里TaskManagerService初始化了一些核心服务:
// TODO 初始化了一些核心服务
TaskManagerServices taskManagerServices =
(
taskManagerServicesConfiguration,
(),
taskManagerMetricGroup.f1,
ioExecutor,
fatalErrorHandler);
我们进入fromConfiguration方法:
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
PermanentBlobService permanentBlobService,
MetricGroup taskManagerMetricGroup,
ExecutorService ioExecutor,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// pre-start checks
checkTempDirs(());
// TODO 状态机 事件分发器
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
// start the I/O manager, it will create some temp directories.
final IOManager ioManager =
new IOManagerAsync(());
// TODO 作业执行期间shuffle相关操作工作,后面讲作业执行时再细聊
final ShuffleEnvironment<?, ?> shuffleEnvironment =
createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
taskManagerMetricGroup,
ioExecutor);
final int listeningDataPort = ();
// TODO state管理服务
final KvStateService kvStateService =
(taskManagerServicesConfiguration);
();
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new UnresolvedTaskManagerLocation(
(),
(),
// we expose the task manager location with the listening port
// iff the external data port is not explicitly defined
() > 0
? ()
: listeningDataPort);
// TODO 广播变量管理服务
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
// TODO TaskExecutor内部,最重要的一个成员变量
// TODO 一张存放TaskSlot的表
final TaskSlotTable<Task> taskSlotTable =
createTaskSlotTable(
(),
(),
(),
(),
ioExecutor);
final JobTable jobTable = ();
// TODO 监控主节点Leader地址
final JobLeaderService jobLeaderService =
new DefaultJobLeaderService(
unresolvedTaskManagerLocation,
());
。。。 。。。
return new TaskManagerServices(
unresolvedTaskManagerLocation,
().getBytes(),
ioManager,
shuffleEnvironment,
kvStateService,
broadcastVariableManager,
taskSlotTable,
jobTable,
jobLeaderService,
taskStateManager,
taskEventDispatcher,
ioExecutor,
libraryCacheManager);
}
在这里,初始化了事件分发起、IOManager、ShuffleEnvironment、state管理服务、广播变量历服务、TaskSlotJobManager的Leader地址监控服务等等,这里我们着重看一下TableSlot表,其他的核心服务我们会在后续Job的执行流程、Slot分配流程中详细描述,这里就先不聊了。
2.2.3、TaskSlotTable详解
首先在TaskSlotTable,是TaskExecutor中非常非常重要的一个成员变量,它是真正帮助TaskExecutor完成一切和Slot有关操作的组件,在ResourceManager中,也有一个类似的组件,就是在注册的两个定时任务中的其中一个:slot定时任务SlotManager。(点击查看Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动)
在JobMaster申请资源时,是ResourceManager中的SlotManager来完成资源分配的,在完成资源分配后,SlotManager会向TaskExecutor发送RPC请求,然后TaskExecutor再向ResourceManager去做汇报表示已完成分配。我们来看TaskSlotTable的实现类,其中有几个十分重要的变量:
/** The list of all task slots. */
// TODO 所有的slot
// TODO 在TaskManager启动时会将自身的slot汇报给ResourceManager,并将slot封装为taskSlot
private final Map<Integer, TaskSlot<T>> taskSlots;
/** Mapping from allocation id to task slot. */
// TODO 所有已被分配的slot,维护着分配ID和TaskSlot之间的关系
private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
其中taskSlots存放着所有的当前节点的slot,在当前节点的TaskManager启动时,会将自身的slot汇报给ResourceManager,并将slot封装为taskSlot。
而allocatedSlots存放这所有已被分配的slot的信息,维护着分配ID和TaskSlot之间的关系。
2.2.3、TaskExecutor的初始化
我们继续回到方法,看最后一步,初始化TaskExecutor,我们点进TaskExecutor的构造方法,首先看到TaskExecutor继承自RPCEndpoint,那么我们就知道,当TaskExecutor初始化完成之后回去调用自身 的onStart方法(点击查看Flink 1.13 源码解析前导——Akka通信模型),此刻还在初始化之中,所以我们先继续往下看
public TaskExecutor(
RpcService rpcService,
TaskManagerConfiguration taskManagerConfiguration,
HighAvailabilityServices haServices,
TaskManagerServices taskExecutorServices,
ExternalResourceInfoProvider externalResourceInfoProvider,
HeartbeatServices heartbeatServices,
TaskManagerMetricGroup taskManagerMetricGroup,
@Nullable String metricQueryServiceAddress,
BlobCacheService blobCacheService,
FatalErrorHandler fatalErrorHandler,
TaskExecutorPartitionTracker partitionTracker) {
// TaskExecutor为RPCEndpoint的子类,这个构造器调用的RPCEndpoint的构造器
super(rpcService, (TASK_MANAGER_NAME));
checkArgument(
() > 0,
"The number of slots has to be larger than 0.");
= checkNotNull(taskManagerConfiguration);
= checkNotNull(taskExecutorServices);
= checkNotNull(haServices);
= checkNotNull(fatalErrorHandler);
= partitionTracker;
= checkNotNull(taskManagerMetricGroup);
= checkNotNull(blobCacheService);
= metricQueryServiceAddress;
= checkNotNull(externalResourceInfoProvider);
= ();
= ();
= ();
= ();
=
();
= ();
= ();
= ();
= ();
= ();
=
(());
=
(());
= null;
= null;
= null;
final ResourceID resourceId =
().getResourceID();
// TODO 初始化了两个心跳管理器
// TODO TaskExecutor维持和JobMaster的心跳
=
createJobManagerHeartbeatManager(heartbeatServices, resourceId);
// TODO TaskExecutor维持和ResourceManager的心跳
=
createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
ExecutorThreadFactory sampleThreadFactory =
new ()
.setPoolName("flink-thread-info-sampler")
.build();
ScheduledExecutorService sampleExecutor =
(sampleThreadFactory);
= new ThreadInfoSampleService(sampleExecutor);
}
在前半部分进行的一些变量的赋值,在下面初始化了两个心跳管理器,分别为:
1、TaskExecutor维持和JobMaster的心跳的管理器
2、TaskExecutor维持和ResourceManager心跳的管理器
在心跳管理器内部初始化了一个HeartbeatManagerImpl对象,还记得我们在ResourceManager中初始化的心跳管理器为HeartbeatManagerSenderImpl,根据名字能看出这是一个心跳请求发送器,也是在ResourceManager那一章节中我们讲到,在HeartbeatManagerSenderImpl中会有一个定时任务,每10秒钟遍历一次所有的已注册的心跳目标对象,并向每个对象发送心跳请求(点击查看Flink 1.13 源码解析——JobManager启动流程之ResourceManager启动)
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
/*
TODO
主节点中的心跳管理器为HeartbeatManagerSenderImpl 心跳请求发送器 client
在HeartbeatManagerSenderImpl内部构建了一个定时服务
每10秒 向所有的心跳目标对象,发送心跳请求
从节点(当前)为HeartbeatManagerImpl 心跳请求处理器 Server
*/
return new HeartbeatManagerImpl<>(
heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}
到此为止,我们的TaskExecutor的正式初始化完成。
总结:
我们在这里总结一下TaskExecutor的初始化流程:
1、首先构建了一个TaskManagerRunner,用于完成TaskManager启动的准备工作,再完成准备工作后,通过调用TaskManagerRunner的start方法来启动。
2、在TaskManagerRunner内部初始化了一个TaskManagerService对象,用来初始化TaskExecutor所需要的基础服务。
3、在TaskManagerService内部,首先会初始化一些基础服务,如TaskEvent Dispatcher、IO管理器、shuffleEnvironment、state管理器、TaskSlotTable等等。
4、在完成基础服务的初始化之后,开始初始化TaskExecutor,首先初始化了两个心跳管理期,分别来维护和JobMaster、ResourceManager的心跳。因为TaskExecutor继承了RpcEndpoint,所以具有生命周期方法onStart。
5、TaskExecutor初始化完成。
在下一章里我们来看已经初始化完成的TaskExecutor的启动流程。
下一章: Flink 1.13 源码解析——TaskManager启动流程概览