Unsafe实现Spark中的ShuffleReader
以下Spark版本为1.4.1。优化部分可以参考Lifetime Based Memory Manager for Distributed Data Processing Systems (PVLDB 2016)。
前文讲到,ShuffleWriter不支持mapSideCombine的关键一点是V的size在aggregator中不固定。而这也正好是ShuffleReader无法用Unsafe实现的原因。因为ShuffleWriter端可以不支持aggregator,将所有KV对写入Unsafe中;而ShuffleReader必须实现aggregator。
所以,如果要用Unsafe实现ShuffleReader,原理其实和前文提到的K的检索和V的aggregator一样。实际在Spark中有一个约束是,aggregation操作定义了mapSideCombine为true,而non-aggregation操作定义了mapSideCombine为false。这个信息是实现V的aggregator的一个重要信息。另外一个不同点是read是来源于多个不同的地方,需要聚合到一个迭代器中。
1. V的size的判断
通过两个简单的例子来分析:
reduceByKey(func: (V, V) => V),将多个V聚合为一个V值,V的长度不变。mapSideCombine为true。
groupByKey(),将多个V聚集到一个Iterable的Collection(CompactBuffer)中,V中的元素依次增加,但是size只在array写满后才会增加。mapSideCombine为false。
在我们的初始版本中,我们就先利用mapSideCombine来做一个分类,true表示V的长度不变,而false表示长度会改变。
2. ShuffleReader
ShuffleReader首先根据block ID从其他节点获取数据,然后展开成一个迭代器返回给aggregator做操作。做操作时存放的sorter是生命周期最长的对象,所以要将这部分数据存放到UNSAFE中。
从其他节点或者本地获取数据并返回为一个迭代器的方法在BlockStoreShuffleFetcher的fetch方法中:
def fetch[T](
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
: Iterator[T] = {
// ···
// 获取数据块的address
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
for (((address, size), index) <- statuses.zipWithIndex) {
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
}
// 获取Block的信息
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =>
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}
// 根据地址拉取数据的方法
def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Success(block) => {
block.asInstanceOf[Iterator[T]]
}
case Failure(e) => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
}
}
}
}
// 拉取的列表,继承迭代器,所以通过flatMap展开融合
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
val itr = blockFetcherItr.flatMap(unpackBlock)
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
context.taskMetrics.updateShuffleReadMetrics()
})
// 返回的迭代器
new InterruptibleIterator[T](context, completionIter) {
val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
override def next(): T = {
readMetrics.incRecordsRead(1)
delegate.next()
}
}
}
这里返回的迭代器是临时数据!只有在做aggregator的时候,这些对象会一直存活,所以只需要改接下来的aggregator为Unsafe即可。实际上这个rdd就相当于ShuffleWriter里write的参数,后面的写法就和UnsafeShuffleWriter类似了。
3. UnsafeShuffleReader
将上面产生的rdd(Iterable[T])作为参数传入,接下来做的就是aggregator操作和sort操作。完全可以仿照UnsafeShuffleWriter的做法。
Aggregator操作
aggregator操作分为两种,即前文V的size是否可变。为了实现初步的版本,以K和V均以Int为例来实现。ShuffleHandler中定义了key和value的类型,我们可以获取这些类型,但是这里仅考虑Int。
mapSideCombine为true,表示V的size不可变,直接写入KV,调用insertRecordIntoSorter方法。
mapSideCombine为false,表示V的size可变,先写入K,再申请一块额外的空间,返回一个地址,写入K后面。申请空间的初始长度为16+4,第一个字节写入存入的V的个数,后面依次写入V。
做排序时需要按照UnsafeShuffleWriter的方法写一个pointer指针。
读取操作
写完后read是返回的一个迭代器,因此需要定义Unsafe上的KV读取方法。
hasNext()方法:判断offset是否达到最后一个position
next()方法:根据写入的方式读取,如果是mapSideCombine为true,则按序读取K和V(按照ShuffleHandler的类型读取);如果mapSideCombine为false,则读取K后面的地址后还需要重新读取一遍数据并重组为V。
这样操作的目的是减少了ShuffleReader阶段aggregator操作部分的大量对象。后续更新实现后的效果吧,也可以参考开头的论文。^_^