UE TArray 源码剖析及分摊时间复杂度推导

发布于 2023-05-27  160 次阅读


UE TArray 源码剖析及分摊时间复杂度推导

研究下 UE 里各个容器的实现逻辑,横向对比下 stl,本文不谈接口及用法,只看实现。
初始打算一文完成,内容太多,分开各个容器剖析吧,后续链接至此,本文分析 TArray。

成员变量

在 UE 中最常用到的容器,对应 stl 的 vector:

template<typename InElementType, typename InAllocatorType>
class TArray
{
    template <typename OtherInElementType, typename OtherAllocator>
    friend class TArray;

public:
    typedef typename InAllocatorType::SizeType SizeType;
    typedef InElementType ElementType;
    typedef InAllocatorType AllocatorType;

private:
    using USizeType = typename TMakeUnsigned<SizeType>::Type;
    ...

    typedef typename TChooseClass<
        AllocatorType::NeedsElementType,
        typename AllocatorType::template ForElementType<ElementType>,
        typename AllocatorType::ForAnyElementType
    >::Result ElementAllocatorType;
    ...

protected:
    template<typename ElementType, typename AllocatorType>
    friend class TIndirectArray;

    ElementAllocatorType AllocatorInstance;
    SizeType             ArrayNum;
    SizeType             ArrayMax;
}

TArray 需要两个模板参数,元素类型 InElementType 和一个内存分配器 InAllocatorType。
其包含三个成员变量,宏定义都在上面,这里为了方便就地展开这几个宏看下:

    typename TChooseClass<InAllocatorType::NeedsElementType,
    typename InAllocatorType::ForElementType<ElementType>,
    typename InAllocatorType::ForAnyElementType>::Result llocatorInstance;
    typename InAllocatorType::SizeType ArrayNum;
    typename InAllocatorType::SizeType ArrayMax;

ArrayNum 与 ArrayMax 是两个第二个模板参数内存分配器的 SizeType 类型的变量,ArrayNum 记录了当前数组内元素的实际个数。ArrayMax 则是记录的当前数组最大可容纳元素的数量,当 ArrayNum 接近 ArrayMax 等一些策略时,会触发数组扩容,分配更大的内存空间。
那这个 SizeType 是什么类型呢,这里接着往下看下,AllocatorInstance 是用来控制数组内存的分配和释放的分配器,类型是使用两个模板参数构建的 TChooseClass 模板类的 Result 类型:

template<bool Predicate,typename TrueClass,typename FalseClass>
class TChooseClass;

template<typename TrueClass,typename FalseClass>
class TChooseClass<true,TrueClass,FalseClass>
{
public:
    typedef TrueClass Result;
};

template<typename TrueClass,typename FalseClass>
class TChooseClass<false,TrueClass,FalseClass>
{
public:
    typedef FalseClass Result;
};

TChooseClass 这里无非是使用分配器的一个布尔参数,来进行不同的展开。
因此,关键还是在 UE 的这个分配器里,UE 里提供了多个不同的内存分配器,TArray 默认使用的内存分配器是 FDefaultAllocator:

// Source\Runtime\Core\Public\Containers\ContainersFwd.h
template<typename T, typename Allocator = FDefaultAllocator> class TArray;

这里也以这个 FDefaultAllocator 为例:

using FDefaultAllocator = TSizedDefaultAllocator<32>;

template <int IndexSize> class TSizedDefaultAllocator : public TSizedHeapAllocator<IndexSize> { public: typedef TSizedHeapAllocator<IndexSize> Typedef; };

FDefaultAllocator 只是取了一个默认 32 大小的 TSizedDefaultAllocator 的别名,TSizedDefaultAllocator 是使用了一个模板参数的继承 TSizedHeapAllocator 的模板类,后者是真正实现内存分配释放逻辑的位置,因此我们来看下这个 TSizedHeapAllocator:

template <int IndexSize, typename BaseMallocType = FMemory>
class TSizedHeapAllocator
{
public:
    using SizeType = typename TBitsToSizeType<IndexSize>::Type;

    enum { NeedsElementType = true };

    enum { RequireRangeCheck = true };

    class ForAnyElementType{ ... };

    template<typename ElementType>
    class ForElementType{ ... };

    ...
    FScriptContainerElement* Data;
}

template <> struct TBitsToSizeType<32> { using Type = int32; };

SizeType 是内存分配器和容器使用的元素计数和索引,在这里从 using 别名一路特化下来就是 TBitsToSizeType<32>::Type,所以这里最终展开是 int32。
NeedsElementType 表示是否需要元素类型信息,RequireRangeCheck 代表访问数组时是否需要边界检查。

ForAnyElementType 是针对任意元素类型的类,提供一些通用的数组操作,ForElementType 是针对特定元素类型的类,用于分配特定类型的数组,并提供类型安全的数组操作。

这里 BaseMallocType 是底层使用的内存分配器类型,分配器暂时到这里,后续有功夫再仔细研究下抽篇文章单写,本文中遇到具体方法再具体谈及。

这里有个 TArray 里会常调用到的方法 GetData,此方法返回的是 TSizedHeapAllocator 里的 Data,其实就是指向当前容器头的指针,后文中用到此方法时不在解释:

FORCEINLINE ElementType* GetData()
{
    return (ElementType*)AllocatorInstance.GetAllocation();
}

回到我们的 TArray 里,ArrayNum 与 ArrayMax 其实是 int32 类型的:

FORCEINLINE TArray()
    : ArrayNum(0)
    , ArrayMax(AllocatorInstance.GetInitialCapacity())
{}

// TSizedHeapAllocator::ForAnyElementType 中
SizeType GetInitialCapacity() const
{
    return 0;
}

默认构造的这里也可以看到,使用默认 Allocator 的当前元素数量和容量都直接在初始化列表时赋值了 0,其他的构造、移动构造、赋值、交换之类的操作不看了,看下几处关键的逻辑,先找下扩容的位置研究下:

扩容跟踪

FORCEINLINE SizeType Add(const ElementType& Item)
{
    CheckAddress(&Item);
    return Emplace(Item);
}

这里 Check 会校验保证入参 Item 的地址范围不在分配器容器指针和最大可容纳元素位置的指针之间,因此核心逻辑在 Emplace 这里:

template <typename... ArgsType>
FORCEINLINE SizeType Emplace(ArgsType&&... Args)
{
    const SizeType Index = AddUninitialized();

    new(GetData() + Index) ElementType(Forward<ArgsType>(Args)...);
    return Index;
}

这里可以看到 AddUninitialized 计算出了当前的内存偏移,然后直接分配内存返回下标,所以扩容应当在 AddUninitialized 中完成:

FORCEINLINE SizeType AddUninitialized()
{
    CheckInvariants();

    const USizeType OldNum = (USizeType)ArrayNum;
    const USizeType NewNum = OldNum + (USizeType)1;
    ArrayNum = (SizeType)NewNum;
    if (NewNum > (USizeType)ArrayMax)
    {
        ResizeGrow((SizeType)OldNum);
    }
    return OldNum;
}

当新范围大于当前范围时,这里传入 ArrayNum 执行了 ResizeGrow 方法,注意这里的 ArrayNum 已经是新的大小的,也就是可以说是 TargetArrayNum:

FORCENOINLINE void ResizeGrow(SizeType OldNum)
{
    SizeType LocalArrayNum = ArrayNum;

    // This should only happen when we've underflowed or overflowed SizeType in the caller
    if (LocalArrayNum < OldNum)
    {
        OnInvalidNum((USizeType)LocalArrayNum - (USizeType)OldNum);
    }
    ArrayMax = AllocatorCalculateSlackGrow(LocalArrayNum, ArrayMax);
    AllocatorResizeAllocation(OldNum, ArrayMax);
}

这里使用 AllocatorCalculateSlackGrow 重新计算了 TargetArrayNum 期望的大小,入参为需求容纳的元素数量和当前最大可容纳量,然后在 AllocatorResizeAllocation 执行了真正的扩容,先看下计算的这一部分:

SizeType AllocatorCalculateSlackGrow(SizeType CurrentArrayNum, SizeType NewArrayMax)
{
    if constexpr (TAllocatorTraits<AllocatorType>::SupportsElementAlignment)
    {
        return AllocatorInstance.CalculateSlackGrow(CurrentArrayNum, NewArrayMax, sizeof(ElementType), alignof(ElementType));
    }
    else
    {
        return AllocatorInstance.CalculateSlackGrow(CurrentArrayNum, NewArrayMax, sizeof(ElementType));
    }
}

这里可以看到,执行了内存分配器的 CalculateSlackGrow 方法,默认的 HeapAllocator 会执行 if 结构体中逻辑,入参为 需求容纳的元素数量、当前最大元素容纳量、元素大小、内存对齐粒度,继续跟踪下:

FORCEINLINE SizeType CalculateSlackGrow(SizeType NumElements, SizeType NumAllocatedElements, SIZE_T NumBytesPerElement, uint32 AlignmentOfElement) const
{
    return DefaultCalculateSlackGrow(NumElements, NumAllocatedElements, NumBytesPerElement, true, (uint32)AlignmentOfElement);
}
template <typename SizeType>
FORCEINLINE SizeType DefaultCalculateSlackGrow(SizeType NumElements, SizeType NumAllocatedElements, SIZE_T BytesPerElement, bool bAllowQuantize, uint32 Alignment = DEFAULT_ALIGNMENT)
{
#if !defined(AGGRESSIVE_MEMORY_SAVING)
    #error "AGGRESSIVE_MEMORY_SAVING must be defined"
#endif
#if AGGRESSIVE_MEMORY_SAVING
    const SIZE_T FirstGrow = 1;
    const SIZE_T ConstantGrow = 0;
#else
    const SIZE_T FirstGrow = 4;
    const SIZE_T ConstantGrow = 16;
#endif

    SizeType Retval;
    checkSlow(NumElements > NumAllocatedElements && NumElements > 0);

    SIZE_T Grow = FirstGrow; // this is the amount for the first alloc

#if CONTAINER_INITIAL_ALLOC_ZERO_SLACK
    if (NumAllocatedElements)
    {
        // Allocate slack for the array proportional to its size.
        Grow = SIZE_T(NumElements) + 3 * SIZE_T(NumElements) / 8 + ConstantGrow;
    }
    else if (SIZE_T(NumElements) > Grow)
    {
        Grow = SIZE_T(NumElements);
    }
#else
    if (NumAllocatedElements || SIZE_T(NumElements) > Grow)
    {
        // Allocate slack for the array proportional to its size.
        Grow = SIZE_T(NumElements) + 3 * SIZE_T(NumElements) / 8 + ConstantGrow;
    }
#endif
    if (bAllowQuantize)
    {
        Retval = (SizeType)(FMemory::QuantizeSize(Grow * BytesPerElement, Alignment) / BytesPerElement);
    }
    else
    {
        Retval = (SizeType)Grow;
    }
    // NumElements and MaxElements are stored in 32 bit signed integers so we must be careful not to overflow here.
    if (NumElements > Retval)
    {
        Retval = TNumericLimits<SizeType>::Max();
    }

    return Retval;
}

可以看到,默认的扩容实现为 DefaultCalculateSlackGrow,这里的 AGGRESSIVE_MEMORY_SAVING 是使用 MallocBinned2 等来最大优化内存的策略,笔者这里是未开启的。

便于分析,总结一下这里方法的入参:

  • 需求容纳的元素数量 NumElements
  • 最大元素容纳量 NumAllocatedElements
  • 元素大小 BytesPerElement
  • 是否需要量化对齐 bAllowQuantize(add 调用链路这里是 true,为了更高的 cache 性能,这里可以不关注)
  • 内存对齐粒度 Alignment

对于首次执行到这里的 TArray,因为 NumAllocatedElements 是 0,所以不会触发 if 语句,当需求扩容量大于 FirstGrow 时,会使用需求量直接分配,反之直接使用 FirstGrow 作为初次扩容目标值,即 4。

后续调用时,会走 if 代码段,扩容策略为:

Grow = NumElements + 3 \cdot \cfrac{NumElements}{8} + ConstantGrow

这里 NumElements 是目标扩容的大小,ConstantGrow 是常量 16,假定初始化一个空数组后一直执行 add,那么扩容表现如下:

第n次扩容 UE Tarray lib std++
0 4 1
1 22 2
2 47 4
3 82 8
4 130 16
5 196 32
6 286 64
7 410 128
8 581 256
9 816 512
10 1139 1024
11 1583 2048
12 2194 4096
13 3034 8192
14 4189 16384
15 5777 32768
16 7960 65536
17 10962 131072
18 15090 262144
19 20766 524288
20 28570 1048576

看起来是比较保守的策略,扩容激进时会导致占用更多未使用内存,扩容保守则可能导致频繁扩容,后文中再尝试进行分摊时间复杂度分析。
相较于大部分libstd++、MSVC++ 等的 2、1.5、1.618 倍扩容的策略 UE 还是增速略缓,这里的 3/8 网上谈及甚少(虽然很像是 1-0.618 哈哈),但估计肯定是经过了大量性能测试,对于游戏而言实践得出的最优或最适用的策略吧,毕竟是 UE 官方提供的。

我们继续跟踪扩容的流程,这里计算出扩容后的大小后会逐层返回,直到 TArray 的 ResizeGrow 方法,将当前已容纳元素数量和返回值作为入参调用了 AllocatorResizeAllocation 方法:

void AllocatorResizeAllocation(SizeType CurrentArrayNum, SizeType NewArrayMax)
{
    if constexpr (TAllocatorTraits<AllocatorType>::SupportsElementAlignment)
    {
        AllocatorInstance.ResizeAllocation(CurrentArrayNum, NewArrayMax, sizeof(ElementType), alignof(ElementType));
    }
    else
    {
        AllocatorInstance.ResizeAllocation(CurrentArrayNum, NewArrayMax, sizeof(ElementType));
    }
}

然后转调了内存分配器的 ResizeAllocation 方法,这里还是先忽略内存对齐,跟踪下面的方法(TSizedHeapAllocator):

FORCEINLINE void ResizeAllocation(SizeType PreviousNumElements, SizeType NumElements, SIZE_T NumBytesPerElement)
{
    // Avoid calling FMemory::Realloc( nullptr, 0 ) as ANSI C mandates returning a valid pointer which is not what we want.
    if (Data || NumElements)
    {
        //checkSlow(((uint64)NumElements*(uint64)ElementTypeInfo.GetSize() < (uint64)INT_MAX));
        Data = (FScriptContainerElement*)BaseMallocType::Realloc( Data, NumElements*NumBytesPerElement );
    }
}

这里的三个入参分别代表:PreviousNumElements: 之前的数组元素个数、NumElements: 重新分配后的数组元素个数、NumBytesPerElement: 每个元素占用的字节数。
这里核心就是执行了 BaseMallocType::Realloc 方法,UE 里有多种策略,本文不再分析,后续研究内存时再写,但理解上无非尝试就地扩容,成功返回,失败时重新分配内存,memcpy 数据,free 旧内存块。

这样,回到扩容最初调用的位置:

    template <typename... ArgsType>
    FORCEINLINE SizeType Emplace(ArgsType&&... Args)
    {
        const SizeType Index = AddUninitialized();
        new(GetData() + Index) ElementType(Forward<ArgsType>(Args)...);
        return Index;
    }

这里返回的 Index 就是 OldNum,因此在 GetData() + Index 位置构建当前值即完成了 TArray 的 add 操作。

分摊时间复杂度分析:

首先,我们忽略掉常数及初始扩容策略,简单理解为按 3/8 扩容,且假定就地扩容均会失败,那么,每次扩容时,元素搬迁额外花费的时间为:O(\frac{11}{8}n) = O(n),因为扩容的频率不会太过频繁,因此此处使用大 O 计算复杂度不太合理,应当考虑分摊分析( 3/8 = 0.375):

此时,不妨设数组初始容量为 N,且恰好已达当前数组分配的最大容量,此时执行 add 时即需要扩容,此时考虑连续 add n 次的场景下(n \gg N)。首先定义如下函数:

  • size(n) = 连续插入 n 个元素后向量的规模
  • capacity(n) = 连续插入 n 个元素后数组的容量
  • T(n) = 为连续插入 n 个元素而花费于扩容的时间

其中,数组规模从 N 开始随着操作的进程逐步递增,故有:

size(n) = N + n

考虑 capacity(n) 与 size(n),因为 UE 的扩容策略,只有在当前容量已经不满足当前使用需求时才会触发扩容,也就是“懒惰”的扩容策略,因此 capacity(n) 始终存在:

size(n) < capacity(n) < size(\frac{11}{8}n)

考虑到 N 为常数,故有:

capacity(n) = \theta(size(n)) = \theta(n)

容量以 11/8 为比例按指数速度增长,在容量达到 capacity(n) 之前,共做过 \theta(\log_\frac{11}{8}n) 次扩容,每次扩容所需的时间线性正比于当时的容量或规模,且同样以 1.375 为比例按指数速度增长。因此消耗于扩容的时间累计不过:

T(n) = \cfrac{11}{8}N + (\cfrac{11}{8})^2N + (\cfrac{11}{8})^3N + ... + capacity(n) < \cfrac{11}{8} capacity(n) = \theta(n)

因此 n 次操作消耗于扩容的时间为确界 n,故而但此操作的分摊运行时间应该为:
O(1)

也可按等比公式推导:

T(n) = \cfrac{11}{8}N + (\cfrac{11}{8})^2N + (\cfrac{11}{8})^3N + ... +(\cfrac{11}{8})^{\log_\frac{11}{8}n}N
T(n) = \cfrac{11}{8} N \cfrac{1-\cfrac{11}{8}^{\log_\frac{11}{8}n} } {1 - \cfrac{11}{8}}
T(n) = (n-1)\cfrac{11}{3}N 
T(n) = \theta(n)








其他的的方法如 Shrink、Remove、Sort相关操作本文暂不再探讨,与 stl 基本类似,不过 UE 的 TArray 默认提供了 heap 相关操作,不再像 stl 一样需要调用 algorithm 的相关堆方法,读者关心可自行查阅源码或官方文档:TArray:虚幻引擎中的数组,本文不再总结。

参考文章:
邓俊辉. 数据结构: C++ 语言版[M]. Qing hua da xue chu ban she, 2013.
quabqi. 知乎:UE4的TArray(三)