内存池是Spark内存的抽象,它记录了总内存大小,已使用内存大小,剩余内存大小,提供给MemoryManager进行分配/回收内存。它包括两个实现类:ExecutionMemoryPool和StorageMemoryPool,分别对应execution memory和storage memory。当需要新的内存时,spark通过memoryPool来判断内存是否充足。需要注意的是memoryPool以及子类方法只是用来标记内存使用情况,而不实际分配/回收内存。
内存池MemoryPool
MemoryPool管理的内存大小是构建在Executor的JVM上面的,由于一个Executor可能有多个Task(取决于分配的内存大小和core数目),所以需要一个锁来进行同步操作,这里的锁一般是MemoryManager对象,后续在分析MemoryManager时候会说到,源码如下所示:
// org.apache.spark.memory.MemoryPool
private[memory] abstract class MemoryPool(lock: Object) {
...
}
MemoryPool主要用来标记内存的使用情况,所以包含总的内存大小,已经使用的大小,增加或者缩减内存等操作。
/** 内存池的大小(单位为字节)。**/
private[this] var _poolSize: Long = 0
/** 获取已经使用的内存大小(单位为字节)。由子类实现。 **/
def memoryUsed: Long
/** 给内存池扩展delta给定的大小(单位为字节)。delta必须为正整数。 */
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
_poolSize += delta
}
/** 将内存池缩小delta给定的大小(单位为字节);delta必须为正整数且_poolSize与delta的差要大于等于memoryUsed[已经使用的内存不能从内存池中移除]。 */
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
require(delta <= _poolSize)
require(_poolSize - delta >= memoryUsed)
_poolSize -= delta
}
执行内存池ExecutionMemoryPool
概述
执行内存池属于计算引擎的一部分,它的内存只会分配给Task进行使用,主要用于Task中的Shuffle、Join、Aggregation等操作时候的内存提供。由于执行内存会由多个Task进行共享,所以为了保证Task合理地进行内存使用,避免某些Task过度使用内存导致其它的Task频繁将数据溢写到磁盘,拖垮整体执行速度,执行内存池需要保证在N个Task的情况下,每个Task所能分配到的内存在总内存的 1/2N~1/N 之间,由于Task数量是动态的,因此会跟踪所有激活的Task的数量以便重新计算 1/2N 和 1/N 的值。
源码实现中,ExecutionMemoryPool用一个HashMap来维护了一个TaskAttempt[身份标识为taskAttemptId]与所消费内存的大小之间的映射关系,很明显重写的memoryUsed方法是这个Map中所有value的和,另外给外界提供了可以获取某个taskAttempt目前使用内存的值。
// org.apache.spark.memory.ExecutionMemoryPool
/** 维护了一个TaskAttempt的身份标识(taskAttemptId)与所消费内存的大小之间的映射关系。 */
@GuardedBy("lock")
private val memoryForTask = new mutable.HashMap[Long, Long]()
/** 获取TaskAttempt使用的内存大小,即memoryForTask中taskAttemptId对应的value值。 */
def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized {
memoryForTask.getOrElse(taskAttemptId, 0L)
}
/** 重写了memoryUsed方法: 所有TaskAttempt所消费的内存大小之和,即memoryForTask这个Map中所有value的和。**/
override def memoryUsed: Long = lock.synchronized {
memoryForTask.values.sum
}
分配内存
acquireMemory
为每个任务分配内存,返回实际分配的内存大小,如果当任务数量增多,而老任务已经占据大量内存时,新来的任务不能获取到至少1 / 2N的内存时,来保证每个任务都有机会获取到execution总内存的1 / 2N时候,会阻塞申请内存的任务直到其他Task释放内存唤醒当前线程,重新进行计算,尝试获取到足够的空闲内存。
acquireMemory
分配内存需要以下步骤:
如果该task之前没有进行过内存申请,则将其加入memoryForTask,内存大小为0,并且唤醒所有等待申请内存的线程。然后需要不断循环以下操作,直至申请到内存:
- 获取当前task的数量(numActiveTasks),以及当前待申请内存Task的已有内存(curMem);
- 执行内存增长策略maybeGrowPool,如果执行内存不足,在有一些MemoryManager比如UnifiedMemoryManager时候,会向存储内存借用或者回收执行内存挪用给存储内存的内存;
- 执行完内存增长策略后,调用computeMaxPollSize,计算释放存储内存后,执行内存池可用的最大大小(maxPoolSize);
- 计算每个Task可以申请的最大内存:maxPoolSize / numActiveTasks,表示内存增长策略后当前总内存的1/n;
- 计算每个Task可以申请的最小内存:poolSize / (2 * numActiveTasks),表示没有进行增长策略时候执行内存总大小的1/2n;
- 计算当前任务可以申请到最大的内存大小(maxToGrant):math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)),表示不超过numBytes,不超过每个task可申请最大内存。保持在0 <= X <= 1 / numActiveTasks之间;
- 计算当前任务真正可以申请到的内存大小(toGrant):math.min(maxToGrant, memoryFree);
- 如果申请的内存小于待申请内存numBytes,且当前总的Task内存小于Task可以申请的最小内存,说明连Task执行的最基本内存要求都无法满足,则执行lock.wait进行线程等待,等待内存有释放再唤醒;否则,更新memoryForTask的当前Task内存大小为toGrant,并返回toGrant,退出循环。
源码如下所示:
/**
* 用于给taskAttemptId对应的TaskAttempt获取指定大小(即numBytes)的内存
*
* @param numBytes 分配的内存大小
* @param taskAttemptId 指定的TaskAttempt的ID
* @param maybeGrowPool 回调函数,用于处理潜在的内存池增长情况
* @param computeMaxPoolSize 用于限制本次分配的最大内存的回调函数,默认传入() => poolSize,即可分配所有内存。
* 传入回调函数的原因在于,不同的内存管理器对执行内存和存储内存的划分方式是不同的,
* 例如UnifiedMemoryManager可以通过挤压存储内存区域以扩大执行内存区域。
* @return 分配的内存大小
*/
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
if (!memoryForTask.contains(taskAttemptId)) { // 如果memoryForTask中还没有记录taskAttemptId
// 将taskAttemptId放入memoryForTask,初始状态taskAttemptId所消费的内存为0
memoryForTask(taskAttemptId) = 0L
// 新增了Task,需要唤醒其他等待获取ExecutionMemoryPool的锁的线程,重新计算Task的最大最小能申请到的内存值
lock.notifyAll()
}
// 由于不一定能申请到需要的内存,可能会由于没有足够多的内存需要wait,直到有Task进行释放内存操作或者新加入Task操作来进行唤醒重新计算,所以需要循环等待直到获取的内存在1/N~1/2N之间
while (true) {
// 获取当前激活的Task的数量
val numActiveTasks = memoryForTask.keys.size
// 获取当前TaskAttempt所消费的内存
val curMem = memoryForTask(taskAttemptId)
// 执行内存增长策略,有MemoryManager进行实现
// numBytes - memoryFree计算出不够分配的内存大小,然后尝试从其他内存池[StorageMemoryPool]回收或借用内存
maybeGrowPool(numBytes - memoryFree)
// 当前能获取到的最大内存,通过增长策略或者会增加内存
val maxPoolSize = computeMaxPoolSize()
/** 1/2N ~ 1/N */
// 计算每个TaskAttempt最大可以使用的内存大小,即 可用总内存大小 / 激活任务数量
val maxMemoryPerTask = maxPoolSize / numActiveTasks
// 计算每个TaskAttempt最小保证使用的内存大小,即 当前内存池大小 / (激活任务数量 * 2)
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
// 理论上可以分配给当前任务的最大内存(min(申请内存数,任务可获得的内存数))
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
// 实际可以分配给任务的内存(min(可分配最大内存数,当前剩余内存数))
val toGrant = math.min(maxToGrant, memoryFree)
// toGrant < numBytes:表示可分配大小小于本次申请需要的大小;
// curMem + toGrant < minMemoryPerTask:表示该TaskAttempt申请的大小小于单个TaskAttempt可申请的最小大小
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait() // 没有足够多的内存分配,wait等待有Task变化时候进行notifyAll操作唤醒当前线程,唤醒重新计算while循环,还可能拿不到内存
} else {
memoryForTask(taskAttemptId) += toGrant // 分配到内存了
return toGrant
}
}
0L // Never reached
}
回收内存
回收内存思路比较简单,如果释放内存大于分配给当前Task已经分配的内存,那么需要释放的内存大小是当前Task所申请的内存大小,否则是指定的内存大小。然后更新记录Task占用内存的Map[memoryForTask],对该Task需要释放的内存大小进行收回,如果剩余内存为0,则将该Task移除。最后由于释放了内存,其他正在等待内存分配的Task或许可以申请到需要的内存大小,所以通过notiyAll唤醒等待申请内存的其他线程,进行acquireMemory中的申请尝试,源码如下所示:
/** 用于给taskAttemptId对应的TaskAttempt释放指定大小(即numBytes)的内存。 */
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
val memoryToFree = if (curMem < numBytes) { // 释放内存大于分配给它的内存,只释放可释放的内存大小
logWarning(
s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
s"of memory from the $poolName pool")
curMem
} else { // 可释放内存大小大于指定释放的内存
numBytes
}
if (memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) -= memoryToFree
if (memoryForTask(taskAttemptId) <= 0) {
// 如果taskAttemptId代表的TaskAttempt占用的内存大小小于等于零,
// 还需要将taskAttemptId与所消费内存的映射关系从memoryForTask中清除。
memoryForTask.remove(taskAttemptId)
}
}
// 释放了内存,可以唤醒其他等待内存分配的线程Task进行重新计算和申请内存
lock.notifyAll()
}
/** 用于释放taskAttemptId对应的TaskAttempt所消费的所有内存。 */
def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
releaseMemory(numBytesToFree, taskAttemptId) // 进行释放
numBytesToFree // 返回释放的内存大小
}
存储内存池StorageMemoryPool
概述
存储内存池主要用于RDD的缓存,广播以及备份中。不像执行内存池需要维护每个Task的内存占用情况,存储内存池只提供了一个_memoryUsed的变量来进行当前内存的使用情况,源码如下所示,另外可以看到StorageMemoryPool还维护了一个memoryStore,这个是用来将数据块保存到申请的storage内存中,并提供了从内存/磁盘获取保存的数据的方法,我们会在后续文章中进行分析。
// 已经使用的内存大小(单位为字节)。
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
// 重写内存占用值函数,返回了_memoryUsed属性的值
override def memoryUsed: Long = lock.synchronized {
_memoryUsed
}
// 当前StorageMemoryPool所关联的MemoryStore,存储空间的实际使用是由MemoryStore来进行控制的
private var _memoryStore: MemoryStore = _
// 返回了_memoryStore属性引用的MemoryStore
def memoryStore: MemoryStore = {
if (_memoryStore == null) {
throw new IllegalStateException("memory store not initialized yet")
}
_memoryStore
}
/** 设置当前StorageMemoryPool所关联的MemoryStore,实际设置了_memoryStore属性。 */
final def setMemoryStore(store: MemoryStore): Unit = {
_memoryStore = store
}
分配内存
acquireMemory
用于给指定的BlockId对应的Block获取指定大小的内存,如果存储内存池内存不足,那么需要通过memoryStore.evictBlocksToFreeSpace进行内存释放,释放掉其他Block占用的内存;否则不需要释放内存,直接申请即可。然后判断释放内存后的free内存是否比待申请内存大,如果满足条件,则进行内存分配,否则不进行内存分配,返回false,告知失败。
/** 用于给BlockId对应的Block获取numBytes指定大小的内存。 */
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
val numBytesToFree = math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree)
}
/**
* @param blockId 申请内存的BlockId
* @param numBytesToAcquire 申请的内存大小
* @param numBytesToFree 本次申请需要额外空出来的内存大小
* @return 所需要的内存是否申请成功了
*/
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) { // 如果需要腾出额外的内存大小,则腾出numBytesToFree属性指定大小的空间
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// 判断可用内存是否充足
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) { // 可用内存充足,增加已经使用的内存大小
_memoryUsed += numBytesToAcquire
}
enoughMemory
}
回收内存
存储内存池的释放非常简单,直接将_memoryUsed=0即可,如果需要释放指定大小内存,那么只要减少一部分即可。
def releaseMemory(size: Long): Unit = lock.synchronized {
if (size > _memoryUsed) { // 释放的大小大于已使用大小,则释放当前内存池的所有内存,即将_memoryUsed设置为0。
logWarning(s"Attempted to release $size bytes of storage " +
s"memory when we only have ${_memoryUsed} bytes")
_memoryUsed = 0
} else { // 否则从已使用内存大小中减去释放的大小
_memoryUsed -= size
}
}
def releaseAllMemory(): Unit = lock.synchronized {
_memoryUsed = 0
}
缩减内存池大小
另外存储内存池还提供了一个缩减内存池大小的函数,作为Spark的ExecutionMemoryPool和StorageMemoryPool动态调整大小的辅助函数。首先判断StorageMemoryPool是否有足够的空间可以释放,如果剩余空间不足需要释放的空间,则通过memoryStore的evictBlocksToFreeSpace来释放其他Block占用的空间,最后返回StorageMemoryPool可释放空间大小。
/** 用于释放指定大小的空间,缩小内存池的大小。*/
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
// 计算当前可释放的最大内存大小
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) { // 如果可释放的内存不满足要求释放的大小,需要尝试腾出一些内存
val spaceFreedByEviction =
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// 返回最终释放的大小
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else { // 可释放的内存足够,直接返回释放的大小即可
spaceFreedByReleasingUnusedMemory
}
}
总结
相信通过上述的分析,我们已经掌握了存储内存池和执行内存池的精髓,最后我们来对比下这两个内存池异同:
- 两者都是继承与MemoryPool,通过一个变量poolSize来记录内存池所拥有的的最大内存量;
- 执行内存池由于要记录各个Task的内存占用情况,所以它是通过一个Map来进行各个TaskAttempt的内存的记录;而存储内存池只需要知道整个内存池的占用情况,所以只需要一个变量来控制当前已经占用的内存大小;
- 分配内存时候执行内存池要保证每个Task获取的内存大小在[1/2N,1/N]之间,所以可能会阻塞当前Task直到有相应的内存分配为止,而且还可能会有一些内存增长策略<从存储内存中占用或者收回内存操作>;存储内存池在分配时候如果目前内存不足,则会通过memoryStore来进行额外的需要内存的腾出,如果腾出后还不满足直接返回无法满足;
- 回收内存两个比较类似,都是释放当前已经使用的内存和指定释放内存的最小值。
- 存储内存池另外提供了缩减当前内存池大小的功能,方便执行内存池从存储内存池进行占用内存。