Spark中Task的执行内存是通过TaskMemoryManger统一管理的,不论是ShuffleMapTask还是ResultTask,Spark都会生成一个专用的TaskMemoryManger对象,然后通过TaskContext将TaskMemoryManger对象共享给该task attempt的所有memory consumers。 TaskMemoryManger自建了一套内存页管理机制,并统一对ON_HEAP和OFF_HEAP内存进行编址,分配和释放。
概述
TaskMemoryManger负责管理单个任务的堆外执行内存和堆内执行内存,是Tungsten内存管理机制的核心实现类。对于堆外内存,可以内存地址直接使用64位长整型地址寻址;对于堆内内存,内存地址由一个obj对象和一个offset对象组合起来表示,主要有以下三方面作用:
-
建立类似于操作系统内存页管理的机制,对ON_HEAP和OFF_HEAP内存统一编址和管理;
-
通过调用MemoryManager和MemoryAllocator,将逻辑内存的申请&释放与物理内存的分配&释放结合起来;
-
记录和管理task attempt的所有memory consumer。
那么页管理机制到底是如何设计的,堆内内存如何管理,如何避免堆内内存由于JVM的GC的存在引起的内存地址变化,数据在内存页中是如何寻址的?本文通过分析整体设计&实现细节来解决这些问题。
MemoryLocation
为了统一on-heap<JVM管理>和off-heap<自行管理>的执行内存,抽象出来一个MemoryLocation,包含了obj对象和offset属性,这个类可以用来内存寻址,具体的寻址如下:
Object obj
处于堆内内存模式时,数据作为对象存储在JVM的堆上,此时的obj不为空;处于堆外内存模式时,数据存储在JVM的堆外内存[操作系统内存]中,因而不会在JVM中存在对象,所以obj为NULL;long offset
offset属性主要用来定位数据,处于堆内内存模式时,首先从堆内找到对象MemoryBlock,然后使用offset定位数据的具体位置;处于堆外内存模式时,则直接使用offset从堆外内存中定位。
MemoryBlock
MemoryBlock继承于MemoryLocation,它代表的是一个Page对象,表示从obj和offset定位的起始位置开始的固定长度[length]的连续内存块。Page代表了具体的内存区域以及内存里面具体的数据,Page中的数据可能是On-heap的数据,也可能是Off-heap中的数据。
另外提供了通过已经分配的array创建MemoryBlock和以指定的字节填充整个MemoryBlock的方法。
/** 创建一个指向由长整型数组使用的内存的MemoryBlock */
public static MemoryBlock fromLongArray(final long[] array) {
return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}
/** 以指定的字节填充整个MemoryBlock, 即将obj对象从offset开始,长度为length的堆内存替换为指定字节的值。
Platform中封装了对sun.misc.Unsafe的API调用,Platform的setMemory方法实际调用了sun.misc.Unsafe的setMemory<在给定的内存块中设置值> */
public void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
Platform.setMemory(obj, offset, length, value);
在给定的内存块中设置值,这里是指在将obj对象从offset开始,长度为length的堆内存替换为指定字节的值Platform.LONG_ARRAY_OFFSET
是long array数组类型中,数组第一个元素相对数组的偏移。
MemoryAllocator
有了内存页的抽象,就需要给它分配内存页,是通过MemoryAllocator
实现的,该接口定义了allocate
和free
方法,等待具体类实现,目前有两个实现类HeapMemoryAllocator
和UnsafeMemoryAllocator
,分别负责堆内和堆外内存页分配。
HeapMemoryAllocator
HeapMemoryAllocator
是Tungsten内存管理中在堆内存模式下使用的内存分配器,与onHeapExecutionMemoryPool配合使用,主要负责分配堆内内存,其主要分配long型数组,最大分配内存为16GB。实现了内存页分配以及回收的方法,另外维护了一个MemoryBlock的弱引用[只有弱引用时候,如果GC运行, 那么这个对象就会被回收,不论当前的内存空间是否足够,这个对象都会被回收]的缓冲池,用于Page页[即MemoryBlock]的快速分配。
@GuardedBy("this")
private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
// 池化阈值,只有在池化的MemoryBlock大于该值时,才需要被池化: 1M
private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
申请内存页
申请内存页的步骤如下:
- 申请一个内存页时,首先需要进行内存对齐<8字节对齐>得到对齐后的申请的内存大小,然后根据是否满足池化条件<大于1M>,进行不同操作;
- 如果满足池化条件,从缓存池中如果可以拿取到相同大小的内存,进行构建MemoryBlock;
- 不满足池化条件或者缓存池中没有同样大小的array,则HeapMemoryAllocator利用long数组[new long[size]]向JVM堆申请内存,通过在MemoryBlock对象中维护对long数组的引用,来防止JVM将long数组所占内存垃圾回收掉。
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
/**
* MemoryBlock中以Long类型数组装载数据,所以需要对申请的大小进行转换,
* 由于申请的是字节数,因此先为其多分配7个字节,避免最终分配的字节数不够,除以8是按照Long类型由8个字节组成来计算的。
* 例如:申请字节数为50,理想情况应该分配56字节,即7个Long型数据。
* 如果直接除以8,会得到6,即6个Long型数据,导致只会分配48个字节,
* 但先加7后再除以8,即 (50 + 7) / 8 = 7个Long型数据,满足分配要求。
*/
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L; // 补齐后的字节数
assert (alignedSize >= size);
if (shouldPool(alignedSize)) { // 需要从池化中拿取
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
// 取出池链头的MemoryBlock
final WeakReference<long[]> arrayReference = pool.pop();
final long[] array = arrayReference.get(); // 拿取array
if (array != null) {
// MemoryBlock的大小要比分配的大小大
assert (array.length * 8L >= size);
// 从MemoryBlock的缓存中获取指定大小的MemoryBlock并返回
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
}
bufferPoolsBySize.remove(alignedSize);
}
}
}
/**
* 走到此处,说明满足以下任意一点:
* 1. 指定大小的MemoryBlock不需要采用池化机制。
* 2. bufferPoolsBySize中没有指定大小的MemoryBlock。
*
*/
long[] array = new long[numWords];
// 创建MemoryBlock并返回
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
回收内存页
回收内存页步骤如下:
- 首先把要释放的内存数据使用free标志位覆盖,pageNumber置为占位的page number。
- 然后取出其内部的长整型数组赋值给临时变量,并且把base对象置为null,offset置为0。
- 取出的长整型数组计算其对齐大小,内存页的大小不一定等于数组的长度 * 8,此时的size是内存页的大小,需要进行对齐操作。对齐之后的内存页大小如果满足缓存池条件,则将其暂存缓存池,等待下次回收再用或者JVM的GC回收。
@Override
public void free(MemoryBlock memory) {
final long size = memory.size();
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
// Mark the page as freed (so we can detect double-frees).
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
// 将MemoryBlock的弱引用放入bufferPoolsBySize中
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
} else {
// Do nothing
}
}
对于堆内内存上的数据,在发生GC时候,真实数据的内存地址会发生改变,Tungsten巧妙使用数组这种容器以及偏移量巧妙地将这个问题规避了,数据回收也可以使用缓存池机制来减少数组频繁初始化带来的开销。其内部使用虚引用来引用释放的数组,也不会导致无法回收导致内存泄漏。
UnsafeMemoryAllocator
UnsafeMemoryAllocator
是Tungsten在堆外内存模式下使用的内存分配器,与offHeap ExecutionMemoryPool配合使用,主要是通过java的Unsage API进行内存的分配和回收。
申请内存页
申请比较简单,直接调用Platform的allocateMemory方法,返回分配size大小内存的起始内存地址,然后创建MemoryBlock对象即可。
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size); // 直接调用unsafe的API进行获取地址,该方法将返回分配的内存地址。
MemoryBlock memory = new MemoryBlock(null, address, size); // 构建MemoryBlock
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
回收内存页
回收内存只需要将pageNumber置为占位符,释放内存即可。
@Override
public void free(MemoryBlock memory) {
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
// 释放内存
Platform.freeMemory(memory.offset);
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to reset its pointer.
memory.offset = 0;
// Mark the page as freed (so we can detect double-frees).
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
}
TaskMemoryManager
首先我们看下TaskMemoryManager是何时被创建出来的,如下图所示,在Executor中当要启动一个Task时候,在TaskRunner里面会创建TaskMemoryManager,然后将其封装在TaskContext中,供后续使用。
逻辑内存管理
TaskMemoryManager用页表来管理内存,维护了一个MemoryBlock数组用于存放该TMM分配得到的pages[pageTable],逻辑地址用一个Long类型(64-bit)来表示,高13位来保存内存页数,低51位来保存这个页中的offset,使用page表来保存base对象,其在page表中的索引就是该内存的内存页数。页数最多有8192页,理论上允许索引 8192 * (2^31 -1)* 8 bytes,相当于140TB的数据。其中 2^31 -1 是整数的最大值,因为page表中记录索引的是一个long型数组,这个数组的最大长度是2^31 -1。实际上没有那么大。因为64位中除了用来设计页数和页内偏移量外还用于存放数据的分区信息。
/** 13位用来表示能存储的最大页数:8092 */
private static final int PAGE_NUMBER_BITS = 13;
/** 最大页数 8092*/
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
/** 用于保存编码后的偏移量的位数。静态常量OFFSET_BITS的值为51。 */
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;
/** 具体的内存存储是由一个数组存储,数组最大是2^32-1,数组元素是long类型,所以是8字节;
所以得到最大的Page大小。静态常量MAXIMUM_PAGE_SIZE_BYTES的值为17179869176,即(2^32-1)× 8==17G。*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
/** 长整型的低51位的位掩码。静态常量MASK_LONG_LOWER_51_BITS的值为2251799813685247 */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
/* 维护一个Page表。pageTable实际为Page(即MemoryBlock)的数组,数组长度为PAGE_TABLE_SIZE。*/
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
TaskMemoryManager提供了编解码的一些实用函数:
/** 用于根据给定的Page[即MemoryBlock]和Page中偏移量的地址,返回页号和相对于内存块起始地址的偏移量(64位长整型)。*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// 此时的参数offsetInPage是操作系统内存的绝对地址,
// offsetInPage与MemoryBlock的起始地址之差就是相对于起始地址的偏移量
offsetInPage -= page.getBaseOffset();
}
// 通过位运算将页号存储到64位长整型的高13位中,并将偏移量存储到64位长整型的低51位中,返回生成的64位的长整型。
return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}
// 获取页号相对于内存块起始地址的偏移量,偏移量的后51位,合并上页号即为物理地址
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
// 用于解码页号,将64位的长整型右移51位(只剩下页号),然后转换为整型以获得Page的页号
public static int decodePageNumber(long pagePlusOffsetAddress) {
return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
}
// 解码偏移量,用于将64位的长整型与51位的掩码按位进行与运算,以获得在Page中的偏移量。
private static long decodeOffset(long pagePlusOffsetAddress) {
return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
}
内存申请
TaskMemoryManager提供了直接操控执行内存和通过页表操控内存的两种申请方式,直接从执行内存申请和释放内存是通过acquireExecutionMemory
和releaseExecutionMemory
进行的,而通过页表进行逻辑地址管理是通过allocatePage
和freePage
进行的。
对于一些没有用到Tungsten内存管理机制的memory consumer (比如都继承自Spillable抽象类的ExternalSorter和ExternalAppendOnlyMap),它们会通过MemoryConsumer的acquireMemory和freeMemory方法调用TaskMemoryManager的acquireExecutionMemory和releaseExecutionMemory进行物理内存的申请和释放,不会使用Tungsten的逻辑内存管理功能,也就是不会调用TaskMemoryManager的allocatePage和freePage方法;当然,也有很多memory consumer用到了TaskMemoryManager的物理内存管理功能,比如UnsafeShuffleWriter中的ShuffleExternalSorter.
申请内存
获取内存思路比较简单,首先先去MemoryManager中去申请执行内存,如果内存不够,则获取所有的MemoryConsumer,调用其spill方法将内存数据溢出到磁盘,直到释放内存空间满足申请的内存空间则停止spill操作,如果还不行只能spill当前consumer的一些执行内存到磁盘,最后返回能获取的不大于申请内存大小的内存。
/** 为内存消费者获得指定大小(单位为字节)的内存。 当Task没有足够的内存时,将调用MemoryConsumer的spill方法释放内存。 */
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
assert(required >= 0);
assert(consumer != null);
MemoryMode mode = consumer.getMode();
synchronized (this) {
// 获取内存,可能涉及到借用或者收回借用给存储的内存等操作
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
if (got < required) { // 如果不足
// 内存不足以这次分配,需要获取当前的consumers,看哪个可以释放内存spill到磁盘来腾出空间
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
// 前置条件:不能是当前发出申请的消费者,且已使用内存要大于0,同时内存模式与申请的一样
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed(); // 按照使用量进行排序
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
while (!sortedConsumers.isEmpty()) {
// 筛选使用量大于还需要借用的量的consumer
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
// 如果没有满足的就用最大使用量的consumer
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer); // 尝试溢出最大使用内存的到磁盘
if (released > 0) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
// 释放写入到磁盘的大小,再次尝试获取内存
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) { // 够本次申请的大小
break;
}
} else {
cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + c, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + c, e);
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}
}
/** 走到这里,说明已经对其他MemoryConsumer尝试了溢写操作了,
如果申请的内存还不够,那么只能让当前申请内存的MemoryConsumer尝试溢写以空闲一部分内存了 */
if (got < required) {
try {
long released = consumer.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
// 做最后的尝试
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + consumer, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}
consumers.add(consumer);
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
return got;
}
}
释放内存
释放内存比较简单直接委托给memoryManager进行释放就行。
public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
}
申请页
申请页需要如下步骤:
- 先申请满足size大小的内存空间,如果不满足,则无法进行分配页
- 申请到空闲的Page number号
- 通过MemoryAllocator进行实际的页分配,得到一个页MemoryBlock
- 将Page number赋给pageTable
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
assert(consumer != null);
assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) { // 不能超过最大页能承受的内存大小
throw new TooLargePageException(size);
}
// 获取执行内存<堆内还是堆外根据consumer的MemoryMode决定>
long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) { // 内存不足,无法分配页
return null;
}
final int pageNumber;
synchronized (this) {
// 申请到空闲的Page number号
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) { // 页已经满了,释放内存
releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
try {
// 分配页,构建MemoryBlock
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}
释放页
释放页需要如下操作:
- 释放pageTable中的页号
- 通过MemoryManager释放内存页
- 释放执行内存
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
pageTable[page.pageNumber] = null;
synchronized (this) { // 清除页号
allocatedPages.clear(page.pageNumber);
}
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
long pageSize = page.size();
// Clear the page number before passing the block to the MemoryAllocator's free().
// Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
// page has been inappropriately directly freed without calling TMM.freePage().
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page); // 释放内存
releaseExecutionMemory(pageSize, consumer); // 释放执行内存
}
MemoryConsumer
最后看一下MemoryConsumer,是具体申请内存的消费方,在shuffle过程会经常用到,提供了数据落磁盘以及内存申请的接口,后面使用到了再详细分析。
每个task attempt都有对应的TaskMemoryManger对象,而一个task attempt可能有不止一个memory consumer需要消耗内存,所以TaskMemoryManger会维护当前task attempt的所有memory consumers,在TaskMemoryManger中定义了私有成员变量consumers,可以看到它就是一个MemoryConsumer类型的HashSet.
-
consumers在TaskMemoryManger构造函数中被初始化为一个空的HashSet,并且在每次acquireExecutionMemory方法调用的最后都会将申请逻辑内存的memory consumer添加到TMM的consumers中。
-
当某个memory consumer调用acquireExecutionMemory申请逻辑内存时遇到可用逻辑内存不足的情况,TaskMemoryManger首先会遍历consumers集合,生成一个以consumer占用的memory大小为key的TreeMap对象,也就是生成一颗以已用内存大小为key的红黑树。然后,在这个treeMap中查找占有目标内存大小的memory consumer,找到后依次调用这些consumer的spill方法,直到释放出足够的内存空间或treeMap遍历完毕。
总结
本篇文章主要剖析了Task在任务执行时内存的管理相关的内容,sort-based-Shuffle过程中,会频繁的使用基于内存的sorter,此时的sorter包含大量的数据,是需要内存管理,都是通过这里介绍的方法来进行统一管理的。