UE TArray 源码剖析及分摊时间复杂度推导
研究下 UE 里各个容器的实现逻辑,横向对比下 stl,本文不谈接口及用法,只看实现。
初始打算一文完成,内容太多,分开各个容器剖析吧,后续链接至此,本文分析 TArray。
成员变量
在 UE 中最常用到的容器,对应 stl 的 vector:
template<typename InElementType, typename InAllocatorType>
class TArray
{
template <typename OtherInElementType, typename OtherAllocator>
friend class TArray;
public:
typedef typename InAllocatorType::SizeType SizeType;
typedef InElementType ElementType;
typedef InAllocatorType AllocatorType;
private:
using USizeType = typename TMakeUnsigned<SizeType>::Type;
...
typedef typename TChooseClass<
AllocatorType::NeedsElementType,
typename AllocatorType::template ForElementType<ElementType>,
typename AllocatorType::ForAnyElementType
>::Result ElementAllocatorType;
...
protected:
template<typename ElementType, typename AllocatorType>
friend class TIndirectArray;
ElementAllocatorType AllocatorInstance;
SizeType ArrayNum;
SizeType ArrayMax;
}
TArray 需要两个模板参数,元素类型 InElementType 和一个内存分配器 InAllocatorType。
其包含三个成员变量,宏定义都在上面,这里为了方便就地展开这几个宏看下:
typename TChooseClass<InAllocatorType::NeedsElementType,
typename InAllocatorType::ForElementType<ElementType>,
typename InAllocatorType::ForAnyElementType>::Result llocatorInstance;
typename InAllocatorType::SizeType ArrayNum;
typename InAllocatorType::SizeType ArrayMax;
ArrayNum 与 ArrayMax 是两个第二个模板参数内存分配器的 SizeType 类型的变量,ArrayNum 记录了当前数组内元素的实际个数。ArrayMax 则是记录的当前数组最大可容纳元素的数量,当 ArrayNum 接近 ArrayMax 等一些策略时,会触发数组扩容,分配更大的内存空间。
那这个 SizeType 是什么类型呢,这里接着往下看下,AllocatorInstance 是用来控制数组内存的分配和释放的分配器,类型是使用两个模板参数构建的 TChooseClass 模板类的 Result 类型:
template<bool Predicate,typename TrueClass,typename FalseClass>
class TChooseClass;
template<typename TrueClass,typename FalseClass>
class TChooseClass<true,TrueClass,FalseClass>
{
public:
typedef TrueClass Result;
};
template<typename TrueClass,typename FalseClass>
class TChooseClass<false,TrueClass,FalseClass>
{
public:
typedef FalseClass Result;
};
TChooseClass 这里无非是使用分配器的一个布尔参数,来进行不同的展开。
因此,关键还是在 UE 的这个分配器里,UE 里提供了多个不同的内存分配器,TArray 默认使用的内存分配器是 FDefaultAllocator:
// Source\Runtime\Core\Public\Containers\ContainersFwd.h
template<typename T, typename Allocator = FDefaultAllocator> class TArray;
这里也以这个 FDefaultAllocator 为例:
using FDefaultAllocator = TSizedDefaultAllocator<32>;
template <int IndexSize> class TSizedDefaultAllocator : public TSizedHeapAllocator<IndexSize> { public: typedef TSizedHeapAllocator<IndexSize> Typedef; };
FDefaultAllocator 只是取了一个默认 32 大小的 TSizedDefaultAllocator 的别名,TSizedDefaultAllocator 是使用了一个模板参数的继承 TSizedHeapAllocator 的模板类,后者是真正实现内存分配释放逻辑的位置,因此我们来看下这个 TSizedHeapAllocator:
template <int IndexSize, typename BaseMallocType = FMemory>
class TSizedHeapAllocator
{
public:
using SizeType = typename TBitsToSizeType<IndexSize>::Type;
enum { NeedsElementType = true };
enum { RequireRangeCheck = true };
class ForAnyElementType{ ... };
template<typename ElementType>
class ForElementType{ ... };
...
FScriptContainerElement* Data;
}
template <> struct TBitsToSizeType<32> { using Type = int32; };
SizeType 是内存分配器和容器使用的元素计数和索引,在这里从 using 别名一路特化下来就是 TBitsToSizeType<32>::Type,所以这里最终展开是 int32。
NeedsElementType 表示是否需要元素类型信息,RequireRangeCheck 代表访问数组时是否需要边界检查。
ForAnyElementType 是针对任意元素类型的类,提供一些通用的数组操作,ForElementType 是针对特定元素类型的类,用于分配特定类型的数组,并提供类型安全的数组操作。
这里 BaseMallocType 是底层使用的内存分配器类型,分配器暂时到这里,后续有功夫再仔细研究下抽篇文章单写,本文中遇到具体方法再具体谈及。
这里有个 TArray 里会常调用到的方法 GetData,此方法返回的是 TSizedHeapAllocator 里的 Data,其实就是指向当前容器头的指针,后文中用到此方法时不在解释:
FORCEINLINE ElementType* GetData()
{
return (ElementType*)AllocatorInstance.GetAllocation();
}
回到我们的 TArray 里,ArrayNum 与 ArrayMax 其实是 int32 类型的:
FORCEINLINE TArray()
: ArrayNum(0)
, ArrayMax(AllocatorInstance.GetInitialCapacity())
{}
// TSizedHeapAllocator::ForAnyElementType 中
SizeType GetInitialCapacity() const
{
return 0;
}
默认构造的这里也可以看到,使用默认 Allocator 的当前元素数量和容量都直接在初始化列表时赋值了 0,其他的构造、移动构造、赋值、交换之类的操作不看了,看下几处关键的逻辑,先找下扩容的位置研究下:
扩容跟踪
FORCEINLINE SizeType Add(const ElementType& Item)
{
CheckAddress(&Item);
return Emplace(Item);
}
这里 Check 会校验保证入参 Item 的地址范围不在分配器容器指针和最大可容纳元素位置的指针之间,因此核心逻辑在 Emplace 这里:
template <typename... ArgsType>
FORCEINLINE SizeType Emplace(ArgsType&&... Args)
{
const SizeType Index = AddUninitialized();
new(GetData() + Index) ElementType(Forward<ArgsType>(Args)...);
return Index;
}
这里可以看到 AddUninitialized 计算出了当前的内存偏移,然后直接分配内存返回下标,所以扩容应当在 AddUninitialized 中完成:
FORCEINLINE SizeType AddUninitialized()
{
CheckInvariants();
const USizeType OldNum = (USizeType)ArrayNum;
const USizeType NewNum = OldNum + (USizeType)1;
ArrayNum = (SizeType)NewNum;
if (NewNum > (USizeType)ArrayMax)
{
ResizeGrow((SizeType)OldNum);
}
return OldNum;
}
当新范围大于当前范围时,这里传入 ArrayNum 执行了 ResizeGrow 方法,注意这里的 ArrayNum 已经是新的大小的,也就是可以说是 TargetArrayNum:
FORCENOINLINE void ResizeGrow(SizeType OldNum)
{
SizeType LocalArrayNum = ArrayNum;
// This should only happen when we've underflowed or overflowed SizeType in the caller
if (LocalArrayNum < OldNum)
{
OnInvalidNum((USizeType)LocalArrayNum - (USizeType)OldNum);
}
ArrayMax = AllocatorCalculateSlackGrow(LocalArrayNum, ArrayMax);
AllocatorResizeAllocation(OldNum, ArrayMax);
}
这里使用 AllocatorCalculateSlackGrow 重新计算了 TargetArrayNum 期望的大小,入参为需求容纳的元素数量和当前最大可容纳量,然后在 AllocatorResizeAllocation 执行了真正的扩容,先看下计算的这一部分:
SizeType AllocatorCalculateSlackGrow(SizeType CurrentArrayNum, SizeType NewArrayMax)
{
if constexpr (TAllocatorTraits<AllocatorType>::SupportsElementAlignment)
{
return AllocatorInstance.CalculateSlackGrow(CurrentArrayNum, NewArrayMax, sizeof(ElementType), alignof(ElementType));
}
else
{
return AllocatorInstance.CalculateSlackGrow(CurrentArrayNum, NewArrayMax, sizeof(ElementType));
}
}
这里可以看到,执行了内存分配器的 CalculateSlackGrow 方法,默认的 HeapAllocator 会执行 if 结构体中逻辑,入参为 需求容纳的元素数量、当前最大元素容纳量、元素大小、内存对齐粒度,继续跟踪下:
FORCEINLINE SizeType CalculateSlackGrow(SizeType NumElements, SizeType NumAllocatedElements, SIZE_T NumBytesPerElement, uint32 AlignmentOfElement) const
{
return DefaultCalculateSlackGrow(NumElements, NumAllocatedElements, NumBytesPerElement, true, (uint32)AlignmentOfElement);
}
template <typename SizeType>
FORCEINLINE SizeType DefaultCalculateSlackGrow(SizeType NumElements, SizeType NumAllocatedElements, SIZE_T BytesPerElement, bool bAllowQuantize, uint32 Alignment = DEFAULT_ALIGNMENT)
{
#if !defined(AGGRESSIVE_MEMORY_SAVING)
#error "AGGRESSIVE_MEMORY_SAVING must be defined"
#endif
#if AGGRESSIVE_MEMORY_SAVING
const SIZE_T FirstGrow = 1;
const SIZE_T ConstantGrow = 0;
#else
const SIZE_T FirstGrow = 4;
const SIZE_T ConstantGrow = 16;
#endif
SizeType Retval;
checkSlow(NumElements > NumAllocatedElements && NumElements > 0);
SIZE_T Grow = FirstGrow; // this is the amount for the first alloc
#if CONTAINER_INITIAL_ALLOC_ZERO_SLACK
if (NumAllocatedElements)
{
// Allocate slack for the array proportional to its size.
Grow = SIZE_T(NumElements) + 3 * SIZE_T(NumElements) / 8 + ConstantGrow;
}
else if (SIZE_T(NumElements) > Grow)
{
Grow = SIZE_T(NumElements);
}
#else
if (NumAllocatedElements || SIZE_T(NumElements) > Grow)
{
// Allocate slack for the array proportional to its size.
Grow = SIZE_T(NumElements) + 3 * SIZE_T(NumElements) / 8 + ConstantGrow;
}
#endif
if (bAllowQuantize)
{
Retval = (SizeType)(FMemory::QuantizeSize(Grow * BytesPerElement, Alignment) / BytesPerElement);
}
else
{
Retval = (SizeType)Grow;
}
// NumElements and MaxElements are stored in 32 bit signed integers so we must be careful not to overflow here.
if (NumElements > Retval)
{
Retval = TNumericLimits<SizeType>::Max();
}
return Retval;
}
可以看到,默认的扩容实现为 DefaultCalculateSlackGrow,这里的 AGGRESSIVE_MEMORY_SAVING 是使用 MallocBinned2 等来最大优化内存的策略,笔者这里是未开启的。
便于分析,总结一下这里方法的入参:
- 需求容纳的元素数量 NumElements
- 最大元素容纳量 NumAllocatedElements
- 元素大小 BytesPerElement
- 是否需要量化对齐 bAllowQuantize(add 调用链路这里是 true,为了更高的 cache 性能,这里可以不关注)
- 内存对齐粒度 Alignment
对于首次执行到这里的 TArray,因为 NumAllocatedElements 是 0,所以不会触发 if 语句,当需求扩容量大于 FirstGrow 时,会使用需求量直接分配,反之直接使用 FirstGrow 作为初次扩容目标值,即 4。
后续调用时,会走 if 代码段,扩容策略为:
Grow = NumElements + 3 \cdot \cfrac{NumElements}{8} + ConstantGrow
这里 NumElements 是目标扩容的大小,ConstantGrow 是常量 16,假定初始化一个空数组后一直执行 add,那么扩容表现如下:
第n次扩容 | UE Tarray | lib std++ |
---|---|---|
0 | 4 | 1 |
1 | 22 | 2 |
2 | 47 | 4 |
3 | 82 | 8 |
4 | 130 | 16 |
5 | 196 | 32 |
6 | 286 | 64 |
7 | 410 | 128 |
8 | 581 | 256 |
9 | 816 | 512 |
10 | 1139 | 1024 |
11 | 1583 | 2048 |
12 | 2194 | 4096 |
13 | 3034 | 8192 |
14 | 4189 | 16384 |
15 | 5777 | 32768 |
16 | 7960 | 65536 |
17 | 10962 | 131072 |
18 | 15090 | 262144 |
19 | 20766 | 524288 |
20 | 28570 | 1048576 |
看起来是比较保守的策略,扩容激进时会导致占用更多未使用内存,扩容保守则可能导致频繁扩容,后文中再尝试进行分摊时间复杂度分析。
相较于大部分libstd++、MSVC++ 等的 2、1.5、1.618 倍扩容的策略 UE 还是增速略缓,这里的 3/8 网上谈及甚少(虽然很像是 1-0.618 哈哈),但估计肯定是经过了大量性能测试,对于游戏而言实践得出的最优或最适用的策略吧,毕竟是 UE 官方提供的。
我们继续跟踪扩容的流程,这里计算出扩容后的大小后会逐层返回,直到 TArray 的 ResizeGrow 方法,将当前已容纳元素数量和返回值作为入参调用了 AllocatorResizeAllocation 方法:
void AllocatorResizeAllocation(SizeType CurrentArrayNum, SizeType NewArrayMax)
{
if constexpr (TAllocatorTraits<AllocatorType>::SupportsElementAlignment)
{
AllocatorInstance.ResizeAllocation(CurrentArrayNum, NewArrayMax, sizeof(ElementType), alignof(ElementType));
}
else
{
AllocatorInstance.ResizeAllocation(CurrentArrayNum, NewArrayMax, sizeof(ElementType));
}
}
然后转调了内存分配器的 ResizeAllocation 方法,这里还是先忽略内存对齐,跟踪下面的方法(TSizedHeapAllocator):
FORCEINLINE void ResizeAllocation(SizeType PreviousNumElements, SizeType NumElements, SIZE_T NumBytesPerElement)
{
// Avoid calling FMemory::Realloc( nullptr, 0 ) as ANSI C mandates returning a valid pointer which is not what we want.
if (Data || NumElements)
{
//checkSlow(((uint64)NumElements*(uint64)ElementTypeInfo.GetSize() < (uint64)INT_MAX));
Data = (FScriptContainerElement*)BaseMallocType::Realloc( Data, NumElements*NumBytesPerElement );
}
}
这里的三个入参分别代表:PreviousNumElements: 之前的数组元素个数、NumElements: 重新分配后的数组元素个数、NumBytesPerElement: 每个元素占用的字节数。
这里核心就是执行了 BaseMallocType::Realloc 方法,UE 里有多种策略,本文不再分析,后续研究内存时再写,但理解上无非尝试就地扩容,成功返回,失败时重新分配内存,memcpy 数据,free 旧内存块。
这样,回到扩容最初调用的位置:
template <typename... ArgsType>
FORCEINLINE SizeType Emplace(ArgsType&&... Args)
{
const SizeType Index = AddUninitialized();
new(GetData() + Index) ElementType(Forward<ArgsType>(Args)...);
return Index;
}
这里返回的 Index 就是 OldNum,因此在 GetData() + Index 位置构建当前值即完成了 TArray 的 add 操作。
分摊时间复杂度分析:
首先,我们忽略掉常数及初始扩容策略,简单理解为按 3/8 扩容,且假定就地扩容均会失败,那么,每次扩容时,元素搬迁额外花费的时间为:O(\frac{11}{8}n) = O(n)
,因为扩容的频率不会太过频繁,因此此处使用大 O 计算复杂度不太合理,应当考虑分摊分析( 3/8 = 0.375):
此时,不妨设数组初始容量为 N,且恰好已达当前数组分配的最大容量,此时执行 add 时即需要扩容,此时考虑连续 add n 次的场景下(n \gg N
)。首先定义如下函数:
- size(n) = 连续插入 n 个元素后向量的规模
- capacity(n) = 连续插入 n 个元素后数组的容量
- T(n) = 为连续插入 n 个元素而花费于扩容的时间
其中,数组规模从 N 开始随着操作的进程逐步递增,故有:
size(n) = N + n
考虑 capacity(n) 与 size(n),因为 UE 的扩容策略,只有在当前容量已经不满足当前使用需求时才会触发扩容,也就是“懒惰”的扩容策略,因此 capacity(n) 始终存在:
size(n) < capacity(n) < size(\frac{11}{8}n)
考虑到 N 为常数,故有:
capacity(n) = \theta(size(n)) = \theta(n)
容量以 11/8 为比例按指数速度增长,在容量达到 capacity(n) 之前,共做过 \theta(\log_\frac{11}{8}n)
次扩容,每次扩容所需的时间线性正比于当时的容量或规模,且同样以 1.375 为比例按指数速度增长。因此消耗于扩容的时间累计不过:
T(n) = \cfrac{11}{8}N + (\cfrac{11}{8})^2N + (\cfrac{11}{8})^3N + ... + capacity(n) < \cfrac{11}{8} capacity(n) = \theta(n)
因此 n 次操作消耗于扩容的时间为确界 n,故而但此操作的分摊运行时间应该为:
O(1)
也可按等比公式推导:
T(n) = \cfrac{11}{8}N + (\cfrac{11}{8})^2N + (\cfrac{11}{8})^3N + ... +(\cfrac{11}{8})^{\log_\frac{11}{8}n}N
T(n) = \cfrac{11}{8} N \cfrac{1-\cfrac{11}{8}^{\log_\frac{11}{8}n} } {1 - \cfrac{11}{8}}
T(n) = (n-1)\cfrac{11}{3}N
T(n) = \theta(n)
其他的的方法如 Shrink、Remove、Sort相关操作本文暂不再探讨,与 stl 基本类似,不过 UE 的 TArray 默认提供了 heap 相关操作,不再像 stl 一样需要调用 algorithm 的相关堆方法,读者关心可自行查阅源码或官方文档:TArray:虚幻引擎中的数组,本文不再总结。
参考文章:
邓俊辉. 数据结构: C++ 语言版[M]. Qing hua da xue chu ban she, 2013.
quabqi. 知乎:UE4的TArray(三)
Comments NOTHING