当先锋百科网

首页 1 2 3 4 5 6 7

Spark中Task的执行内存是通过TaskMemoryManger统一管理的,不论是ShuffleMapTask还是ResultTask,Spark都会生成一个专用的TaskMemoryManger对象,然后通过TaskContext将TaskMemoryManger对象共享给该task attempt的所有memory consumers。 TaskMemoryManger自建了一套内存页管理机制,并统一对ON_HEAP和OFF_HEAP内存进行编址,分配和释放。

概述

TaskMemoryManger负责管理单个任务的堆外执行内存和堆内执行内存,是Tungsten内存管理机制的核心实现类。对于堆外内存,可以内存地址直接使用64位长整型地址寻址;对于堆内内存,内存地址由一个obj对象和一个offset对象组合起来表示,主要有以下三方面作用:

  1. 建立类似于操作系统内存页管理的机制,对ON_HEAP和OFF_HEAP内存统一编址和管理;

  2. 通过调用MemoryManager和MemoryAllocator,将逻辑内存的申请&释放与物理内存的分配&释放结合起来;

  3. 记录和管理task attempt的所有memory consumer。

那么页管理机制到底是如何设计的,堆内内存如何管理,如何避免堆内内存由于JVM的GC的存在引起的内存地址变化,数据在内存页中是如何寻址的?本文通过分析整体设计&实现细节来解决这些问题。

MemoryLocation

为了统一on-heap<JVM管理>和off-heap<自行管理>的执行内存,抽象出来一个MemoryLocation,包含了obj对象和offset属性,这个类可以用来内存寻址,具体的寻址如下:

  1. Object obj处于堆内内存模式时,数据作为对象存储在JVM的堆上,此时的obj不为空;处于堆外内存模式时,数据存储在JVM的堆外内存[操作系统内存]中,因而不会在JVM中存在对象,所以obj为NULL;
  2. long offsetoffset属性主要用来定位数据,处于堆内内存模式时,首先从堆内找到对象MemoryBlock,然后使用offset定位数据的具体位置;处于堆外内存模式时,则直接使用offset从堆外内存中定位。

MemoryBlock

MemoryBlock继承于MemoryLocation,它代表的是一个Page对象,表示从obj和offset定位的起始位置开始的固定长度[length]的连续内存块Page代表了具体的内存区域以及内存里面具体的数据,Page中的数据可能是On-heap的数据,也可能是Off-heap中的数据。

另外提供了通过已经分配的array创建MemoryBlock和以指定的字节填充整个MemoryBlock的方法。

/**  创建一个指向由长整型数组使用的内存的MemoryBlock */
public static MemoryBlock fromLongArray(final long[] array) {
  return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}

/** 以指定的字节填充整个MemoryBlock, 即将obj对象从offset开始,长度为length的堆内存替换为指定字节的值。
   Platform中封装了对sun.misc.Unsafe的API调用,Platform的setMemory方法实际调用了sun.misc.Unsafe的setMemory<在给定的内存块中设置值> */
public void fill(byte value) {
  Platform.setMemory(obj, offset, length, value);
}
  1. Platform.setMemory(obj, offset, length, value);在给定的内存块中设置值,这里是指在将obj对象从offset开始,长度为length的堆内存替换为指定字节的值
  2. Platform.LONG_ARRAY_OFFSET是long array数组类型中,数组第一个元素相对数组的偏移。

MemoryAllocator

有了内存页的抽象,就需要给它分配内存页,是通过MemoryAllocator实现的,该接口定义了allocatefree方法,等待具体类实现,目前有两个实现类HeapMemoryAllocatorUnsafeMemoryAllocator,分别负责堆内和堆外内存页分配。

HeapMemoryAllocator

HeapMemoryAllocator是Tungsten内存管理中在堆内存模式下使用的内存分配器,与onHeapExecutionMemoryPool配合使用,主要负责分配堆内内存,其主要分配long型数组,最大分配内存为16GB。实现了内存页分配以及回收的方法,另外维护了一个MemoryBlock的弱引用[只有弱引用时候,如果GC运行, 那么这个对象就会被回收,不论当前的内存空间是否足够,这个对象都会被回收]的缓冲池,用于Page页[即MemoryBlock]的快速分配。

@GuardedBy("this")
private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();

// 池化阈值,只有在池化的MemoryBlock大于该值时,才需要被池化: 1M
private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;

申请内存页

申请内存页的步骤如下:

  1. 申请一个内存页时,首先需要进行内存对齐<8字节对齐>得到对齐后的申请的内存大小,然后根据是否满足池化条件<大于1M>,进行不同操作;
  2. 如果满足池化条件,从缓存池中如果可以拿取到相同大小的内存,进行构建MemoryBlock;
  3. 不满足池化条件或者缓存池中没有同样大小的array,则HeapMemoryAllocator利用long数组[new long[size]]向JVM堆申请内存,通过在MemoryBlock对象中维护对long数组的引用,来防止JVM将long数组所占内存垃圾回收掉。
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
  /**
     * MemoryBlock中以Long类型数组装载数据,所以需要对申请的大小进行转换,
     *  由于申请的是字节数,因此先为其多分配7个字节,避免最终分配的字节数不够,除以8是按照Long类型由8个字节组成来计算的。
     *  例如:申请字节数为50,理想情况应该分配56字节,即7个Long型数据。
     *  如果直接除以8,会得到6,即6个Long型数据,导致只会分配48个字节,
     *  但先加7后再除以8,即 (50 + 7) / 8 = 7个Long型数据,满足分配要求。
     */
  int numWords = (int) ((size + 7) / 8);
  long alignedSize = numWords * 8L; // 补齐后的字节数
  assert (alignedSize >= size);
  if (shouldPool(alignedSize)) { // 需要从池化中拿取
    synchronized (this) {
      final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
      if (pool != null) {
        while (!pool.isEmpty()) {
          // 取出池链头的MemoryBlock
          final WeakReference<long[]> arrayReference = pool.pop();
          final long[] array = arrayReference.get(); // 拿取array
          if (array != null) {
            // MemoryBlock的大小要比分配的大小大
            assert (array.length * 8L >= size);
            // 从MemoryBlock的缓存中获取指定大小的MemoryBlock并返回
            MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
            if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
              memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
            }
            return memory;
          }
        }
        bufferPoolsBySize.remove(alignedSize);
      }
    }
  }
  /**
     *  走到此处,说明满足以下任意一点:
     *   1. 指定大小的MemoryBlock不需要采用池化机制。
     *   2. bufferPoolsBySize中没有指定大小的MemoryBlock。
     *
     */
  long[] array = new long[numWords];
  // 创建MemoryBlock并返回
  MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
  if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
    memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
  }
  return memory;
}

回收内存页

回收内存页步骤如下:

  1. 首先把要释放的内存数据使用free标志位覆盖,pageNumber置为占位的page number。
  2. 然后取出其内部的长整型数组赋值给临时变量,并且把base对象置为null,offset置为0。
  3. 取出的长整型数组计算其对齐大小,内存页的大小不一定等于数组的长度 * 8,此时的size是内存页的大小,需要进行对齐操作。对齐之后的内存页大小如果满足缓存池条件,则将其暂存缓存池,等待下次回收再用或者JVM的GC回收。
@Override
public void free(MemoryBlock memory) {
  final long size = memory.size();
  if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
    memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
  }

  // Mark the page as freed (so we can detect double-frees).
  memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;

  long[] array = (long[]) memory.obj;
  memory.setObjAndOffset(null, 0);

  long alignedSize = ((size + 7) / 8) * 8;
  if (shouldPool(alignedSize)) {
    // 将MemoryBlock的弱引用放入bufferPoolsBySize中
    synchronized (this) {
      LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
      if (pool == null) {
        pool = new LinkedList<>();
        bufferPoolsBySize.put(alignedSize, pool);
      }
      pool.add(new WeakReference<>(array));
    }
  } else {
    // Do nothing
  }
}

对于堆内内存上的数据,在发生GC时候,真实数据的内存地址会发生改变,Tungsten巧妙使用数组这种容器以及偏移量巧妙地将这个问题规避了,数据回收也可以使用缓存池机制来减少数组频繁初始化带来的开销。其内部使用虚引用来引用释放的数组,也不会导致无法回收导致内存泄漏。

UnsafeMemoryAllocator

UnsafeMemoryAllocator是Tungsten在堆外内存模式下使用的内存分配器,与offHeap ExecutionMemoryPool配合使用,主要是通过java的Unsage API进行内存的分配和回收。

申请内存页

申请比较简单,直接调用Platform的allocateMemory方法,返回分配size大小内存的起始内存地址,然后创建MemoryBlock对象即可。

@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
  long address = Platform.allocateMemory(size); // 直接调用unsafe的API进行获取地址,该方法将返回分配的内存地址。
  MemoryBlock memory = new MemoryBlock(null, address, size); // 构建MemoryBlock
  if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
    memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
  }
  return memory;
}

回收内存页

回收内存只需要将pageNumber置为占位符,释放内存即可。

 @Override
public void free(MemoryBlock memory) {
  if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
    memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
  }
  // 释放内存
  Platform.freeMemory(memory.offset);
  // As an additional layer of defense against use-after-free bugs, we mutate the
  // MemoryBlock to reset its pointer.
  memory.offset = 0;
  // Mark the page as freed (so we can detect double-frees).
  memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
}

TaskMemoryManager

首先我们看下TaskMemoryManager是何时被创建出来的,如下图所示,在Executor中当要启动一个Task时候,在TaskRunner里面会创建TaskMemoryManager,然后将其封装在TaskContext中,供后续使用。
在这里插入图片描述

逻辑内存管理

TaskMemoryManager用页表来管理内存,维护了一个MemoryBlock数组用于存放该TMM分配得到的pages[pageTable],逻辑地址用一个Long类型(64-bit)来表示,高13位来保存内存页数,低51位来保存这个页中的offset,使用page表来保存base对象,其在page表中的索引就是该内存的内存页数。页数最多有8192页,理论上允许索引 8192 * (2^31 -1)* 8 bytes,相当于140TB的数据。其中 2^31 -1 是整数的最大值,因为page表中记录索引的是一个long型数组,这个数组的最大长度是2^31 -1。实际上没有那么大。因为64位中除了用来设计页数和页内偏移量外还用于存放数据的分区信息。

/** 13位用来表示能存储的最大页数:8092 */
private static final int PAGE_NUMBER_BITS = 13;

/** 最大页数 8092*/
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;

/** 用于保存编码后的偏移量的位数。静态常量OFFSET_BITS的值为51。 */
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; 

/** 具体的内存存储是由一个数组存储,数组最大是2^32-1,数组元素是long类型,所以是8字节;
    所以得到最大的Page大小。静态常量MAXIMUM_PAGE_SIZE_BYTES的值为17179869176,即(2^32-1)× 8==17G。*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;

/** 长整型的低51位的位掩码。静态常量MASK_LONG_LOWER_51_BITS的值为2251799813685247 */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

/* 维护一个Page表。pageTable实际为Page(即MemoryBlock)的数组,数组长度为PAGE_TABLE_SIZE。*/
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];

TaskMemoryManager提供了编解码的一些实用函数:

/**  用于根据给定的Page[即MemoryBlock]和Page中偏移量的地址,返回页号和相对于内存块起始地址的偏移量(64位长整型)。*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
    // 此时的参数offsetInPage是操作系统内存的绝对地址,
    // offsetInPage与MemoryBlock的起始地址之差就是相对于起始地址的偏移量
    offsetInPage -= page.getBaseOffset();
  }
  // 通过位运算将页号存储到64位长整型的高13位中,并将偏移量存储到64位长整型的低51位中,返回生成的64位的长整型。
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

// 获取页号相对于内存块起始地址的偏移量,偏移量的后51位,合并上页号即为物理地址
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
  return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}

// 用于解码页号,将64位的长整型右移51位(只剩下页号),然后转换为整型以获得Page的页号
public static int decodePageNumber(long pagePlusOffsetAddress) {
  return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
}

// 解码偏移量,用于将64位的长整型与51位的掩码按位进行与运算,以获得在Page中的偏移量。
private static long decodeOffset(long pagePlusOffsetAddress) {
  return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
}

内存申请

TaskMemoryManager提供了直接操控执行内存和通过页表操控内存的两种申请方式,直接从执行内存申请和释放内存是通过acquireExecutionMemoryreleaseExecutionMemory进行的,而通过页表进行逻辑地址管理是通过allocatePagefreePage进行的。

对于一些没有用到Tungsten内存管理机制的memory consumer (比如都继承自Spillable抽象类的ExternalSorter和ExternalAppendOnlyMap),它们会通过MemoryConsumer的acquireMemory和freeMemory方法调用TaskMemoryManager的acquireExecutionMemory和releaseExecutionMemory进行物理内存的申请和释放,不会使用Tungsten的逻辑内存管理功能,也就是不会调用TaskMemoryManager的allocatePage和freePage方法;当然,也有很多memory consumer用到了TaskMemoryManager的物理内存管理功能,比如UnsafeShuffleWriter中的ShuffleExternalSorter.

申请内存

获取内存思路比较简单,首先先去MemoryManager中去申请执行内存,如果内存不够,则获取所有的MemoryConsumer,调用其spill方法将内存数据溢出到磁盘,直到释放内存空间满足申请的内存空间则停止spill操作,如果还不行只能spill当前consumer的一些执行内存到磁盘,最后返回能获取的不大于申请内存大小的内存。

/** 为内存消费者获得指定大小(单位为字节)的内存。 当Task没有足够的内存时,将调用MemoryConsumer的spill方法释放内存。  */
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
  assert(required >= 0);
  assert(consumer != null);
  MemoryMode mode = consumer.getMode();
  synchronized (this) {
    // 获取内存,可能涉及到借用或者收回借用给存储的内存等操作
    long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
    
    if (got < required) { // 如果不足
      // 内存不足以这次分配,需要获取当前的consumers,看哪个可以释放内存spill到磁盘来腾出空间
      TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
      for (MemoryConsumer c: consumers) {
        // 前置条件:不能是当前发出申请的消费者,且已使用内存要大于0,同时内存模式与申请的一样
        if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
          long key = c.getUsed(); // 按照使用量进行排序
          List<MemoryConsumer> list =
          sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
          list.add(c);
        }
      }
      while (!sortedConsumers.isEmpty()) {
        // 筛选使用量大于还需要借用的量的consumer
        Map.Entry<Long, List<MemoryConsumer>> currentEntry =
        sortedConsumers.ceilingEntry(required - got);
        // 如果没有满足的就用最大使用量的consumer
        if (currentEntry == null) {
          currentEntry = sortedConsumers.lastEntry();
        }
        List<MemoryConsumer> cList = currentEntry.getValue();
        MemoryConsumer c = cList.get(cList.size() - 1);
        try {
          long released = c.spill(required - got, consumer); // 尝试溢出最大使用内存的到磁盘
          if (released > 0) {
            logger.debug("Task {} released {} from {} for {}", taskAttemptId,
                         Utils.bytesToString(released), c, consumer);
            // 释放写入到磁盘的大小,再次尝试获取内存
            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
            if (got >= required) { // 够本次申请的大小
              break;
            }
          } else {
            cList.remove(cList.size() - 1);
            if (cList.isEmpty()) {
              sortedConsumers.remove(currentEntry.getKey());
            }
          }
        } catch (ClosedByInterruptException e) {
          // This called by user to kill a task (e.g: speculative task).
          logger.error("error while calling spill() on " + c, e);
          throw new RuntimeException(e.getMessage());
        } catch (IOException e) {
          logger.error("error while calling spill() on " + c, e);
          // checkstyle.off: RegexpSinglelineJava
          throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
                                          + e.getMessage());
          // checkstyle.on: RegexpSinglelineJava
        }
      }
    }
 
    /**  走到这里,说明已经对其他MemoryConsumer尝试了溢写操作了, 
    如果申请的内存还不够,那么只能让当前申请内存的MemoryConsumer尝试溢写以空闲一部分内存了   */
    if (got < required) {
      try {
        long released = consumer.spill(required - got, consumer);
        if (released > 0) {
          logger.debug("Task {} released {} from itself ({})", taskAttemptId,
                       Utils.bytesToString(released), consumer);
          // 做最后的尝试
          got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
        }
      } catch (ClosedByInterruptException e) {
        // This called by user to kill a task (e.g: speculative task).
        logger.error("error while calling spill() on " + consumer, e);
        throw new RuntimeException(e.getMessage());
      } catch (IOException e) {
        logger.error("error while calling spill() on " + consumer, e);
        // checkstyle.off: RegexpSinglelineJava
        throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
                                        + e.getMessage());
        // checkstyle.on: RegexpSinglelineJava
      }
    }

    consumers.add(consumer);
    logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
    return got;
  }
}

释放内存

释放内存比较简单直接委托给memoryManager进行释放就行。

public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
  logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
  memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
}

申请页

申请页需要如下步骤:

  1. 先申请满足size大小的内存空间,如果不满足,则无法进行分配页
  2. 申请到空闲的Page number号
  3. 通过MemoryAllocator进行实际的页分配,得到一个页MemoryBlock
  4. 将Page number赋给pageTable
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
  assert(consumer != null);
  assert(consumer.getMode() == tungstenMemoryMode);
  if (size > MAXIMUM_PAGE_SIZE_BYTES) { // 不能超过最大页能承受的内存大小
    throw new TooLargePageException(size);
  }

  // 获取执行内存<堆内还是堆外根据consumer的MemoryMode决定>
  long acquired = acquireExecutionMemory(size, consumer);
  if (acquired <= 0) { // 内存不足,无法分配页
    return null;
  }

  final int pageNumber;
  synchronized (this) {
    // 申请到空闲的Page number号
    pageNumber = allocatedPages.nextClearBit(0);
    if (pageNumber >= PAGE_TABLE_SIZE) { // 页已经满了,释放内存
      releaseExecutionMemory(acquired, consumer);
      throw new IllegalStateException(
        "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
    }
    allocatedPages.set(pageNumber);
  }
  MemoryBlock page = null;
  try {
    // 分配页,构建MemoryBlock
    page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
  } catch (OutOfMemoryError e) {
    logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
    // there is no enough memory actually, it means the actual free memory is smaller than
    // MemoryManager thought, we should keep the acquired memory.
    synchronized (this) {
      acquiredButNotUsed += acquired;
      allocatedPages.clear(pageNumber);
    }
    // this could trigger spilling to free some pages.
    return allocatePage(size, consumer);
  }
  page.pageNumber = pageNumber;
  pageTable[pageNumber] = page;
  if (logger.isTraceEnabled()) {
    logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
  }
  return page;
}

释放页

释放页需要如下操作:

  1. 释放pageTable中的页号
  2. 通过MemoryManager释放内存页
  3. 释放执行内存
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
  pageTable[page.pageNumber] = null;
  synchronized (this) { // 清除页号
    allocatedPages.clear(page.pageNumber);
  }
  if (logger.isTraceEnabled()) {
    logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
  }
  long pageSize = page.size();
  // Clear the page number before passing the block to the MemoryAllocator's free().
  // Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
  // page has been inappropriately directly freed without calling TMM.freePage().
  page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
  memoryManager.tungstenMemoryAllocator().free(page); // 释放内存
  releaseExecutionMemory(pageSize, consumer); // 释放执行内存
}

MemoryConsumer

最后看一下MemoryConsumer,是具体申请内存的消费方,在shuffle过程会经常用到,提供了数据落磁盘以及内存申请的接口,后面使用到了再详细分析。

每个task attempt都有对应的TaskMemoryManger对象,而一个task attempt可能有不止一个memory consumer需要消耗内存,所以TaskMemoryManger会维护当前task attempt的所有memory consumers,在TaskMemoryManger中定义了私有成员变量consumers,可以看到它就是一个MemoryConsumer类型的HashSet.

  1. consumers在TaskMemoryManger构造函数中被初始化为一个空的HashSet,并且在每次acquireExecutionMemory方法调用的最后都会将申请逻辑内存的memory consumer添加到TMM的consumers中。

  2. 当某个memory consumer调用acquireExecutionMemory申请逻辑内存时遇到可用逻辑内存不足的情况,TaskMemoryManger首先会遍历consumers集合,生成一个以consumer占用的memory大小为key的TreeMap对象,也就是生成一颗以已用内存大小为key的红黑树。然后,在这个treeMap中查找占有目标内存大小的memory consumer,找到后依次调用这些consumer的spill方法,直到释放出足够的内存空间或treeMap遍历完毕。

总结

本篇文章主要剖析了Task在任务执行时内存的管理相关的内容,sort-based-Shuffle过程中,会频繁的使用基于内存的sorter,此时的sorter包含大量的数据,是需要内存管理,都是通过这里介绍的方法来进行统一管理的。

参考

  1. https://www.jianshu.com/p/1176b8c637d5
  2. https://www.cnblogs.com/johnny666888/p/11277947.html
  3. https://www.jianshu.com/p/34729f9f833c