Flink BLOB架构

时间:2022-10-01 20:20:22

Flink中支持的BLOB文件类型

  • jar包

      被user classloader使用的jar包
  • 高负荷RPC消息

      1. RPC消息长度超出了akka.framesize的大小
    2. 在HA摸式中,利用底层分布式文件系统分发单个高负荷RPC消息,比如: TaskDeploymentDescriptor,给多个接受对象。
    3. 失败导致重新部署过程中复用RPC消息
  • TaskManager的日志文件

      为了在web ui上展示taskmanager的日志

按存储特性又分为两类

  • PERMANENT_BLOB

      生命周期和job的生命周期一致,并且是可恢复的。会上传到BlobStore分布式文件系统中。
  • TRANSIENT_BLOB

      生命周期由用户自行管理,并且是不可恢复的。不会上传到BlobStore分布式文件系统中。

架构图

Flink BLOB架构

BlobStore

BLOB底层存储,支持多种实现`HDFS`,`S3`,`FTP`等,HA中使用BlobStore进行文件的恢复。

BlobServer

* 提供了基于jobId和BlobKey进行文件上传和下载的方法 
* 本地文件系统的读写,基于`<path>/<jobId>/<BlobKey>`目录结构
* HA 分布式文件系统的读写,基于`<path>/<jobId>/<BlobKey>`目录结构
* 负责本地文件系统和分布式文件系统的清理工作
* 先存储到本地文件系统中,然后如果需要的话再存储到分布式文件系统中
* 下载请求优先使用本地文件系统中的文件
* 进行HA恢复中,下载分布式系统中的文件到本地文件系统中

BlobClient

* 基于jobId和BlobKey对BlobServer中的文件进行本地文件缓存
* 本地文件的读写,基于`<path>/<jobId>/<BlobKey>`目录结构
* 优先使用本地文件系统中的文件,然后尝试从HA分布式文件中获取,最后才尝试从BlobServer中下载
* 负责本地文件系统的清理工作

LibraryCacheManager

桥接task的classloader和缓存的库文件,其`registerJob`,`registerTask`会构建并缓存job,task运行需要的classloader

示例解析:standalone模式中的jar包管理

  JobManager会创建BlobStore、BlobServer、BlobLibraryCacheManager具体过程见JobManager的createJobManagerComponents方法

    try {
blobServer = new BlobServer(configuration, blobStore)
blobServer.start()
instanceManager = new InstanceManager()
scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
libraryCacheManager =
new BlobLibraryCacheManager(
blobServer,
ResolveOrder.fromString(classLoaderResolveOrder),
alwaysParentFirstLoaderPatterns) instanceManager.addInstanceListener(scheduler)
}

  TaskManager注册到Jobmanager后会创建BlobCacheService、BlobLibraryCacheManager具体过程见TaskManager的associateWithJobManager方法

    try {
val blobcache = new BlobCacheService(
address,
config.getConfiguration(),
highAvailabilityServices.createBlobStore())
blobCache = Option(blobcache)
libraryCacheManager = Some(
new BlobLibraryCacheManager(
blobcache.getPermanentBlobService,
config.getClassLoaderResolveOrder(),
config.getAlwaysParentFirstLoaderPatterns))
}

  JobClient在向集群提交job的过程中会调用JobSubmissionClientActor的tryToSubmitJob方法进而调用JobGraph对象的uploadUserJars方法

				try {
jobGraph.uploadUserJars(blobServerAddress, clientConfig);
} catch (IOException exception) {
getSelf().tell(
decorateMessage(new JobManagerMessages.JobResultFailure(
new SerializedThrowable(
new JobSubmissionException(
jobGraph.getJobID(),
"Could not upload the jar files to the job manager.",
exception)
)
)),
ActorRef.noSender()); return null;
} LOG.info("Submit job to the job manager {}.", jobManager.path()); jobManager.tell(
decorateMessage(
new JobManagerMessages.SubmitJob(
jobGraph,
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
getSelf());
	public void uploadUserJars(
InetSocketAddress blobServerAddress,
Configuration blobClientConfig) throws IOException {
if (!userJars.isEmpty()) {
List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(
blobServerAddress, blobClientConfig, jobID, userJars); for (PermanentBlobKey blobKey : blobKeys) {
if (!userJarBlobKeys.contains(blobKey)) {
userJarBlobKeys.add(blobKey);
}
}
}
}

  然后在JobManager的submitJob方法中会调用BlobLibraryCacheManager的registerJob创建并缓存该job的classloader

        try {
libraryCacheManager.registerJob(
jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths)
}
catch {
case t: Throwable =>
throw new JobSubmissionException(jobId,
"Cannot set up the user code libraries: " + t.getMessage, t)
} val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)

  TaskManager在执行Task时,首先会调用LibraryCacheManager的registerTask从BlobServer下载相应的jar包并创建classloader

		blobService.getPermanentBlobService().registerJob(jobId);

		// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this); userCodeClassLoader = createUserCodeClassloader();
	private ClassLoader createUserCodeClassloader() throws Exception {
long startDownloadTime = System.currentTimeMillis(); // triggers the download of all missing jar files from the job manager
libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths); LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",
executionId, System.currentTimeMillis() - startDownloadTime); ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
if (userCodeClassLoader == null) {
throw new Exception("No user code classloader available.");
}
return userCodeClassLoader;
}

涉及到的相关配置

参数 默认值 描述
high-availability.storageDir HA BlobStore根目录
blob.storage.directory <java.io.tmpdir> BlobServer 本地文件根目录
blob.fetch.num-concurrent 50 BlobServer fetch文件的最大并行度
blob.fetch.backlog 1000 允许最大的排队等待链接数
blob.service.cleanup.interval 3600 BlobServer cleanup 线程运行的间隔
blob.fetch.retries 5 从BlobServer下载文件错误重试次数
blob.server.port 0 BlobServer端口范围
blob.offload.minsize 1024 * 1024 运行通过BlobServer传递的最小消息大小
classloader.resolve-order child-first classloader类加载顺序

Flink BLOB架构的更多相关文章

  1. Flink资料(3)-- Flink一般架构和处理模型

    Flink一般架构和处理模型 本文翻译自General Architecture and Process Model ----------------------------------------- ...

  2. Flink&vert; 运行架构

    1. Flink运行时组件 作业管理器(JobManager) 任务管理器(TaskManager) 资源管理器(ResourceManager) 分发器(Dispatcher) 2. 任务提交流程 ...

  3. Flink架构,源码及debug

    序 工作中用Flink做批量和流式处理有段时间了,感觉只看Flink文档是对Flink ProgramRuntime的细节描述不是很多, 程序员还是看代码最简单和有效.所以想写点东西,记录一下,如果能 ...

  4. flink架构介绍

    前言 flink作为基于流的大数据计算引擎,可以说在大数据领域的红人,下面对flink-1.7的架构进行逻辑上的分析并和spark做了一些关键点的对比. 架构 如图1,flink架构分为3个部分,cl ...

  5. Flink入门(二)——Flink架构介绍

    1.基本组件栈 了解Spark的朋友会发现Flink的架构和Spark是非常类似的,在整个软件架构体系中,同样遵循着分层的架构设计理念,在降低系统耦合度的同时,也为上层用户构建Flink应用提供了丰富 ...

  6. Flink原理(一)——基础架构

    Flink系列博客,基于Flink1.6,打算分为三部分:原理.源码.实例以及API使用分析,后期等系列博客完成后再弄一个目录. 该系列博客是我自己学习过程中的一些理解,若有不正确.不准确的地方欢迎大 ...

  7. 揭秘 Flink 1&period;9 新架构,Blink Planner 你会用了吗?

    本文为 Apache Flink 新版本重大功能特性解读之 Flink SQL 系列文章的开篇,Flink SQL 系列文章由其核心贡献者们分享,涵盖基础知识.实践.调优.内部实现等各个方面,带你由浅 ...

  8. 开篇 &vert; 揭秘 Flink 1&period;9 新架构,Blink Planner 你会用了吗?

    本文为 Apache Flink 新版本重大功能特性解读之 Flink SQL 系列文章的开篇,Flink SQL 系列文章由其核心贡献者们分享,涵盖基础知识.实践.调优.内部实现等各个方面,带你由浅 ...

  9. Flink的应用场景和架构

    Flink的应用场景 Flink项目的理念就是:Flink是为分布式,高性能,随时可用以及准确的流处理应用程序打造的开源流处理框架.自2019年开源以来,迅速成为大数据实时计算领域炙手可热的技术框架. ...

随机推荐

  1. 第3章 Linux常用命令(6)&lowbar;关机重启命令

    8. 关机重启命令 8.1 关机重启命令 (1)shutdown [选项] 时间     ①选项 -c:取消前一个关机命令 -h:关机 -r:重启 ②应用举例:#shutdown –h now.shu ...

  2. 【读书笔记】javascript 继承

    在JavaScript中继承不像C#那么直接,C#中子类继承父类之后马上获得了父类的属性和方法,但JavaScript需要分步进行. 让Brid 继承 Animal,并扩展自己fly的方法. func ...

  3. 转:Java环境变量配置

    工具/原料 windows系统 jak安装包 方法/步骤   安装jdk,这个没有的百度就能搜到,不再赘述 首先,右键计算机---属性---高级系统设置---环境变量   建议是都设置系统变量,一般P ...

  4. Linux程序存储结构与进程结构堆和栈的区别【转】

    转自:http://www.hongkevip.com/caozuoxitong/Unix_Linux/24581.html 红客VIP(http://www.hongkevip.com):Linux ...

  5. 手机页面关于头部固定定位与input出现的问题

    前些天写了一个页面,要求头部导航进行固定定位position:fixed.内容区域有输入框input.在大多数手机上都是显示正常的,可偏在一些低版本的手机却出现问题了. 把我给苦恼的不行. 问题:头部 ...

  6. Database name和SID

    http://docs.oracle.com/cd/B19306_01/install.102/b15667/rev_precon_db.htm#i1027493 The Oracle Databas ...

  7. Python函数篇(2)-递归函数、匿名函数及高阶函数

    1.全局变量和局部变量 一般定义在程序的最开始的变量称为函数变量,在子程序中定义的变量称为局部变量,可以简单的理解为,无缩进的为全局变量,有缩进的是局部变量,全局变量的作用域是整个程序,而局部变量的作 ...

  8. 【译】&period;NET Core 2&period;2 Preview 2 发布

    原文出自.Net Blog Announcing .NET Core 2.2 Preview 2 今天,我们宣布推出.NET Core 2.2 Preview 2.我们有很多重要改进要和你分享,而且我 ...

  9. EJB部署规则报错

    javax.naming.NameNotFoundException: Unable to resolve 'RuleDeployer'. Resolved ''; remaining name 'R ...

  10. MediaPlayer播放音频,也可以播放视频

    使用MediaPlayer播放音频或者视频的最简单例子: JAVA代码部分: public class MediaPlayerStudy extends Activity { private Butt ...