flink内存管理(三):MemorySegment内存使用场景:托管内存与网络内存-一.ManagedMemory(算子)内存的申请与使用

时间:2024-01-25 22:29:19

1. tm内存申请与使用大致流程

Task使用的物理计算资源主要是TaskSlot提供的,TaskSlot由TaskManager中TaskSlotTable组件创建和管理。

  • 创建MemoryManager:JobManager申请到足够的Slot计算资源后,会在TaskSlotTable中创建相应的TaskSlot,然后对TaskSlot基本环境进行初始化,包括在TaskSlot内部创建MemoryManager组件。最终使用MemoryManager管理当前TaskSlot的内存计算资源。
  • task线程使用内存:当Task线程启动时,会直接从TaskSlot中获取MemoryManager组件申请内存空间。通过MemoryManager对MemorySegment内存空间进行管理,这一步对应内存模型中的ManagedMemory,也被称为托管内存。

 

2. 创建MemoryManager实例

在TaskSlot的构造器中调用createMemoryManager()方法创建MemoryManager实例,管理当前TaskSlot(代表一个线程的资源) 中的内存空间

/**
创建具有**给定**容量和给定页面大小的内存管理器。
这是 MemoryManager 的生产版本,一旦 MemoryManager 
的所有者准备好处置,它就会检查内存泄漏 ( verifyEmpty() )。

参数:
memorySize – 该内存管理器管理的堆外内存的总大小。 
pageSize – 内存管理器分配的页面大小。
**/
private static MemoryManager createMemoryManager(  
        ResourceProfile resourceProfile, int pageSize) {  
    return MemoryManager.create(resourceProfile.getManagedMemory().getBytes(), pageSize);  
}

在TaskSlot.createMemoryManager()方法中,会根据ResourceProfile参数获取内存空间大小,默认设置为非堆ing。其中pageSize参数就是MemorySegment的大小,如下代码默认为32kb。

TaskManagerOptions.
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)  
public static final ConfigOption<MemorySize> MEMORY_SEGMENT_SIZE =  
        key("taskmanager.memory.segment-size")  
                .memoryType()  
                .defaultValue(MemorySize.parse("32kb"))  
                .withDescription(  
                        "Size of memory buffers used by the network stack and the memory manager.");
                    

 

3. 算子使用通过MemoryManager使用内存

MemoryManager创建完毕后,会通过TaskSlot将MemoryManager对象传递给Task,此时Task会通过将MemoryManager封装在Environment变量中,然后传递给算子
算子接收到MemoryManager对象后,通过MemoryManager动态申请内存空间,最终用于算子的具体计算过程。

需要注意的是:并不是所有的算子都会使用MemoryManager申请内存空间,这个步骤主要针对批计算类型的算子,例如HashJoinOperator、SortMergeJoinOperator和SortOperator等,这些算子往往需要借助非常大的内存空间进行数据的排序等操作。

 

4. ManagedMemory内存空间申请流程

申请ManagedMemory内存空间,是调用MemoryManager.allocatePages()方法执行的,见如下逻辑。

  • 1)从AllocationRequest参数中获取MemorySegment的空集合、申请Pages总数量以及资源Owner(与内存关联的所有者:slot?还是算子?)等参数,并对参数进行非空和状态检查;
  • 2)计算申请内存大小,并预留出内存空间;
  • 3)根据page数、pageCleanup、owner等,开始分配内存,将内存以MemorySegment为单位,并维护一个set集合,最终返回给算子使用。
/**
从此内存管理器分配一组内存段。
分配的总内存不会超过构造函数中声明的大小限制。
参数:
owner – 与内存段关联的所有者,用于后备释放。 
target – 将分配的内存页放入其中的列表。 numberOfPages – 要分配的页数。
**/
public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)  
        throws MemoryAllocationException {  
    // sanity check  
    Preconditions.checkNotNull(owner, "The memory owner must not be null.");  
    Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");  
    Preconditions.checkArgument(  
            numberOfPages <= totalNumberOfPages,  
            "Cannot allocate more segments %s than the max number %s",  
            numberOfPages,  
            totalNumberOfPages);  
  
    // reserve array space, if applicable  
    if (target instanceof ArrayList) {  
        ((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);  
    }  
    //计算申请内存大小,并预留空间(以免申请过程中被用掉)
    long memoryToReserve = numberOfPages * pageSize;  
    try {  
        memoryBudget.reserveMemory(memoryToReserve);  
    } catch (MemoryReservationException e) {  
        throw new MemoryAllocationException(  
                String.format("Could not allocate %d pages", numberOfPages), e);  
    }  
    //创建pageCleanup方法用于清理unsafe内存
    Runnable pageCleanup = this::releasePage;  
    allocatedSegments.compute(  
            owner,  
            (o, currentSegmentsForOwner) -> {  
                Set<MemorySegment> segmentsForOwner =  
                        currentSegmentsForOwner == null  
                                ? new HashSet<>(numberOfPages)  
                                : currentSegmentsForOwner;  
                for (long i = numberOfPages; i > 0; i--) {  
                 //分配内存
                    MemorySegment segment =  
                            allocateOffHeapUnsafeMemory(getPageSize(), owner, pageCleanup);  
                    target.add(segment);  
                    segmentsForOwner.add(segment);  
                }  
                return segmentsForOwner;  
            });  
  
    Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");  
}					

如下如下算子会申请内存使用:
在这里插入图片描述