Files
2025-05-18 13:04:45 +08:00

3378 lines
98 KiB
C++

// Copyright Epic Games, Inc. All Rights Reserved.
#include "OnDemandInstallCache.h"
#include "OnDemandHttpClient.h"
#include "OnDemandIoStore.h"
#include "Statistics.h"
#include "Algo/Accumulate.h"
#include "Algo/Find.h"
#include "Async/Mutex.h"
#include "Async/UniqueLock.h"
#include "Async/AsyncFileHandle.h"
#include "Containers/UnrealString.h"
#include "GenericHash.h"
#include "HAL/FileManager.h"
#include "HAL/PlatformFile.h"
#include "HAL/PlatformFileManager.h"
#include "IO/IoContainerHeader.h"
#include "IO/IoChunkId.h"
#include "IO/IoChunkEncoding.h"
#include "Misc/DateTime.h"
#include "Misc/Paths.h"
#include "Misc/PathViews.h"
#include "Misc/ScopeExit.h"
#include "Misc/StringBuilder.h"
#include "Serialization/MemoryReader.h"
#include "Serialization/LargeMemoryWriter.h"
#include "Tasks/Task.h"
#include "ProfilingDebugging/IoStoreTrace.h"
#if WITH_IOSTORE_ONDEMAND_TESTS
#include "Algo/Find.h"
#include "TestHarness.h"
#include "TestMacros/Assertions.h"
#include <catch2/generators/catch_generators.hpp>
#endif
#ifndef UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
#define UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE (0)
#endif
#if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
#include "Tasks/Pipe.h"
#endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
namespace UE::IoStore
{
///////////////////////////////////////////////////////////////////////////////
namespace CVars
{
static bool GIoStoreOnDemandEnableDefrag = true;
static FAutoConsoleVariableRef CVar_IoStoreOnDemandEnableDefrag(
TEXT("iostore.EnableDefrag"),
GIoStoreOnDemandEnableDefrag,
TEXT("Whether to enable defrag when purging")
);
}
///////////////////////////////////////////////////////////////////////////////
double ToKiB(uint64 Value)
{
return double(Value) / 1024.0;
}
///////////////////////////////////////////////////////////////////////////////
double ToMiB(uint64 Value)
{
return double(Value) / 1024.0 / 1024.0;
}
///////////////////////////////////////////////////////////////////////////////
using FUniqueFileHandle = TUniquePtr<IFileHandle>;
using FSharedFileHandle = TSharedPtr<IFileHandle>;
using FSharedFileOpenResult = TValueOrError<FSharedFileHandle, FFileSystemError>;
using FSharedAsyncFileHandle = TSharedPtr<IAsyncReadFileHandle>;
using FWeakAsyncFileHandle = TWeakPtr<IAsyncReadFileHandle>;
using FSharedFileOpenAsyncResult = TValueOrError<FSharedAsyncFileHandle, FFileSystemError>;
using FCasAddr = FHash96;
///////////////////////////////////////////////////////////////////////////////
struct FCasBlockId
{
FCasBlockId() = default;
explicit FCasBlockId(uint32 InId)
: Id(InId) { }
bool IsValid() const { return Id != 0; }
friend inline bool operator==(FCasBlockId LHS, FCasBlockId RHS)
{
return LHS.Id == RHS.Id;
}
friend inline uint32 GetTypeHash(FCasBlockId BlockId)
{
return GetTypeHash(BlockId.Id);
}
friend FArchive& operator<<(FArchive& Ar, FCasBlockId& BlockId)
{
Ar << BlockId.Id;
return Ar;
}
static const FCasBlockId Invalid;
uint32 Id = 0;
};
const FCasBlockId FCasBlockId::Invalid = FCasBlockId();
///////////////////////////////////////////////////////////////////////////////
struct FCasLocation
{
bool IsValid() const { return BlockId.IsValid() && BlockOffset != MAX_uint32; }
friend inline bool operator==(FCasLocation LHS, FCasLocation RHS)
{
return LHS.BlockId == RHS.BlockId && LHS.BlockOffset == RHS.BlockOffset;
}
friend inline uint32 GetTypeHash(FCasLocation Loc)
{
return HashCombine(GetTypeHash(Loc.BlockId), GetTypeHash(Loc.BlockOffset));
}
friend FArchive& operator<<(FArchive& Ar, FCasLocation& Loc)
{
Ar << Loc.BlockId;
Ar << Loc.BlockOffset;
return Ar;
}
static const FCasLocation Invalid;
FCasBlockId BlockId;
uint32 BlockOffset = MAX_uint32;
};
const FCasLocation FCasLocation::Invalid = FCasLocation();
///////////////////////////////////////////////////////////////////////////////
struct FCasBlockInfo
{
uint64 FileSize = 0;
int64 LastAccess = 0;
uint64 RefSize = 0;
};
using FCasBlockInfoMap = TMap<FCasBlockId, FCasBlockInfo>;
///////////////////////////////////////////////////////////////////////////////
struct FCas
{
static constexpr uint32 DeleteBlockMaxWaitTimeMs = 10000;
using FLookup = TMap<FCasAddr, FCasLocation>;
using FReadHandles = TMap<FCasBlockId, FWeakAsyncFileHandle>;
using FLastAccess = TMap<FCasBlockId, int64>;
using FBlockIdHandleCounts = TMap<FCasBlockId, int32>;
FIoStatus Initialize(FStringView Directory, bool bDeleteExisting = false);
FCasLocation FindChunk(const FIoHash& Hash) const;
FCasBlockId CreateBlock();
FIoStatus DeleteBlock(FCasBlockId BlockId, TArray<FCasAddr>& OutAddrs);
FString GetBlockFilename(FCasBlockId BlockId) const;
FSharedFileOpenResult OpenRead(FCasBlockId BlockId);
FSharedFileOpenAsyncResult OpenAsyncRead(FCasBlockId BlockId);
void OnFileHandleDeleted(FCasBlockId BlockId);
FUniqueFileHandle OpenWrite(FCasBlockId BlockId) const;
void TrackAccess(FCasBlockId BlockId, int64 UtcTicks);
void TrackAccess(FCasBlockId BlockId) { TrackAccess(BlockId, FDateTime::UtcNow().GetTicks()); }
void TrackAccessIfNewer(FCasBlockId BlockId, int64 UtcTicks);
uint64 GetBlockInfo(FCasBlockInfoMap& OutBlockInfo);
void Compact();
FIoStatus Verify(TArray<FCasAddr>& OutAddrs);
FStringView RootDirectory;
FLookup Lookup;
FBlockIdHandleCounts BlockIds;
FLastAccess LastAccess;
FReadHandles ReadHandles;
FEventRef BlockReadsDoneEvent;
const uint32 MaxBlockSize = 32 << 20; //TODO: Make configurable
const uint32 MinBlockSize = MaxBlockSize >> 1; //TODO: Make configurable
mutable UE::FMutex Mutex;
};
///////////////////////////////////////////////////////////////////////////////
FIoStatus FCas::Initialize(FStringView Directory, bool bDeleteExisting)
{
RootDirectory = Directory;
Lookup.Empty();
BlockIds.Empty();
LastAccess.Empty();
TStringBuilder<256> Path;
FPathViews::Append(Path, RootDirectory, TEXT("blocks"));
IFileManager& Ifm = IFileManager::Get();
if (bDeleteExisting)
{
bool bRequireExists = false;
const bool bTree = true;
if (Ifm.DeleteDirectory(Path.ToString(), bRequireExists, bTree) == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to delete CAS blocks directory '")
<< Path.ToString()
<< TEXT("'");
}
}
if (Ifm.DirectoryExists(Path.ToString()) == false)
{
const bool bTree = true;
if (Ifm.MakeDirectory(Path.ToString(), bTree) == false)
{
FIoStatus Status = FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to create directory '")
<< Path.ToString()
<< TEXT("'");
return Status;
}
}
return EIoErrorCode::Ok;
};
FCasLocation FCas::FindChunk(const FIoHash& Hash) const
{
const FCasAddr* Addr = reinterpret_cast<const FCasAddr*>(&Hash);
const uint32 TypeHash = GetTypeHash(*Addr);
{
UE::TUniqueLock Lock(Mutex);
if (const FCasLocation* Loc = Lookup.FindByHash(TypeHash, *Addr))
{
return *Loc;
}
}
return FCasLocation{};
}
FCasBlockId FCas::CreateBlock()
{
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
FCasBlockId Out = FCasBlockId::Invalid;
UE::TUniqueLock Lock(Mutex);
for (uint32 Id = 1; Id < MAX_uint32 && !Out.IsValid(); Id++)
{
const FCasBlockId BlockId(Id);
if (BlockIds.Contains(BlockId))
{
continue;
}
const FString Filename = GetBlockFilename(BlockId);
if (Ipf.FileExists(*Filename))
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Unused CAS block id %u already exists on disk"), BlockId.Id);
continue;
}
BlockIds.Add(BlockId, 0);
LastAccess.FindOrAdd(BlockId, FDateTime::UtcNow().GetTicks());
Out = BlockId;
}
return Out;
}
FIoStatus FCas::DeleteBlock(FCasBlockId BlockId, TArray<FCasAddr>& OutAddrs)
{
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
const FString Filename = GetBlockFilename(BlockId);
UE::TDynamicUniqueLock Lock(Mutex, UE::FDeferLock());
// Wait for pending reads to flush before deleting block
uint32 StartTimeCycles = FPlatformTime::Cycles();
const uint32 WaitTimeMs = 1000;
for (;;)
{
Lock.Lock();
const int32 RequestCount = BlockIds.FindRef(BlockId);
if (RequestCount)
{
Lock.Unlock();
if (FPlatformTime::ToMilliseconds(FPlatformTime::Cycles() - StartTimeCycles) > DeleteBlockMaxWaitTimeMs)
{
return FIoStatusBuilder(EIoErrorCode::Timeout)
<< TEXT("Timed out waiting for pending read(s) while deleting CAS block '")
<< Filename
<< TEXT("'");
}
BlockReadsDoneEvent->Wait(WaitTimeMs);
}
else
{
// Leave mutex locked until it goes out of scope
break;
}
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Deleting CAS block '%s'"), *Filename);
if (Ipf.DeleteFile(*Filename) == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to delete CAS block '")
<< Filename
<< TEXT("'");
}
BlockIds.Remove(BlockId);
ReadHandles.Remove(BlockId);
for (auto It = Lookup.CreateIterator(); It; ++It)
{
if (It->Value.BlockId == BlockId)
{
OutAddrs.Add(It->Key);
It.RemoveCurrent();
}
}
return FIoStatus::Ok;
}
FString FCas::GetBlockFilename(FCasBlockId BlockId) const
{
check(BlockId.IsValid());
const uint32 Id = NETWORK_ORDER32(BlockId.Id);
FString Hex;
BytesToHexLower(reinterpret_cast<const uint8*>(&Id), sizeof(int32), Hex);
TStringBuilder<256> Path;
FPathViews::Append(Path, RootDirectory, TEXT("blocks"), Hex);
Path << TEXT(".ucas");
return FString(Path.ToView());
}
FSharedFileOpenResult FCas::OpenRead(FCasBlockId BlockId)
{
const FString Filename = GetBlockFilename(BlockId);
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
UE::TUniqueLock Lock(Mutex);
FFileOpenResult Result = Ipf.OpenRead(*Filename, IPlatformFile::EOpenReadFlags::AllowWrite);
if (Result.HasValue())
{
BlockIds.FindOrAdd(BlockId, 0)++;
FSharedFileHandle NewHandle(
Result.GetValue().Release(),
[this, BlockId](IFileHandle* RawHandle)
{
delete RawHandle;
OnFileHandleDeleted(BlockId);
}
);
return MakeValue(MoveTemp(NewHandle));
}
return MakeError(Result.StealError());
}
FSharedFileOpenAsyncResult FCas::OpenAsyncRead(FCasBlockId BlockId)
{
UE::TUniqueLock Lock(Mutex);
if (FWeakAsyncFileHandle* MaybeHandle = ReadHandles.Find(BlockId))
{
if (FSharedAsyncFileHandle Handle = MaybeHandle->Pin(); Handle.IsValid())
{
return MakeValue(MoveTemp(Handle));
}
}
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
const FString Filename = GetBlockFilename(BlockId);
FFileOpenAsyncResult HandleResult(Ipf.OpenAsyncRead(*Filename, IPlatformFile::EOpenReadFlags::AllowWrite));
if (HandleResult.HasValue())
{
BlockIds.FindOrAdd(BlockId, 0)++;
FSharedAsyncFileHandle NewHandle(
HandleResult.GetValue().Release(),
[this, BlockId](IAsyncReadFileHandle* RawHandle)
{
delete RawHandle;
OnFileHandleDeleted(BlockId);
}
);
ReadHandles.FindOrAdd(BlockId, NewHandle);
return MakeValue(MoveTemp(NewHandle));
}
return MakeError(HandleResult.StealError());
}
void FCas::OnFileHandleDeleted(FCasBlockId BlockId)
{
UE::TUniqueLock Lock(Mutex);
const int32 Count = --BlockIds.FindChecked(BlockId);
check(Count >= 0);
if (Count == 0)
{
BlockReadsDoneEvent->Trigger();
}
}
FUniqueFileHandle FCas::OpenWrite(FCasBlockId BlockId) const
{
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
const FString Filename = GetBlockFilename(BlockId);
const bool bAppend = true;
const bool bAllowRead = true;
return FUniqueFileHandle(Ipf.OpenWrite(*Filename, bAppend, bAllowRead));
}
void FCas::TrackAccess(FCasBlockId BlockId, int64 UtcTicks)
{
check(BlockId.IsValid());
UE::TUniqueLock Lock(Mutex);
LastAccess.FindOrAdd(BlockId, UtcTicks);
}
void FCas::TrackAccessIfNewer(FCasBlockId BlockId, int64 UtcTicks)
{
check(BlockId.IsValid());
UE::TUniqueLock Lock(Mutex);
int64& FoundTicks = LastAccess.FindOrAdd(BlockId, FDateTime::MinValue().GetTicks());
if (FoundTicks < UtcTicks)
{
FoundTicks = UtcTicks;
}
}
uint64 FCas::GetBlockInfo(FCasBlockInfoMap& OutBlockInfo)
{
TStringBuilder<256> Path;
FPathViews::Append(Path, RootDirectory, TEXT("blocks"));
struct FDirectoryVisitor final
: public IPlatformFile::FDirectoryVisitor
{
FDirectoryVisitor(IPlatformFile& PlatformFile, FCasBlockInfoMap& InBlockInfo, FLastAccess&& Access)
: Ipf(PlatformFile)
, BlockInfo(InBlockInfo)
, LastAccess(MoveTemp(Access))
{ }
virtual bool Visit(const TCHAR* FilenameOrDirectory, bool bIsDirectory) override
{
if (bIsDirectory)
{
return true;
}
const FStringView Filename(FilenameOrDirectory);
if (FPathViews::GetExtension(Filename) == TEXTVIEW("ucas") == false)
{
return true;
}
const int64 FileSize = Ipf.FileSize(FilenameOrDirectory);
const FStringView IndexHex = FPathViews::GetBaseFilename(Filename);
const FCasBlockId BlockId(FParse::HexNumber(WriteToString<128>(IndexHex).ToString()));
if (BlockId.IsValid() == false || FileSize < 0)
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Found invalid CAS block '%s', FileSize=%lld"),
FilenameOrDirectory, FileSize);
return true;
}
if (BlockInfo.Contains(BlockId))
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Found duplicate CAS block '%s'"), FilenameOrDirectory);
return true;
}
const int64* UtcTicks = LastAccess.Find(BlockId);
BlockInfo.Add(BlockId, FCasBlockInfo
{
.FileSize = uint64(FileSize),
.LastAccess = UtcTicks != nullptr ? *UtcTicks : 0
});
TotalSize += uint64(FileSize);
return true;
}
IPlatformFile& Ipf;
FCasBlockInfoMap& BlockInfo;
FLastAccess LastAccess;
uint64 TotalSize = 0;
};
FLastAccess Access;
{
TUniqueLock Lock(Mutex);
Access = LastAccess;
}
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
FDirectoryVisitor Visitor(Ipf, OutBlockInfo, MoveTemp(Access));
Ipf.IterateDirectory(Path.ToString(), Visitor);
return Visitor.TotalSize;
}
void FCas::Compact()
{
UE::TUniqueLock Lock(Mutex);
Lookup.Compact();
BlockIds.Compact();
ReadHandles.Compact();
LastAccess.Compact();
}
FIoStatus FCas::Verify(TArray<FCasAddr>& OutAddrs)
{
FCasBlockInfoMap BlockInfo;
const uint64 TotalSize = GetBlockInfo(BlockInfo);
uint64 TotalVerifiedBytes = 0;
FIoStatus Status = FIoStatus::Ok;
for (auto BlockIt = BlockIds.CreateIterator(); BlockIt; ++BlockIt)
{
const FCasBlockId BlockId = BlockIt->Key;
if (const FCasBlockInfo* Info = BlockInfo.Find(BlockId))
{
TotalVerifiedBytes += Info->FileSize;
continue;
}
const FString Filename = GetBlockFilename(BlockId);
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Missing CAS block '%s'"), *Filename);
LastAccess.Remove(BlockId);
BlockIt.RemoveCurrent();
Status = EIoErrorCode::NotFound;
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Verified %d CAS blocks of total %.2lf MiB"),
BlockIds.Num(), ToMiB(TotalVerifiedBytes));
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
for (const TPair<FCasBlockId, FCasBlockInfo>& Kv : BlockInfo)
{
const FCasBlockId BlockId = Kv.Key;
if (BlockIds.Contains(BlockId))
{
continue;
}
const FString Filename = GetBlockFilename(BlockId);
if (Ipf.DeleteFile(*Filename))
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Deleted orphaned CAS block '%s'"), *Filename);
}
}
TSet<FString> MissingReferencedBlocks;
for (auto It = Lookup.CreateIterator(); It; ++It)
{
if (!BlockIds.Contains(It->Value.BlockId))
{
MissingReferencedBlocks.Add(GetBlockFilename(It->Value.BlockId));
OutAddrs.Add(It->Key);
It.RemoveCurrent();
Status = EIoErrorCode::NotFound;
}
}
for (const FString& Filename : MissingReferencedBlocks)
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Lookup references missing CAS block '%s'"), *Filename);
}
return Status;
}
///////////////////////////////////////////////////////////////////////////////
struct FCasJournal
{
enum class EVersion : uint32
{
Invalid = 0,
Initial,
LatestPlusOne,
Latest = LatestPlusOne - 1
};
struct FHeader
{
static const inline uint8 MagicSequence[16] = {'C', 'A', 'S', 'J', 'O', 'U', 'R', 'N', 'A', 'L', 'H', 'E', 'A', 'D', 'E', 'R'};
bool IsValid() const;
uint8 Magic[16] = {0};
EVersion Version = EVersion::Invalid;
uint8 Pad[12] = {0};
};
static_assert(sizeof(FHeader) == 32);
struct FFooter
{
static const inline uint8 MagicSequence[16] = {'C', 'A', 'S', 'J', 'O', 'U', 'R', 'N', 'A', 'L', 'F', 'O', 'O', 'T', 'E', 'R'};
bool IsValid() const;
uint8 Magic[16] = {0};
};
static_assert(sizeof(FFooter) == 16);
struct FEntry
{
enum class EType : uint8
{
None = 0,
ChunkLocation,
BlockCreated,
BlockDeleted,
BlockAccess
};
struct FChunkLocation
{
EType Type = EType::ChunkLocation;
uint8 Pad[3]= {0};
FCasLocation CasLocation;
FCasAddr CasAddr;
};
static_assert(sizeof(FChunkLocation) == 24);
struct FBlockOperation
{
EType Type = EType::None;
uint8 Pad[3]= {0};
FCasBlockId BlockId;
int64 UtcTicks = 0;
uint8 Pad1[8]= {0};
};
static_assert(sizeof(FBlockOperation) == 24);
union
{
FChunkLocation ChunkLocation;
FBlockOperation BlockOperation;
};
EType Type() const { return *reinterpret_cast<const EType*>(this); }
};
static_assert(sizeof(FEntry) == 24);
struct FTransaction
{
void ChunkLocation(const FCasLocation& Location, const FCasAddr& Addr);
void BlockCreated(FCasBlockId BlockId);
void BlockDeleted(FCasBlockId BlockId);
void BlockAccess(FCasBlockId BlockId, int64 UtcTicks);
FString JournalFile;
TArray<FEntry> Entries;
};
using FEntryHandler = TFunction<void(const FEntry&)>;
static FIoStatus Replay(const FString& JournalFile, FEntryHandler&& Handler);
static FIoStatus Create(const FString& JournalFile);
static FTransaction Begin(FString&& JournalFile);
static FTransaction Begin(const FString& JournalFile) { return Begin(FString(JournalFile)); }
static FIoStatus Commit(FTransaction&& Transaction);
};
///////////////////////////////////////////////////////////////////////////////
bool FCasJournal::FHeader::IsValid() const
{
if (FMemory::Memcmp(&Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence)) != 0)
{
return false;
}
if (static_cast<uint32>(Version) > static_cast<uint32>(EVersion::Latest))
{
return false;
}
return true;
}
bool FCasJournal::FFooter::IsValid() const
{
return FMemory::Memcmp(Magic, FFooter::MagicSequence, sizeof(FFooter::MagicSequence)) == 0;
}
FIoStatus FCasJournal::Replay(const FString& JournalFile, FEntryHandler&& Handler)
{
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
if (Ipf.FileExists(*JournalFile) == false)
{
return EIoErrorCode::NotFound;
}
TUniquePtr<IFileHandle> FileHandle(Ipf.OpenRead(*JournalFile));
if (FileHandle.IsValid() == false)
{
return EIoErrorCode::FileNotOpen;
}
FHeader Header;
if ((FileHandle->Read(reinterpret_cast<uint8*>(&Header), sizeof(FHeader)) == false) || (Header.IsValid() == false))
{
return FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate journal header '")
<< JournalFile
<< TEXT("'");
}
const int64 FileSize = FileHandle->Size();
const int64 EntryCount = (FileSize - sizeof(FHeader) - sizeof(FFooter)) / sizeof(FEntry);
if (EntryCount < 0)
{
return EIoErrorCode::ReadError;
}
if (EntryCount == 0)
{
return EIoErrorCode::Ok;
}
const int64 FooterPos = FileSize - sizeof(FFooter);
if (FooterPos < 0)
{
return FIoStatusBuilder(EIoErrorCode::CorruptToc)
<< TEXT("Invalid journal footer");
}
const int64 EntriesPos = FileHandle->Tell();
if (FileHandle->Seek(FooterPos) == false)
{
return EIoErrorCode::ReadError;
}
FFooter Footer;
if ((FileHandle->Read(reinterpret_cast<uint8*>(&Footer), sizeof(FFooter)) == false) || (Footer.IsValid() == false))
{
return FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate journal footer '")
<< JournalFile
<< TEXT("'");
}
if (FileHandle->Seek(EntriesPos) == false)
{
return EIoErrorCode::ReadError;
}
TArray<FEntry> Entries;
Entries.SetNumZeroed(IntCastChecked<int32>(EntryCount));
if (FileHandle->Read(reinterpret_cast<uint8*>(Entries.GetData()), sizeof(FEntry) * EntryCount) == false)
{
return EIoErrorCode::ReadError;
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Replaying %" INT64_FMT " CAS journal entries of total %.2lf KiB from '%s'"),
EntryCount, ToKiB(sizeof(FEntry) * EntryCount), *JournalFile);
for (const FEntry& Entry : Entries)
{
Handler(Entry);
}
return EIoErrorCode::Ok;
}
FIoStatus FCasJournal::Create(const FString& JournalFile)
{
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
Ipf.DeleteFile(*JournalFile);
TUniquePtr<IFileHandle> FileHandle(Ipf.OpenWrite(*JournalFile));
if (FileHandle.IsValid() == false)
{
return EIoErrorCode::FileNotOpen;
}
FHeader Header;
FMemory::Memcpy(&Header.Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence));
Header.Version = EVersion::Latest;
if (FileHandle->Write(reinterpret_cast<uint8*>(&Header), sizeof(FHeader)) == false)
{
return EIoErrorCode::WriteError;
}
FFooter Footer;
FMemory::Memcpy(&Footer.Magic, &FFooter::MagicSequence, sizeof(FFooter::MagicSequence));
if (FileHandle->Write(reinterpret_cast<uint8*>(&Footer), sizeof(FFooter)) == false)
{
return EIoErrorCode::WriteError;
}
return EIoErrorCode::Ok;
}
FCasJournal::FTransaction FCasJournal::Begin(FString&& JournalFile)
{
return FTransaction
{
.JournalFile = MoveTemp(JournalFile)
};
}
FIoStatus FCasJournal::Commit(FTransaction&& Transaction)
{
if (Transaction.Entries.IsEmpty())
{
return EIoErrorCode::Ok;
}
IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile();
// Validate header and footer
{
TUniquePtr<IFileHandle> FileHandle(Ipf.OpenRead(*Transaction.JournalFile));
const int64 FileSize = FileHandle.IsValid() ? FileHandle->Size() : -1;
if (FileSize < sizeof(FHeader))
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::ReadError, 0);
return FIoStatusBuilder(EIoErrorCode::FileOpenFailed)
<< TEXT("Failed to validate CAS journal file '")
<< Transaction.JournalFile
<< TEXT("'");
}
FHeader Header;
if ((FileHandle->Read(reinterpret_cast<uint8*>(&Header), sizeof(FHeader)) == false) || (Header.IsValid() == false))
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::SignatureError, 0);
return FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate CAS journal header '")
<< Transaction.JournalFile
<< TEXT("'");
}
const int64 FooterPos = FileSize - sizeof(FFooter);
if (FileHandle->Seek(FooterPos) == false)
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::SignatureError, 0);
return FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate CAS journal footer '")
<< Transaction.JournalFile
<< TEXT("'");
}
FFooter Footer;
if ((FileHandle->Read(reinterpret_cast<uint8*>(&Footer), sizeof(FFooter)) == false) || (Footer.IsValid() == false))
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::SignatureError, 0);
return FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate CAS journal footer '")
<< Transaction.JournalFile
<< TEXT("'");
}
}
// Append entries
{
const bool bAppend = true;
TUniquePtr<IFileHandle> FileHandle(Ipf.OpenWrite(*Transaction.JournalFile, bAppend));
const int64 FileSize = FileHandle.IsValid() ? FileHandle->Size() : -1;
const int64 EntriesPos = FileSize > 0 ? FileSize - sizeof(FFooter) : -1;
if ((EntriesPos < 0) || (FileHandle->Seek(EntriesPos) == false))
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0);
return FIoStatusBuilder(EIoErrorCode::FileOpenFailed)
<< TEXT("Failed to open CAS journal '")
<< Transaction.JournalFile
<< TEXT("'");
}
const int64 TotalEntrySize = Transaction.Entries.Num() * sizeof(FEntry);
if (FileHandle->Write(
reinterpret_cast<const uint8*>(Transaction.Entries.GetData()),
TotalEntrySize) == false)
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0);
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to write CAS journal entries to '")
<< Transaction.JournalFile
<< TEXT("'");
}
FFooter Footer;
FMemory::Memcpy(&Footer.Magic, &FFooter::MagicSequence, sizeof(FFooter::MagicSequence));
if (FileHandle->Write(reinterpret_cast<uint8*>(&Footer), sizeof(FFooter)) == false)
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0);
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to write CAS journal footer to '")
<< Transaction.JournalFile
<< TEXT("'");
}
if (FileHandle->Flush() == false)
{
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0);
return EIoErrorCode::WriteError;
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Committed %d CAS journal entries of total %.2lf KiB to '%s'"),
Transaction.Entries.Num(), ToKiB(TotalEntrySize), *Transaction.JournalFile);
FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::Ok, TotalEntrySize);
return EIoErrorCode::Ok;
}
}
void FCasJournal::FTransaction::ChunkLocation(const FCasLocation& Location, const FCasAddr& Addr)
{
Entries.AddZeroed_GetRef().ChunkLocation = FEntry::FChunkLocation
{
.CasLocation = Location,
.CasAddr = Addr
};
}
void FCasJournal::FTransaction::BlockCreated(FCasBlockId BlockId)
{
Entries.AddZeroed_GetRef().BlockOperation = FEntry::FBlockOperation
{
.Type = FEntry::EType::BlockCreated,
.BlockId = BlockId,
.UtcTicks = FDateTime::UtcNow().GetTicks()
};
}
void FCasJournal::FTransaction::BlockDeleted(FCasBlockId BlockId)
{
Entries.AddZeroed_GetRef().BlockOperation = FEntry::FBlockOperation
{
.Type = FEntry::EType::BlockDeleted,
.BlockId = BlockId,
.UtcTicks = FDateTime::UtcNow().GetTicks()
};
}
void FCasJournal::FTransaction::BlockAccess(FCasBlockId BlockId, int64 UtcTicks)
{
Entries.AddZeroed_GetRef().BlockOperation = FEntry::FBlockOperation
{
.Type = FEntry::EType::BlockAccess,
.BlockId = BlockId,
.UtcTicks = UtcTicks
};
}
///////////////////////////////////////////////////////////////////////////////
struct FCasSnapshot
{
enum class EVersion : uint32
{
Invalid = 0,
Initial,
LatestPlusOne,
Latest = LatestPlusOne - 1
};
struct FHeader
{
static const inline uint8 MagicSequence[16] = {'+', 'S', 'N', 'A', 'P', 'S', 'H', 'O', 'T', 'H', 'E', 'A', 'D', 'E', 'R', '+'};
bool IsValid() const;
uint8 Magic[16] = {0};
EVersion Version = EVersion::Invalid;
uint8 Pad[12] = {0};
};
static_assert(sizeof(FHeader) == 32);
struct FFooter
{
static const inline uint8 MagicSequence[16] = {'+', 'S', 'N', 'A', 'P', 'S', 'H', 'O', 'T', 'F', 'O', 'O', 'T', 'E', 'R', '+'};
bool IsValid() const;
uint8 Magic[16] = {0};
};
static_assert(sizeof(FFooter) == 16);
struct FBlock
{
friend FArchive& operator<<(FArchive& Ar, FBlock& Block)
{
Ar << Block.BlockId;
Ar << Block.LastAccess;
return Ar;
}
FCasBlockId BlockId;
int64 LastAccess;
};
using FChunkLocation = TPair<FCasAddr, FCasLocation>;
static TIoStatusOr<FCasSnapshot> FromJournal(const FString& JournalFile);
static TIoStatusOr<FCasSnapshot> Load(const FString& SnapshotFile, int64* OutFileSize = nullptr);
static TIoStatusOr<int64> Save(const FCasSnapshot& Snapshot, const FString& SnapshotFile);
static TIoStatusOr<int64> TryCreateAndResetJournal(const FString& SnapshotFile, const FString& JournalFile);
TArray<FBlock> Blocks;
TArray<FChunkLocation> ChunkLocations;
FCasBlockId CurrentBlockId;
};
///////////////////////////////////////////////////////////////////////////////
bool FCasSnapshot::FHeader::IsValid() const
{
if (FMemory::Memcmp(&Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence)) != 0)
{
return false;
}
if (static_cast<uint32>(Version) > static_cast<uint32>(EVersion::Latest))
{
return false;
}
return true;
}
bool FCasSnapshot::FFooter::IsValid() const
{
return FMemory::Memcmp(Magic, FFooter::MagicSequence, sizeof(FFooter::MagicSequence)) == 0;
}
TIoStatusOr<FCasSnapshot> FCasSnapshot::FromJournal(const FString& JournalFile)
{
TMap<FCasAddr, FCasLocation> CasLookup;
TMap<FCasBlockId, int64> LastAccess;
TSet<FCasBlockId> BlockIds;
FCasBlockId CurrentBlockId;
FIoStatus ReplayStatus = FCasJournal::Replay(
JournalFile,
[&CasLookup, &LastAccess, &BlockIds, &CurrentBlockId](const FCasJournal::FEntry& JournalEntry)
{
switch(JournalEntry.Type())
{
case FCasJournal::FEntry::EType::ChunkLocation:
{
const FCasJournal::FEntry::FChunkLocation& ChunkLocation = JournalEntry.ChunkLocation;
if (ChunkLocation.CasLocation.IsValid())
{
FCasLocation& Loc = CasLookup.FindOrAdd(ChunkLocation.CasAddr);
Loc = ChunkLocation.CasLocation;
}
else
{
CasLookup.Remove(ChunkLocation.CasAddr);
}
break;
}
case FCasJournal::FEntry::EType::BlockCreated:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
CurrentBlockId = Op.BlockId;
BlockIds.Add(Op.BlockId);
break;
}
case FCasJournal::FEntry::EType::BlockDeleted:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
BlockIds.Remove(Op.BlockId);
if (CurrentBlockId == Op.BlockId)
{
CurrentBlockId = FCasBlockId::Invalid;
}
break;
}
case FCasJournal::FEntry::EType::BlockAccess:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
LastAccess.Add(Op.BlockId, Op.UtcTicks);
break;
}
};
});
FCasSnapshot Snapshot;
Snapshot.Blocks.Reserve(BlockIds.Num());
for (FCasBlockId BlockId : BlockIds)
{
const int64* AccessTime = LastAccess.Find(BlockId);
Snapshot.Blocks.Add(FBlock
{
.BlockId = BlockId,
.LastAccess = AccessTime != nullptr ? *AccessTime : FDateTime::UtcNow().GetTicks()
});
}
Snapshot.ChunkLocations = CasLookup.Array();
Snapshot.CurrentBlockId = CurrentBlockId;
return Snapshot;
}
TIoStatusOr<int64> FCasSnapshot::Save(const FCasSnapshot& Snapshot, const FString& SnapshotFile)
{
IFileManager& Ifm = IFileManager::Get();
const FString TmpSnapshotFile = FPaths::ChangeExtension(SnapshotFile, TEXT(".snptmp"));
TUniquePtr<FArchive> Ar(Ifm.CreateFileWriter(*TmpSnapshotFile));
if (Ar.IsValid() == false)
{
return FIoStatus(EIoErrorCode::FileNotOpen);
}
FHeader Header;
FMemory::Memcpy(&Header.Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence));
Header.Version = EVersion::Latest;
Ar->Serialize(reinterpret_cast<uint8*>(&Header), sizeof(FHeader));
if (Ar->IsError())
{
Ar.Reset();
Ifm.Delete(*TmpSnapshotFile);
return FIoStatus(EIoErrorCode::WriteError);
}
FCasSnapshot& NonConst = *const_cast<FCasSnapshot*>(&Snapshot);
*Ar << NonConst.Blocks;
*Ar << NonConst.ChunkLocations;
*Ar << NonConst.CurrentBlockId;
if (Ar->IsError())
{
Ar.Reset();
Ifm.Delete(*TmpSnapshotFile);
return FIoStatus(EIoErrorCode::WriteError);
}
FFooter Footer;
FMemory::Memcpy(&Footer.Magic, &FFooter::MagicSequence, sizeof(FFooter::MagicSequence));
Ar->Serialize(reinterpret_cast<uint8*>(&Footer), sizeof(FFooter));
if (Ar->IsError())
{
Ar.Reset();
Ifm.Delete(*TmpSnapshotFile);
return FIoStatus(EIoErrorCode::WriteError);
}
const int64 FileSize = Ar->TotalSize();
if (Ar->Close() == false)
{
Ar.Reset();
Ifm.Delete(*TmpSnapshotFile);
return FIoStatus(EIoErrorCode::WriteError);
}
if (Ifm.Move(*SnapshotFile, *TmpSnapshotFile) == false)
{
Ifm.Delete(*TmpSnapshotFile);
return FIoStatus(EIoErrorCode::WriteError);
}
return FileSize;
}
TIoStatusOr<FCasSnapshot> FCasSnapshot::Load(const FString& SnapshotFile, int64* OutFileSize)
{
IFileManager& Ifm = IFileManager::Get();
TUniquePtr<FArchive> Ar(Ifm.CreateFileReader(*SnapshotFile));
if (Ar.IsValid() == false)
{
return FIoStatus(EIoErrorCode::NotFound);
}
FHeader Header;
Ar->Serialize(reinterpret_cast<uint8*>(&Header), sizeof(FHeader));
if (Ar->IsError() || Header.IsValid() == false)
{
FIoStatus Status = FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate snapshot header '")
<< SnapshotFile
<< TEXT("'");
return Status;
}
FCasSnapshot Snapshot;
*Ar << Snapshot.Blocks;
*Ar << Snapshot.ChunkLocations;
*Ar << Snapshot.CurrentBlockId;
FFooter Footer;
Ar->Serialize(reinterpret_cast<uint8*>(&Footer), sizeof(FFooter));
if (Ar->IsError() || Footer.IsValid() == false)
{
FIoStatus Status = FIoStatusBuilder(EIoErrorCode::ReadError)
<< TEXT("Failed to validate snapshot footer '")
<< SnapshotFile
<< TEXT("'");
return Status;
}
if (OutFileSize != nullptr)
{
*OutFileSize = Ar->Tell();
}
return Snapshot;
}
TIoStatusOr<int64> FCasSnapshot::TryCreateAndResetJournal(const FString& SnapshotFile, const FString& JournalFile)
{
IFileManager& Ifm = IFileManager::Get();
const int64 JournalFileSize = Ifm.FileSize(*JournalFile);
if (JournalFileSize < 0)
{
return FIoStatus(EIoErrorCode::NotFound);
}
// Load the snapshot from the journal
TIoStatusOr<FCasSnapshot> SnapshotStatus = FCasSnapshot::FromJournal(JournalFile);
if (SnapshotStatus.IsOk() == false)
{
return SnapshotStatus.Status();
}
// Save the snapshot
int64 SnapshotSize = -1;
FCasSnapshot Snapshot = SnapshotStatus.ConsumeValueOrDie();
if (TIoStatusOr<int64> Status = FCasSnapshot::Save(Snapshot, SnapshotFile); Status.IsOk())
{
SnapshotSize = Status.ConsumeValueOrDie();
}
else
{
return Status.Status();
}
// Try create a new empty journal
const FString TmpJournalFile = FPaths::ChangeExtension(JournalFile, TEXT(".jrntmp"));
if (FIoStatus Status = FCasJournal::Create(TmpJournalFile); Status.IsOk() == false)
{
if (Ifm.Delete(*SnapshotFile) == false)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to delete CAS snapshot '%s'"), *SnapshotFile);
}
return Status;
}
if (Ifm.Move(*JournalFile , *TmpJournalFile) == false)
{
if (Ifm.Delete(*SnapshotFile) == false)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to delete CAS snapshot '%s'"), *SnapshotFile);
}
return FIoStatus(EIoErrorCode::WriteError);
}
return SnapshotSize;
}
///////////////////////////////////////////////////////////////////////////////
class FOnDemandInstallCache final
: public IOnDemandInstallCache
{
using FSharedBackendContextRef = TSharedRef<const FIoDispatcherBackendContext>;
using FSharedBackendContext = TSharedPtr<const FIoDispatcherBackendContext>;
struct FChunkRequest
{
explicit FChunkRequest(
FSharedAsyncFileHandle FileHandle,
FIoRequestImpl* Request,
FOnDemandChunkInfo&& Info,
FIoOffsetAndLength Range,
uint64 RequestedRawSize)
: SharedFileHandle(FileHandle)
, DispatcherRequest(Request)
, ChunkInfo(MoveTemp(Info))
, ChunkRange(Range)
, EncodedChunk(ChunkRange.GetLength())
, RawSize(RequestedRawSize)
{
check(DispatcherRequest != nullptr);
check(ChunkInfo.IsValid());
check(Request->NextRequest == nullptr);
check(Request->BackendData == nullptr);
}
static FChunkRequest* Get(FIoRequestImpl& Request)
{
return reinterpret_cast<FChunkRequest*>(Request.BackendData);
}
static FChunkRequest& GetRef(FIoRequestImpl& Request)
{
check(Request.BackendData);
return *reinterpret_cast<FChunkRequest*>(Request.BackendData);
}
static FChunkRequest& Attach(FIoRequestImpl& Request, FChunkRequest* ChunkRequest)
{
check(Request.BackendData == nullptr);
check(ChunkRequest != nullptr);
Request.BackendData = ChunkRequest;
return *ChunkRequest;
}
static TUniquePtr<FChunkRequest> Detach(FIoRequestImpl& Request)
{
void* ChunkRequest = nullptr;
Swap(ChunkRequest, Request.BackendData);
return TUniquePtr<FChunkRequest>(reinterpret_cast<FChunkRequest*>(ChunkRequest));
}
FSharedAsyncFileHandle SharedFileHandle;
TUniquePtr<IAsyncReadRequest> FileReadRequest;
FIoRequestImpl* DispatcherRequest;
FOnDemandChunkInfo ChunkInfo;
FIoOffsetAndLength ChunkRange;
FIoBuffer EncodedChunk;
uint64 RawSize;
};
struct FPendingChunks
{
static constexpr uint64 MaxPendingBytes = 4ull << 20;
bool IsEmpty() const
{
check(Chunks.Num() == ChunkHashes.Num());
return TotalSize == 0 && Chunks.IsEmpty() && ChunkHashes.IsEmpty();
}
void Append(FIoBuffer&& Chunk, const FIoHash& ChunkHash)
{
check(Chunks.Num() == ChunkHashes.Num());
TotalSize += Chunk.GetSize();
ChunkHashes.Add(ChunkHash);
Chunks.Add(MoveTemp(Chunk));
}
FIoBuffer Pop(FIoHash& OutChunkHash)
{
check(Chunks.Num() == ChunkHashes.Num());
check(Chunks.IsEmpty() == false);
FIoBuffer Chunk = Chunks.Pop(EAllowShrinking::No);
TotalSize = TotalSize - Chunk.GetSize();
OutChunkHash = ChunkHashes.Pop(EAllowShrinking::No);
return Chunk;
}
void Reset()
{
Chunks.Reset();
ChunkHashes.Reset();
TotalSize = 0;
}
TArray<FIoBuffer> Chunks;
TArray<FIoHash> ChunkHashes;
uint64 TotalSize = 0;
};
using FUniquePendingChunks = TUniquePtr<FPendingChunks>;
public:
FOnDemandInstallCache(const FOnDemandInstallCacheConfig& Config, FOnDemandIoStore& IoStore);
virtual ~FOnDemandInstallCache();
// IIoDispatcherBackend
virtual void Initialize(FSharedBackendContextRef Context) override;
virtual void Shutdown() override;
virtual void ResolveIoRequests(FIoRequestList Requests, FIoRequestList& OutUnresolved) override;
virtual FIoRequestImpl* GetCompletedIoRequests() override;
virtual void CancelIoRequest(FIoRequestImpl* Request) override;
virtual void UpdatePriorityForIoRequest(FIoRequestImpl* Request) override;
virtual bool DoesChunkExist(const FIoChunkId& ChunkId) const override;
virtual TIoStatusOr<uint64> GetSizeForChunk(const FIoChunkId& ChunkId) const override;
virtual TIoStatusOr<FIoMappedRegion> OpenMapped(const FIoChunkId& ChunkId, const FIoReadOptions& Options) override;
virtual const TCHAR* GetName() const override;
// IOnDemandInstallCache
virtual bool IsChunkCached(const FIoHash& ChunkHash) override;
virtual FIoStatus PutChunk(FIoBuffer&& Chunk, const FIoHash& ChunkHash) override;
virtual FIoStatus Purge(TSet<FIoHash>&& ChunksToInstall) override;
virtual FIoStatus PurgeAllUnreferenced(bool bDefrag, const uint64* BytesToPurge = nullptr) override;
virtual FIoStatus DefragAll(const uint64* BytesToFree = nullptr) override;
virtual FIoStatus Verify() override;
virtual FIoStatus Flush() override;
virtual FOnDemandInstallCacheUsage GetCacheUsage() override;
private:
FIoStatus Reinitialize();
FIoStatus InitialVerify();
uint64 AddReferencesToBlocks(
const TArray<FSharedOnDemandContainer>& Containers,
const TArray<TBitArray<>>& ChunkEntryIndices,
const TSet<FIoHash>& ChunksToInstall,
FCasBlockInfoMap& BlockInfoMap,
uint64& OutTotalReferencedBytes) const;
FIoStatus Purge(FCasBlockInfoMap& BlockInfo, uint64 TotalBytesToPurge, uint64& OutTotalPurgedBytes);
FIoStatus Defrag(
const TArray<FSharedOnDemandContainer>& Containers,
const TArray<TBitArray<>>& ChunkEntryIndices,
FCasBlockInfoMap& BlockInfo,
const uint64* TotalBytesToFree = nullptr);
bool Resolve(FIoRequestImpl* Request);
void CompleteRequest(FIoRequestImpl* Request, EIoErrorCode Status);
FIoStatus FlushPendingChunks(FPendingChunks& Chunks, int64 UtcAccessTicks = 0);
FIoStatus FlushPendingChunksImpl(FPendingChunks& Chunks, int64 UtcAccessTicks = 0);
FString GetJournalFilename() const { return CacheDirectory / TEXT("cas.jrn"); }
FString GetSnapshotFilename() const { return CacheDirectory / TEXT("cas.snp"); }
FOnDemandIoStore& IoStore;
FString CacheDirectory;
FCas Cas;
std::atomic<FCasBlockId> CurrentBlock{ FCasBlockId::Invalid };
FUniquePendingChunks PendingChunks;
FSharedBackendContext BackendContext;
FIoRequestList CompletedRequests;
UE::FMutex Mutex;
uint64 MaxCacheSize{ 0 };
uint64 MaxJournalSize;
#if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
UE::Tasks::FPipe ExclusivePipe{ UE_SOURCE_LOCATION };
#endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
};
///////////////////////////////////////////////////////////////////////////////
FOnDemandInstallCache::FOnDemandInstallCache(const FOnDemandInstallCacheConfig& Config, FOnDemandIoStore& InIoStore)
: IoStore(InIoStore)
, CacheDirectory(Config.RootDirectory)
, MaxCacheSize(Config.DiskQuota)
, MaxJournalSize(Config.JournalMaxSize)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Initializing install cache, MaxCacheSize=%.2lf MiB, MaxJournalSize=%.2lf KiB"),
ToMiB(MaxCacheSize), ToKiB(MaxJournalSize));
const uint64 MinDiskQuota = 2 * Cas.MaxBlockSize;
if (MaxCacheSize < MinDiskQuota)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to initialize install cache - disk quota must be at least %.2lf MiB"), ToMiB(MinDiskQuota));
return;
}
// Reserve one block of space for defragmentation overhead
MaxCacheSize -= Cas.MaxBlockSize;
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Effective MaxCacheSize without defragmentation space is MaxCacheSize=%.2lf MiB"), ToMiB(MaxCacheSize));
FIoStatus Status = Cas.Initialize(CacheDirectory);
if (Status.IsOk() == false)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to initialize install cache, reason '%s'"), *Status.ToString());
return;
}
// Try read the journal snapshot
{
const FString SnapshotFile = GetSnapshotFilename();
int64 SnapshotSize = -1;
TIoStatusOr<FCasSnapshot> SnapshotStatus = FCasSnapshot::Load(SnapshotFile, &SnapshotSize);
if (SnapshotStatus.IsOk())
{
FCasSnapshot Snapshot = SnapshotStatus.ConsumeValueOrDie();
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Loaded CAS snapshot '%s' %.2lf KiB with %d blocks and %d chunk locations"),
*SnapshotFile, ToKiB(SnapshotSize), Snapshot.Blocks.Num(), Snapshot.ChunkLocations.Num());
Cas.Lookup.Reserve(Snapshot.ChunkLocations.Num());
for (TPair<FCasAddr, FCasLocation>& Kv : Snapshot.ChunkLocations)
{
Cas.Lookup.Add(MoveTemp(Kv));
}
Cas.BlockIds.Reserve(Snapshot.Blocks.Num());
Cas.LastAccess.Reserve(Snapshot.Blocks.Num());
for (const FCasSnapshot::FBlock& Block : Snapshot.Blocks)
{
Cas.BlockIds.Add(Block.BlockId, 0);
Cas.LastAccess.Add(Block.BlockId, Block.LastAccess);
}
}
}
// Replay the journal
const FString JournalFile = GetJournalFilename();
Status = FCasJournal::Replay(JournalFile, [this](const FCasJournal::FEntry& JournalEntry)
{
switch(JournalEntry.Type())
{
case FCasJournal::FEntry::EType::ChunkLocation:
{
const FCasJournal::FEntry::FChunkLocation& ChunkLocation = JournalEntry.ChunkLocation;
if (ChunkLocation.CasLocation.IsValid())
{
FCasLocation& Loc = Cas.Lookup.FindOrAdd(ChunkLocation.CasAddr);
Loc = ChunkLocation.CasLocation;
}
else
{
Cas.Lookup.Remove(ChunkLocation.CasAddr);
}
break;
}
case FCasJournal::FEntry::EType::BlockCreated:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
CurrentBlock = Op.BlockId;
Cas.BlockIds.Add(Op.BlockId, 0);
break;
}
case FCasJournal::FEntry::EType::BlockDeleted:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
Cas.BlockIds.Remove(Op.BlockId);
FCasBlockId MaybeCurrentBlock = Op.BlockId;
CurrentBlock.compare_exchange_strong(MaybeCurrentBlock, FCasBlockId::Invalid);
break;
}
case FCasJournal::FEntry::EType::BlockAccess:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
Cas.TrackAccess(Op.BlockId, Op.UtcTicks);
break;
}
};
});
// Initializing the cache for the first time
if (Status.GetErrorCode() == EIoErrorCode::NotFound)
{
if (Status = FCasJournal::Create(JournalFile); Status.IsOk())
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Created CAS journal '%s'"), *JournalFile);
// Make sure that there are no existing blocks when starting from an empty cache
const bool bDeleteExisting = true;
Status = Cas.Initialize(CacheDirectory, bDeleteExisting);
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to create CAS journal '%s'"), *JournalFile);
}
}
// Verify the current state of the cache
if (Status.IsOk())
{
Status = InitialVerify();
}
// Try to reinitialize the cache if something has gone wrong
if (Status.IsOk() == false)
{
Status = Reinitialize();
}
if (Status.IsOk())
{
Cas.Compact();
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to initialize install cache, reason '%s'"),
*Status.ToString());
}
}
FOnDemandInstallCache::~FOnDemandInstallCache()
{
}
void FOnDemandInstallCache::Initialize(FSharedBackendContextRef Context)
{
BackendContext = Context;
}
void FOnDemandInstallCache::Shutdown()
{
FCas::FLastAccess LastAccess;
{
TUniqueLock Lock(Cas.Mutex);
LastAccess = MoveTemp(Cas.LastAccess);
}
const FString JournalFile = GetJournalFilename();
FCasJournal::FTransaction Transaction = FCasJournal::Begin(JournalFile);
for (const TPair<FCasBlockId, int64>& Kv : LastAccess)
{
Transaction.BlockAccess(Kv.Key, Kv.Value);
}
if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); !Status.IsOk())
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to update CAS journal '%s' with block timestamp(s), reason '%s'"),
*JournalFile, *Status.ToString());
}
IFileManager& Ifm = IFileManager::Get();
const FString JournalFilename = GetJournalFilename();
if (Ifm.FileSize(*JournalFile) > int64(MaxJournalSize))
{
const FString SnapshotFilename = GetSnapshotFilename();
TIoStatusOr<int64> SnapshotStatus = FCasSnapshot::TryCreateAndResetJournal(SnapshotFilename, JournalFilename);
if (SnapshotStatus.IsOk())
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Saved CAS snapshot '%s' %.2lf KiB"), *SnapshotFilename, ToKiB(SnapshotStatus.ValueOrDie()));
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to create CAS snapshot from journal '%s', reason '%s'"),
*JournalFile, *SnapshotStatus.Status().ToString());
}
}
}
void FOnDemandInstallCache::ResolveIoRequests(FIoRequestList Requests, FIoRequestList& OutUnresolved)
{
while (FIoRequestImpl* Request = Requests.PopHead())
{
if (Resolve(Request) == false)
{
OutUnresolved.AddTail(Request);
}
}
}
FIoRequestImpl* FOnDemandInstallCache::GetCompletedIoRequests()
{
FIoRequestImpl* FirstCompleted = nullptr;
{
UE::TUniqueLock Lock(Mutex);
for (FIoRequestImpl& Completed : CompletedRequests)
{
TUniquePtr<FChunkRequest> Detached = FChunkRequest::Detach(Completed);
}
FirstCompleted = CompletedRequests.GetHead();
CompletedRequests = FIoRequestList();
}
return FirstCompleted;
}
void FOnDemandInstallCache::CancelIoRequest(FIoRequestImpl* Request)
{
check(Request != nullptr);
UE::TUniqueLock Lock(Mutex);
if (FChunkRequest* ChunkRequest = FChunkRequest::Get(*Request))
{
if (ChunkRequest->FileReadRequest.IsValid())
{
ChunkRequest->FileReadRequest->Cancel();
}
}
}
void FOnDemandInstallCache::UpdatePriorityForIoRequest(FIoRequestImpl* Request)
{
}
bool FOnDemandInstallCache::DoesChunkExist(const FIoChunkId& ChunkId) const
{
EIoErrorCode ErrorCode = EIoErrorCode::UnknownChunkID;
if (FOnDemandChunkInfo ChunkInfo = IoStore.GetInstalledChunkInfo(ChunkId, ErrorCode))
{
const FCasLocation CasLoc = Cas.FindChunk(ChunkInfo.Hash());
return CasLoc.IsValid();
}
return false;
}
TIoStatusOr<uint64> FOnDemandInstallCache::GetSizeForChunk(const FIoChunkId& ChunkId) const
{
EIoErrorCode ErrorCode = EIoErrorCode::UnknownChunkID;
if (FOnDemandChunkInfo ChunkInfo = IoStore.GetInstalledChunkInfo(ChunkId, ErrorCode))
{
return ChunkInfo.RawSize();
}
return FIoStatus(ErrorCode);
}
TIoStatusOr<FIoMappedRegion> FOnDemandInstallCache::OpenMapped(const FIoChunkId& ChunkId, const FIoReadOptions& Options)
{
return FIoStatus(EIoErrorCode::FileOpenFailed);
}
const TCHAR* FOnDemandInstallCache::GetName() const
{
return TEXT("OnDemandInstallCache");
}
bool FOnDemandInstallCache::Resolve(FIoRequestImpl* Request)
{
EIoErrorCode ErrorCode = EIoErrorCode::UnknownChunkID;
FOnDemandChunkInfo ChunkInfo = IoStore.GetInstalledChunkInfo(Request->ChunkId, ErrorCode);
if (ChunkInfo.IsValid() == false)
{
if (ErrorCode == EIoErrorCode::NotInstalled)
{
CompleteRequest(Request, EIoErrorCode::NotInstalled);
return true;
}
return false;
}
const FCasLocation CasLoc = Cas.FindChunk(ChunkInfo.Hash());
if (CasLoc.IsValid() == false)
{
CompleteRequest(Request, EIoErrorCode::NotInstalled);
return true;
}
const uint64 RequestSize = FMath::Min<uint64>(
Request->Options.GetSize(),
ChunkInfo.RawSize() - Request->Options.GetOffset());
TIoStatusOr<FIoOffsetAndLength> ChunkRange = FIoChunkEncoding::GetChunkRange(
ChunkInfo.RawSize(),
ChunkInfo.BlockSize(),
ChunkInfo.Blocks(),
Request->Options.GetOffset(),
RequestSize);
if (ChunkRange.IsOk() == false)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to get chunk range"));
CompleteRequest(Request, ChunkRange.Status().GetErrorCode());
return true;
}
TRACE_IOSTORE_BACKEND_REQUEST_STARTED(Request, this);
Cas.TrackAccess(CasLoc.BlockId);
#if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
const bool bIsLocationInCurrentBlock = CasLoc.BlockId == CurrentBlock;
if (bIsLocationInCurrentBlock)
{
// The current block may have open writes which may cause async reads to fail
// on some platforms. Schedule the reads to happen on the same pipe as writes
// The internal request parameters are attached/owned by the I/O request via
// the backend data parameter. The chunk request is deleted in GetCompletedRequests
FChunkRequest::Attach(*Request, new FChunkRequest(
FSharedAsyncFileHandle(),
Request,
MoveTemp(ChunkInfo),
ChunkRange.ConsumeValueOrDie(),
RequestSize));
ExclusivePipe.Launch(UE_SOURCE_LOCATION, [this, Request, CasLoc]
{
FChunkRequest& ChunkRequest = FChunkRequest::GetRef(*Request);
EIoErrorCode Status = EIoErrorCode::FileOpenFailed;
const FString Filename = Cas.GetBlockFilename(CasLoc.BlockId);
FSharedFileOpenResult FileOpenResult = Cas.OpenRead(CasLoc.BlockId);
if (FileOpenResult.IsValid())
{
Status = EIoErrorCode::ReadError;
TSharedPtr<IFileHandle> FileHandle = FileOpenResult.StealValue();
const int64 CasBlockOffset = CasLoc.BlockOffset + ChunkRequest.ChunkRange.GetOffset();
if (Request->IsCancelled())
{
UE_LOG(LogIoStoreOnDemand, Verbose, TEXT("Cancelled request - skipped seek to offset %lld in CAS block '%s'"), CasBlockOffset, *Filename);
}
else if (FileHandle->Seek(CasBlockOffset))
{
const bool bOk = FileHandle->Read(ChunkRequest.EncodedChunk.GetData(), ChunkRequest.EncodedChunk.GetSize());
if (bOk)
{
Status = EIoErrorCode::Ok;
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to read %llu bytes at offset %lld in CAS block '%s'"),
ChunkRequest.EncodedChunk.GetSize(),
CasBlockOffset,
*Filename);
}
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to seek to offset %lld in CAS block '%s'"), CasBlockOffset, *Filename);
}
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open CAS block '%s' for reading, reason '%s'"),
*Filename, *FileOpenResult.GetError().GetMessage());
}
UE::Tasks::Launch(UE_SOURCE_LOCATION, [this, Request, Status]
{
CompleteRequest(Request, Status);
});
}, UE::Tasks::ETaskPriority::BackgroundHigh);
return true;
}
#endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
FSharedFileOpenAsyncResult FileOpenResult = Cas.OpenAsyncRead(CasLoc.BlockId);
if (FileOpenResult.HasError())
{
const FString Filename = Cas.GetBlockFilename(CasLoc.BlockId);
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open CAS block '%s' for async reading, reason '%s'"), *Filename, *FileOpenResult.GetError().GetMessage());
TUniquePtr<FChunkRequest> Detached = FChunkRequest::Detach(*Request);
CompleteRequest(Request, EIoErrorCode::FileOpenFailed);
return true;
}
FSharedAsyncFileHandle FileHandle(FileOpenResult.StealValue());
// The internal request parameters are attached/owned by the I/O request via
// the backend data parameter. The chunk request is deleted in GetCompletedRequests
FChunkRequest& ChunkRequest = FChunkRequest::Attach(*Request, new FChunkRequest(
FileHandle,
Request,
MoveTemp(ChunkInfo),
ChunkRange.ConsumeValueOrDie(),
RequestSize));
FAsyncFileCallBack Callback = [this, Request](bool bWasCancelled, IAsyncReadRequest* ReadRequest)
{
UE::Tasks::Launch(UE_SOURCE_LOCATION, [this, Request, bWasCancelled]
{
const EIoErrorCode Status = bWasCancelled ? EIoErrorCode::ReadError : EIoErrorCode::Ok;
CompleteRequest(Request, Status);
});
};
ChunkRequest.FileReadRequest.Reset(FileHandle->ReadRequest(
CasLoc.BlockOffset + ChunkRequest.ChunkRange.GetOffset(),
ChunkRequest.ChunkRange.GetLength(),
EAsyncIOPriorityAndFlags::AIOP_BelowNormal,
&Callback,
ChunkRequest.EncodedChunk.GetData()));
if (ChunkRequest.FileReadRequest.IsValid() == false)
{
TRACE_IOSTORE_BACKEND_REQUEST_FAILED(Request);
TUniquePtr<FChunkRequest> Detached = FChunkRequest::Detach(*Request);
CompleteRequest(Request, EIoErrorCode::ReadError);
return true;
}
return true;
}
bool FOnDemandInstallCache::IsChunkCached(const FIoHash& ChunkHash)
{
const FCasLocation Loc = Cas.FindChunk(ChunkHash);
return Loc.IsValid();
}
FIoStatus FOnDemandInstallCache::PutChunk(FIoBuffer&& Chunk, const FIoHash& ChunkHash)
{
if (PendingChunks.IsValid() == false)
{
PendingChunks = MakeUnique<FPendingChunks>();
}
if (PendingChunks->TotalSize > FPendingChunks::MaxPendingBytes)
{
if (FIoStatus Status = FlushPendingChunks(*PendingChunks); Status.IsOk() == false)
{
return Status;
}
check(PendingChunks->IsEmpty());
}
PendingChunks->Append(MoveTemp(Chunk), ChunkHash);
return FIoStatus::Ok;
}
FIoStatus FOnDemandInstallCache::Purge(TSet<FIoHash>&& ChunksToInstall)
{
FCasBlockInfoMap BlockInfo;
const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo);
TArray<FSharedOnDemandContainer> Containers;
TArray<TBitArray<>> ChunkEntryIndices;
IoStore.GetReferencedContent(Containers, ChunkEntryIndices);
check(Containers.Num() == ChunkEntryIndices.Num());
uint64 ReferencedBytes = 0;
uint64 FragmentedBytes = 0;
uint64 TotalReferencedBlockBytes = 0;
int64 OldestBlockAccess = FDateTime::MaxValue().GetTicks();
const uint64 TotalUncachedBytes = AddReferencesToBlocks(Containers, ChunkEntryIndices, ChunksToInstall, BlockInfo, ReferencedBytes);
for (const TPair<FCasBlockId, FCasBlockInfo>& Kv : BlockInfo)
{
const FCasBlockInfo& Info = Kv.Value;
if (Info.RefSize < Info.FileSize)
{
FragmentedBytes += (Info.FileSize - Info.RefSize);
}
if (Info.RefSize > 0)
{
TotalReferencedBlockBytes += Info.FileSize;
}
if (Info.LastAccess < OldestBlockAccess)
{
OldestBlockAccess = Info.LastAccess;
}
}
FOnDemandInstallCacheStats::OnCacheUsage(
MaxCacheSize, TotalCachedBytes, TotalReferencedBlockBytes, ReferencedBytes, FragmentedBytes, OldestBlockAccess);
const uint64 TotalRequiredBytes = TotalCachedBytes + TotalUncachedBytes;
if (TotalRequiredBytes <= MaxCacheSize)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Skipping cache purge, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBlockSize=%.2lf MiB, ReferencedSize=%.2lf MiB, FragmentedBytes=%.2lf MiB, UncachedSize=%.2lf MiB"),
ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBlockBytes), ToMiB(ReferencedBytes), ToMiB(FragmentedBytes), ToMiB(TotalUncachedBytes));
return FIoStatus::Ok;
}
//TODO: Compute fragmentation metric and redownload chunks when this number gets too high
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purging install cache, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBlockSize=%.2lf MiB, ReferencedSize=%.2lf MiB, FragmentedBytes=%.2lf MiB, UncachedSize=%.2lf MiB"),
ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBlockBytes), ToMiB(ReferencedBytes), ToMiB(FragmentedBytes), ToMiB(TotalUncachedBytes));
const uint64 TotalBytesToPurge = TotalRequiredBytes - MaxCacheSize;
uint64 TotalPurgedBytes = 0;
FIoStatus Status = Purge(BlockInfo, TotalBytesToPurge, TotalPurgedBytes);
if (TotalPurgedBytes > 0)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purged %.2lf MiB (%.2lf%%) from install cache"),
ToMiB(TotalPurgedBytes), 100.0 * (double(TotalPurgedBytes) / double(TotalCachedBytes)));
}
const uint64 NewCachedBytes = TotalCachedBytes - TotalPurgedBytes;
UE_CLOG(NewCachedBytes > MaxCacheSize,
LogIoStoreOnDemand, Warning, TEXT("Max install cache size exceeded by %.2lf MiB (%.2lf%%)"),
ToMiB(NewCachedBytes - MaxCacheSize), 100.0 * (double(NewCachedBytes - MaxCacheSize) / double(MaxCacheSize)));
FOnDemandInstallCacheStats::OnPurge(Status.GetErrorCode(), MaxCacheSize, NewCachedBytes, TotalBytesToPurge, TotalPurgedBytes);
if (TotalPurgedBytes < TotalBytesToPurge)
{
if (UE::IoStore::CVars::GIoStoreOnDemandEnableDefrag)
{
// Attempt to defrag
const uint64 DefragBytesToPurge = TotalBytesToPurge - TotalPurgedBytes;
FIoStatus DefragStatus = Defrag(Containers, ChunkEntryIndices, BlockInfo, &DefragBytesToPurge);
if (DefragStatus.IsOk() == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache after defrag (%s)"), TotalBytesToPurge, *DefragStatus.ToString());
}
}
else
{
return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache"), TotalBytesToPurge);
}
}
return Status;
}
FIoStatus FOnDemandInstallCache::PurgeAllUnreferenced(bool bDefrag, const uint64* BytesToPurge /*= nullptr*/)
{
FCasBlockInfoMap BlockInfo;
const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo);
TArray<FSharedOnDemandContainer> Containers;
TArray<TBitArray<>> ChunkEntryIndices;
IoStore.GetReferencedContent(Containers, ChunkEntryIndices);
check(Containers.Num() == ChunkEntryIndices.Num());
uint64 ReferencedBytes = 0;
AddReferencesToBlocks(Containers, ChunkEntryIndices, {}, BlockInfo, ReferencedBytes);
const uint64 TotalReferencedBytes = Algo::TransformAccumulate(BlockInfo,
[](const TPair<FCasBlockId, FCasBlockInfo>& Kv) { return (Kv.Value.RefSize > 0) ? Kv.Value.FileSize : uint64(0); },
uint64(0));
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purging install cache, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBytes=%.2lf MiB"),
ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBytes));
const uint64 TotalBytesToPurge = BytesToPurge ? *BytesToPurge : MaxCacheSize;
uint64 TotalPurgedBytes = 0;
FIoStatus Status = Purge(BlockInfo, TotalBytesToPurge, TotalPurgedBytes);
if (TotalPurgedBytes > 0)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purged %.2lf MiB (%.2lf%%) from install cache"),
ToMiB(TotalPurgedBytes), 100.0 * (double(TotalPurgedBytes) / double(TotalCachedBytes)));
}
const uint64 NewCachedBytes = TotalCachedBytes - TotalPurgedBytes;
UE_CLOG(NewCachedBytes > MaxCacheSize,
LogIoStoreOnDemand, Warning, TEXT("Max install cache size exceeded by %.2lf MiB (%.2lf%%)"),
ToMiB(NewCachedBytes - MaxCacheSize), 100.0 * (double(NewCachedBytes - MaxCacheSize) / double(MaxCacheSize)));
if (BytesToPurge)
{
if (bDefrag)
{
// Attempt to defrag
const uint64 DefragBytesToPurge = TotalBytesToPurge - TotalPurgedBytes;
FIoStatus DefragStatus = Defrag(Containers, ChunkEntryIndices, BlockInfo, &DefragBytesToPurge);
if (DefragStatus.IsOk() == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache after defrag (%s)"), TotalBytesToPurge, *DefragStatus.ToString());
}
}
else
{
return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache"), TotalBytesToPurge);
}
}
else if (bDefrag)
{
// Just do Full Defrag
FIoStatus DefragStatus = Defrag(Containers, ChunkEntryIndices, BlockInfo);
if (DefragStatus.IsOk() == false)
{
return DefragStatus;
}
}
return Status;
}
FIoStatus FOnDemandInstallCache::DefragAll(const uint64* BytesToFree /*= nullptr*/)
{
FCasBlockInfoMap BlockInfo;
const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo);
TArray<FSharedOnDemandContainer> Containers;
TArray<TBitArray<>> ChunkEntryIndices;
IoStore.GetReferencedContent(Containers, ChunkEntryIndices);
check(Containers.Num() == ChunkEntryIndices.Num());
uint64 ReferencedBytes = 0;
AddReferencesToBlocks(Containers, ChunkEntryIndices, {}, BlockInfo, ReferencedBytes);
const uint64 TotalReferencedBlockBytes = Algo::TransformAccumulate(BlockInfo,
[](const TPair<FCasBlockId, FCasBlockInfo>& Kv) { return (Kv.Value.RefSize > 0) ? Kv.Value.FileSize : uint64(0); },
uint64(0));
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Defragmenting install cache, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBlockSize=%.2lf MiB, ReferencedSize=%.2lf MiB"),
ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBlockBytes), ToMiB(ReferencedBytes));
return Defrag(Containers, ChunkEntryIndices, BlockInfo, BytesToFree);
}
FIoStatus FOnDemandInstallCache::Verify()
{
struct FChunkLookup
{
TMap<FCasAddr, int32> AddrToIndex;
};
struct FCasAddrLocation
{
FCasAddr Addr;
FCasLocation Location;
bool operator<(const FCasAddrLocation& Other) const
{
if (Location.BlockId == Other.Location.BlockId)
{
return Location.BlockOffset < Other.Location.BlockOffset;
}
return Location.BlockId.Id < Other.Location.BlockId.Id;
}
};
TArray<FSharedOnDemandContainer> Containers = IoStore.GetContainers(EOnDemandContainerFlags::InstallOnDemand);
TArray<FCasAddrLocation> ChunkLocations;
TArray<FChunkLookup> ChunkLookups;
{
TUniqueLock Lock(Cas.Mutex);
ChunkLocations.Reserve(Cas.Lookup.Num());
for (const TPair<FCasAddr, FCasLocation>& Kv : Cas.Lookup)
{
ChunkLocations.Add(FCasAddrLocation
{
.Addr = Kv.Key,
.Location = Kv.Value
});
}
}
ChunkLocations.Sort();
ChunkLookups.Reserve(Containers.Num());
for (int32 Idx = 0; Idx < Containers.Num(); ++Idx)
{
FSharedOnDemandContainer& Container = Containers[Idx];
FChunkLookup& Lookup = ChunkLookups.AddDefaulted_GetRef();
Lookup.AddrToIndex.Reserve(Container->ChunkEntries.Num());
for (int32 EntryIndex = 0; const FOnDemandChunkEntry& Entry : Container->ChunkEntries)
{
const FCasAddr& Addr = *reinterpret_cast<const FCasAddr*>(&Entry.Hash);
Lookup.AddrToIndex.Add(Addr, EntryIndex++);
}
}
auto FindChunkEntry = [&Containers, &ChunkLookups](const FCasAddr& Addr, int32& OutContainerIndex) -> int32
{
OutContainerIndex = INDEX_NONE;
for (int32 Idx = 0; Idx < Containers.Num(); ++Idx)
{
FChunkLookup& Lookup = ChunkLookups[Idx];
if (const int32* EntryIndex = Lookup.AddrToIndex.Find(Addr))
{
OutContainerIndex = Idx;
return *EntryIndex;
}
}
return INDEX_NONE;
};
const int32 ChunkCount = ChunkLocations.Num();
uint32 CorruptChunkCount = 0;
uint32 MissingChunkCount = 0;
uint32 ReadErrorCount = 0;
uint64 TotalVerifiedBytes = 0;
FIoBuffer Chunk(1 << 20);
if (ChunkCount == 0)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Verify skipped, install cache is empty"));
return FIoStatus::Ok;
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Verifying %d installed chunks..."), ChunkCount);
for (int32 ChunkIndex = 0; const FCasAddrLocation& ChunkLocation : ChunkLocations)
{
FSharedFileOpenResult OpenResult = Cas.OpenRead(ChunkLocation.Location.BlockId);
if (OpenResult.HasError())
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open block %u for reading"), ChunkLocation.Location.BlockId.Id);
ReadErrorCount++;
ChunkIndex++;
continue;
}
int32 ContainerIndex = INDEX_NONE;
int32 EntryIndex = FindChunkEntry(ChunkLocation.Addr, ContainerIndex);
if (EntryIndex == INDEX_NONE)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to find chunk entry for CAS adress '%s'"), *LexToString(ChunkLocation.Addr));
MissingChunkCount++;
ChunkIndex++;
continue;
}
const FSharedOnDemandContainer& Container = Containers[ContainerIndex];
const FIoChunkId& ChunkId = Container->ChunkIds[EntryIndex];
const FOnDemandChunkEntry& ChunkEntry = Container->ChunkEntries[EntryIndex];
FSharedFileHandle FileHandle = OpenResult.GetValue();
const int64 ChunkSize = Align(int64(ChunkEntry.EncodedSize), FAES::AESBlockSize);
TotalVerifiedBytes += ChunkSize;
if (int64(Chunk.GetSize()) < ChunkSize)
{
Chunk = FIoBuffer(ChunkSize);
}
if (FileHandle->Seek(int64(ChunkLocation.Location.BlockOffset)) == false)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Chunk %d/%d SEEK FAILED, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', Block=%u, BlockOffset=%u"),
ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash),
ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset);
ReadErrorCount++;
ChunkIndex++;
continue;
}
if (FileHandle->Read(reinterpret_cast<uint8*>(Chunk.GetData()), ChunkSize) == false)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Chunk %d/%d READ FAILED, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', Block=%u, BlockOffset=%u"),
ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash),
ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset);
ReadErrorCount++;
ChunkIndex++;
continue;
}
const FIoHash ChunkHash = FIoHash::HashBuffer(Chunk.GetView().Left(ChunkSize));
if (ChunkHash == ChunkEntry.Hash)
{
UE_LOG(LogIoStoreOnDemand, VeryVerbose, TEXT("Chunk %d/%d OK, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', Block=%u, BlockOffset=%u"),
ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash),
ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset);
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Chunk %d/%d CORRUPT, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', ActualHash='%s', Block=%u, BlockOffset=%u"),
ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash), *LexToString(ChunkHash),
ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset);
CorruptChunkCount++;
}
ChunkIndex++;
}
if (CorruptChunkCount > 0 || MissingChunkCount > 0 || ReadErrorCount > 0)
{
const FString Reason = FString::Printf(TEXT("Verify install cache failed, Corrupt=%u, Missing=%u, ReadErrors=%u"),
CorruptChunkCount, MissingChunkCount, ReadErrorCount);
if (CorruptChunkCount > 0 || ReadErrorCount > 0)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("%s"), *Reason);
}
else
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("%s"), *Reason);
}
if (CorruptChunkCount > 0)
{
return FIoStatus(EIoErrorCode::SignatureError);
}
if (ReadErrorCount > 0)
{
return FIoStatus(EIoErrorCode::ReadError);
}
return FIoStatus(EIoErrorCode::NotFound);
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Successfully verified %d chunk(s) of total %.2lf MiB"),
ChunkCount, ToMiB(TotalVerifiedBytes));
return FIoStatus::Ok;
}
FIoStatus FOnDemandInstallCache::Reinitialize()
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Recreating install cache in directory '%s'"), *CacheDirectory);
IFileManager& Ifm = IFileManager::Get();
const bool bTree = true;
if (Ifm.DeleteDirectory(*CacheDirectory, false, bTree) == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to delete directory '")
<< CacheDirectory
<< TEXT("'");
}
if (Ifm.MakeDirectory(*CacheDirectory, bTree) == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to create directory '")
<< CacheDirectory
<< TEXT("'");
}
if (FIoStatus Status = Cas.Initialize(CacheDirectory); Status.IsOk() == false)
{
return Status;
}
const FString JournalFile = GetJournalFilename();
if (FIoStatus Status = FCasJournal::Create(JournalFile); Status.IsOk() == false)
{
return Status;
}
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Created CAS journal '%s'"), *JournalFile);
return FIoStatus::Ok;
}
FIoStatus FOnDemandInstallCache::InitialVerify()
{
// Verify the blocks on disk with the current state of the CAS
{
TArray<FCasAddr> RemovedChunks;
if (FIoStatus Verify = Cas.Verify(RemovedChunks); Verify.IsOk() == false)
{
FOnDemandInstallCacheStats::OnCasVerificationError(RemovedChunks.Num());
// Remove all entries that doesn't have a valid cache block
FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename());
for (const FCasAddr& Addr : RemovedChunks)
{
Transaction.ChunkLocation(FCasLocation::Invalid, Addr);
}
if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); Status.IsOk() == false)
{
return Status;
}
}
}
// Check if the cache is over budget
{
FCasBlockInfoMap BlockInfo;
uint64 CacheSize = Cas.GetBlockInfo(BlockInfo);
if (CacheSize > MaxCacheSize)
{
const uint64 TotalBytesToPurge = CacheSize - MaxCacheSize;
uint64 TotalPurgedBytes = 0;
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Cache size is greater than disk quota - Purging install cache, MaxCacheSize=%.2lf MiB, TotalSize=%.2lf MiB, TotalBytesToPurge=%.2lf MiB"),
ToMiB(MaxCacheSize), ToMiB(CacheSize), ToMiB(TotalBytesToPurge));
FIoStatus PurgeStatus = Purge(BlockInfo, TotalBytesToPurge, TotalPurgedBytes);
if (PurgeStatus.IsOk() && TotalPurgedBytes >= TotalBytesToPurge)
{
UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Successfully purged %.2lf MiB from install cache"), ToMiB(TotalPurgedBytes));
}
else
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to purge %.2lf MiB from install cache. Actually purged %.2lf MiB from install cache"),
ToMiB(TotalBytesToPurge), ToMiB(TotalPurgedBytes));
return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge overbudget cache (%s)"), *PurgeStatus.ToString());
}
}
}
return FIoStatus::Ok;
}
uint64 FOnDemandInstallCache::AddReferencesToBlocks(
const TArray<FSharedOnDemandContainer>& Containers,
const TArray<TBitArray<>>& ChunkEntryIndices,
const TSet<FIoHash>& ChunksToInstall,
FCasBlockInfoMap& BlockInfoMap,
uint64& OutTotalReferencedBytes) const
{
uint64 TotalUncachedBytes = 0;
OutTotalReferencedBytes = 0;
for (int32 Index = 0; FSharedOnDemandContainer Container : Containers)
{
const TBitArray<>& IsReferenced = ChunkEntryIndices[Index++];
for (int32 EntryIndex = 0; const FOnDemandChunkEntry& Entry : Container->ChunkEntries)
{
const bool bToInstall = ChunksToInstall.Contains(Entry.Hash);
const bool bIsReferenced = IsReferenced[EntryIndex];
uint64 ChunkDiskSize = Align(int64(Entry.EncodedSize), FAES::AESBlockSize);
if (bIsReferenced)
{
OutTotalReferencedBytes += ChunkDiskSize;
}
FCasBlockInfo* BlockInfo = nullptr;
if (bToInstall || bIsReferenced)
{
if (FCasLocation Loc = Cas.FindChunk(Entry.Hash); Loc.IsValid())
{
BlockInfo = BlockInfoMap.Find(Loc.BlockId);
if (!BlockInfo)
{
UE_CLOG(bIsReferenced, LogIoStoreOnDemand, Error, TEXT("Failed to find CAS block info for referenced chunk, ChunkId='%s', Container='%s'"),
*LexToString(Container->ChunkIds[EntryIndex]), *Container->Name);
}
}
else
{
UE_CLOG(bIsReferenced, LogIoStoreOnDemand, Error, TEXT("Failed to find CAS location for referenced chunk, ChunkId='%s', Container='%s'"),
*LexToString(Container->ChunkIds[EntryIndex]), *Container->Name);
}
}
if (BlockInfo)
{
BlockInfo->RefSize += ChunkDiskSize;
}
else if (bToInstall)
{
TotalUncachedBytes += ChunkDiskSize;
}
EntryIndex++;
}
}
return TotalUncachedBytes;
}
FIoStatus FOnDemandInstallCache::Purge(FCasBlockInfoMap& BlockInfo, const uint64 TotalBytesToPurge, uint64& OutTotalPurgedBytes)
{
BlockInfo.ValueSort([](const FCasBlockInfo& LHS, const FCasBlockInfo& RHS)
{
return LHS.LastAccess < RHS.LastAccess;
});
OutTotalPurgedBytes = 0;
for (auto It = BlockInfo.CreateIterator(); It; ++It)
{
const FCasBlockId BlockId = It->Key;
const FCasBlockInfo& Info = It->Value;
if (Info.RefSize > 0)
{
continue;
}
FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename());
TArray<FCasAddr> RemovedChunks;
if (FIoStatus Status = Cas.DeleteBlock(BlockId, RemovedChunks); !Status.IsOk())
{
return Status;
}
// This should be the only thread writing to CurrentBlock
FCasBlockId MaybeCurrentBlock = BlockId;
CurrentBlock.compare_exchange_strong(MaybeCurrentBlock, FCasBlockId::Invalid);
OutTotalPurgedBytes += Info.FileSize;
It.RemoveCurrent();
for (const FCasAddr& Addr : RemovedChunks)
{
Transaction.ChunkLocation(FCasLocation::Invalid, Addr);
}
Transaction.BlockDeleted(BlockId);
if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); !Status.IsOk())
{
return Status;
}
if (OutTotalPurgedBytes >= TotalBytesToPurge)
{
break;
}
}
return FIoStatus::Ok;
}
FIoStatus FOnDemandInstallCache::Defrag(
const TArray<FSharedOnDemandContainer>& Containers,
const TArray<TBitArray<>>& ChunkEntryIndices,
FCasBlockInfoMap& BlockInfo,
const uint64* TotalBytesToFree /*= nullptr*/)
{
if (TotalBytesToFree && *TotalBytesToFree == 0)
{
return FIoStatus::Ok;
}
const uint64 TotalCachedBytes = Algo::TransformAccumulate(BlockInfo,
[](const TPair<FCasBlockId, FCasBlockInfo>& Kv) { return Kv.Value.FileSize; },
uint64(0));
if (TotalCachedBytes > MaxCacheSize)
{
// Ruh-Roh! There's not enough of the disk quota left to run a defrag!
const FString ErrorMsg = FString::Printf(TEXT("Cache size is greater than disk quota - Cannot Defragment!, MaxCacheSize=%.2lf MiB, TotalCachedBytes=%.2lf MiB"),
ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes));
UE_LOG(LogIoStoreOnDemand, Error, TEXT("%s"), *ErrorMsg);
FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::WriteError, 0);
return FIoStatusBuilder(EIoErrorCode::WriteError) << ErrorMsg;
}
struct FDefragBlockReferencedChunk
{
uint32 BlockOffset = 0;
uint32 EncodedSize = 0;
FIoHash Hash;
};
struct FDefragBlock
{
FCasBlockId BlockId;
int64 LastAccess = 0;
TArray<FDefragBlockReferencedChunk> ReferencedChunks;
};
// Build the list of blocks to defrag and determine if its possible to free enough data through defragging
TArray<FDefragBlock> BlocksToDefrag;
// Start with the least referenced blocks
BlockInfo.ValueSort([](const FCasBlockInfo& LHS, const FCasBlockInfo& RHS)
{
return LHS.RefSize < RHS.RefSize;
});
uint64 FragmentedBytes = 0;
uint64 TotalBlockSize = 0;
if (TotalBytesToFree)
{
// Partial defrag
bool bPossibleToFreeBytes = false;
uint64 FreedBlockBytes = 0;
uint64 NewBlockBytes = 0;
for (const TPair<FCasBlockId, FCasBlockInfo>& Kv : BlockInfo)
{
const FCasBlockId BlockId = Kv.Key;
const FCasBlockInfo& Info = Kv.Value;
if (!bPossibleToFreeBytes && Info.RefSize < Info.FileSize)
{
// Block is fragmented
FragmentedBytes += (Info.FileSize - Info.RefSize);
TotalBlockSize += Info.FileSize;
FreedBlockBytes += Info.FileSize;
NewBlockBytes += Info.RefSize; // For now, assume that nothing will be moved to the current block
BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess });
if (FreedBlockBytes >= NewBlockBytes && FreedBlockBytes - NewBlockBytes >= *TotalBytesToFree)
{
bPossibleToFreeBytes = true;
}
}
else if (Info.FileSize < Cas.MinBlockSize)
{
// Block is too small whether or not its fragmented
if (ensure(Info.RefSize <= Info.FileSize))
{
FragmentedBytes += (Info.FileSize - Info.RefSize);
}
TotalBlockSize += Info.FileSize;
BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess });
}
}
if (!bPossibleToFreeBytes)
{
FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::WriteError, 0);
return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Defrag failed - cannot free %" UINT64_FMT), *TotalBytesToFree);
}
}
else
{
// Full defrag
for (const TPair<FCasBlockId, FCasBlockInfo>& Kv : BlockInfo)
{
const FCasBlockId BlockId = Kv.Key;
const FCasBlockInfo& Info = Kv.Value;
if (Info.RefSize < Info.FileSize)
{
// Block is fragmented
FragmentedBytes += (Info.FileSize - Info.RefSize);
TotalBlockSize += Info.FileSize;
BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess });
}
else if (Info.FileSize < Cas.MinBlockSize)
{
// Block is too small whether or not its fragmented
if (ensure(Info.RefSize <= Info.FileSize))
{
FragmentedBytes += (Info.FileSize - Info.RefSize);
}
TotalBlockSize += Info.FileSize;
BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess });
}
}
if (BlocksToDefrag.IsEmpty())
{
// Already defragged
UE_LOG(LogIoStoreOnDemand, Display, TEXT("Cache not fragmented."));
return FIoStatus::Ok;
}
}
UE_LOG(LogIoStoreOnDemand, Display, TEXT("Defrag found %" UINT64_FMT " fragmented bytes of %" UINT64_FMT " total bytes in %i blocks."), FragmentedBytes, TotalBlockSize, BlocksToDefrag.Num());
// Right now, don't allow moving chunks to the current block for defrag. Its somewhat dangerous and hard to reason about.
// - Currently, the slack in the current block cannot be determined without opening a write handle to the block.
// - If we defrag the current block itself, then we would need additional tracking so we don't lose any chunks moved into it.
// - Additionally, this would also depend on the order blocks are defragged.
// This should be the only thread writing to CurrentBlock.
CurrentBlock = FCasBlockId::Invalid;
// Determine chunks that need to be moved for each defrag block
for (int32 Index = 0; FSharedOnDemandContainer Container : Containers)
{
const TBitArray<>& IsReferenced = ChunkEntryIndices[Index++];
for (int32 EntryIndex = 0; const FOnDemandChunkEntry& Entry : Container->ChunkEntries)
{
if (bool bIsReferenced = IsReferenced[EntryIndex++]; bIsReferenced == false)
{
continue;
}
if (FCasLocation Loc = Cas.FindChunk(Entry.Hash); Loc.IsValid())
{
if (FDefragBlock* DefragBlock = Algo::FindBy(BlocksToDefrag, Loc.BlockId, &FDefragBlock::BlockId))
{
DefragBlock->ReferencedChunks.Add(FDefragBlockReferencedChunk
{
.BlockOffset = Loc.BlockOffset,
.EncodedSize = Entry.EncodedSize,
.Hash = Entry.Hash,
});
}
}
}
}
// Move chunks to new blocks and delete old blocks
FPendingChunks DefragPendingChunks;
for (const FDefragBlock& DefragBlock : BlocksToDefrag)
{
if (DefragBlock.ReferencedChunks.IsEmpty() == false)
{
FSharedFileOpenResult FileOpenResult = Cas.OpenRead(DefragBlock.BlockId);
if (!FileOpenResult.IsValid())
{
const FString Filename = Cas.GetBlockFilename(DefragBlock.BlockId);
const FString ErrorMsg = FileOpenResult.GetError().GetMessage();
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open CAS block '%s' for reading, reason '%s'"), *Filename, *ErrorMsg);
FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::FileOpenFailed, 0);
return FIoStatusBuilder(EIoErrorCode::FileOpenFailed) << ErrorMsg;
}
FSharedFileHandle FileHandle = FileOpenResult.StealValue();
Algo::SortBy(DefragBlock.ReferencedChunks, &FDefragBlockReferencedChunk::BlockOffset);
for (const FDefragBlockReferencedChunk& ReffedChunk : DefragBlock.ReferencedChunks)
{
FileHandle->Seek(ReffedChunk.BlockOffset);
const int64 ChunkDiskSize = Align(int64(ReffedChunk.EncodedSize), FAES::AESBlockSize);
FIoBuffer Buffer(ChunkDiskSize);
const bool bOk = FileHandle->Read(Buffer.GetData(), Buffer.GetSize());
if (!bOk)
{
FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::ReadError, 0);
return EIoErrorCode::ReadError;
}
const FIoHash ChunkHash = FIoHash::HashBuffer(Buffer.GetView());
if (ChunkHash != ReffedChunk.Hash)
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Found chunk with invalid hash while defragging block, BlockId=%u, BlockOffset=%u"),
DefragBlock.BlockId.Id, ReffedChunk.BlockOffset);
if (FIoStatus Status = FlushPendingChunks(DefragPendingChunks); Status.IsOk() == false)
{
FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0);
return Status;
}
FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::SignatureError, 0);
return EIoErrorCode::SignatureError;
}
if (DefragPendingChunks.TotalSize > FPendingChunks::MaxPendingBytes)
{
if (FIoStatus Status = FlushPendingChunks(DefragPendingChunks, DefragBlock.LastAccess); Status.IsOk() == false)
{
FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0);
return Status;
}
check(DefragPendingChunks.IsEmpty());
}
DefragPendingChunks.Append(MoveTemp(Buffer), ReffedChunk.Hash);
}
FileHandle.Reset();
if (FIoStatus Status = FlushPendingChunks(DefragPendingChunks); Status.IsOk() == false)
{
FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0);
return Status;
}
check(DefragPendingChunks.IsEmpty());
}
FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename());
// Flushing should overwrite the lookup info for the cas addr to point at the new block.
// Can now remove the old block
TArray<FCasAddr> DeletedChunkAddresses;
Cas.DeleteBlock(DefragBlock.BlockId, DeletedChunkAddresses);
for (const FCasAddr& Addr : DeletedChunkAddresses)
{
Transaction.ChunkLocation(FCasLocation::Invalid, Addr);
}
Transaction.BlockDeleted(DefragBlock.BlockId);
if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); !Status.IsOk())
{
FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0);
return Status;
}
}
UE_LOG(LogIoStoreOnDemand, Display, TEXT("Defrag removed %" UINT64_FMT " fragmented bytes of %" UINT64_FMT " total bytes in %i blocks."), FragmentedBytes, TotalBlockSize, BlocksToDefrag.Num());
FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::Ok, FragmentedBytes);
return FIoStatus::Ok;
}
FIoStatus FOnDemandInstallCache::Flush()
{
if (PendingChunks.IsValid())
{
FUniquePendingChunks Chunks = MoveTemp(PendingChunks);
return FlushPendingChunks(*Chunks);
}
Cas.Compact();
return FIoStatus::Ok;
}
FOnDemandInstallCacheUsage FOnDemandInstallCache::GetCacheUsage()
{
// If this is called from a thread other than the OnDemandIoStore tick thread
// then its possible the block info and containers may not be in sync with each other
// or the current state of the tick thread.
// This should only be used for debugging and telemetry purposes.
FCasBlockInfoMap BlockInfo;
const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo);
TArray<FSharedOnDemandContainer> Containers;
TArray<TBitArray<>> ChunkEntryIndices;
IoStore.GetReferencedContent(Containers, ChunkEntryIndices);
check(Containers.Num() == ChunkEntryIndices.Num());
uint64 ReferencedBytes = 0;
AddReferencesToBlocks(Containers, ChunkEntryIndices, {}, BlockInfo, ReferencedBytes);
uint64 FragmentedBytes = 0;
uint64 ReferencedBlockBytes = 0;
for (const TPair<FCasBlockId, FCasBlockInfo>& Kv : BlockInfo)
{
const FCasBlockId BlockId = Kv.Key;
const FCasBlockInfo& Info = Kv.Value;
if (Info.RefSize < Info.FileSize)
{
FragmentedBytes += (Info.FileSize - Info.RefSize);
}
if (Info.RefSize > 0)
{
ReferencedBlockBytes += Info.FileSize;
}
}
return FOnDemandInstallCacheUsage
{
.MaxSize = MaxCacheSize,
.TotalSize = TotalCachedBytes,
.ReferencedBlockSize = ReferencedBlockBytes,
.ReferencedSize = ReferencedBytes,
.FragmentedChunksSize = FragmentedBytes,
};
}
FIoStatus FOnDemandInstallCache::FlushPendingChunks(FPendingChunks& Chunks, int64 UtcAccessTicks)
{
if (Chunks.IsEmpty())
{
return FIoStatus::Ok;
}
#if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
UE::Tasks::TTask<FIoStatus> Task = ExclusivePipe.Launch(UE_SOURCE_LOCATION, [this, &Chunks, UtcAccessTicks]
{
return FlushPendingChunksImpl(Chunks, UtcAccessTicks);
}, UE::Tasks::ETaskPriority::BackgroundHigh);
Task.Wait();
return Task.GetResult();
#else
return FlushPendingChunksImpl(Chunks, UtcAccessTicks);
#endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE
}
FIoStatus FOnDemandInstallCache::FlushPendingChunksImpl(FPendingChunks& Chunks, int64 UtcAccessTicks)
{
ON_SCOPE_EXIT { Chunks.Reset(); };
// This should be the only thread writing to CurrentBlock
FCasBlockId CurrentBlockId = CurrentBlock;
while (Chunks.IsEmpty() == false)
{
FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename());
if (CurrentBlockId.IsValid() == false)
{
CurrentBlockId = Cas.CreateBlock();
ensure(CurrentBlockId.IsValid());
CurrentBlock = CurrentBlockId;
Transaction.BlockCreated(CurrentBlockId);
}
TUniquePtr<IFileHandle> CasFileHandle = Cas.OpenWrite(CurrentBlockId);
if (CasFileHandle.IsValid() == false)
{
FOnDemandInstallCacheStats::OnFlush(EIoErrorCode::WriteError, 0);
return FIoStatusBuilder(EIoErrorCode::FileOpenFailed)
<< TEXT("Failed to open cache block file '")
<< Cas.GetBlockFilename(CurrentBlockId)
<< TEXT("'");
}
const int64 CasBlockOffset = CasFileHandle->Tell();
FLargeMemoryWriter Ar(Chunks.TotalSize);
TArray<FIoHash> ChunkHashes;
TArray<int64> Offsets;
while (Chunks.IsEmpty() == false)
{
if (CasBlockOffset > 0 && CasBlockOffset + Ar.Tell() + Chunks.Chunks[0].GetSize() > Cas.MaxBlockSize)
{
break;
}
FIoBuffer Chunk = Chunks.Pop(ChunkHashes.AddDefaulted_GetRef());
Offsets.Add(CasBlockOffset + Ar.Tell());
Ar.Serialize(Chunk.GetData(), Chunk.GetSize());
}
if (Ar.Tell() > 0)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Writing %.2lf MiB to CAS block %u"),
ToMiB(Ar.Tell()), CurrentBlockId.Id);
if (CasFileHandle->Write(Ar.GetData(), Ar.Tell()) == false)
{
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to serialize chunks to cache block");
}
if (UtcAccessTicks)
{
Cas.TrackAccessIfNewer(CurrentBlockId, UtcAccessTicks);
}
else
{
Cas.TrackAccess(CurrentBlockId);
}
if (CasFileHandle->Flush() == false)
{
FOnDemandInstallCacheStats::OnFlush(EIoErrorCode::WriteError, Ar.Tell());
return FIoStatusBuilder(EIoErrorCode::WriteError)
<< TEXT("Failed to flush cache block to disk");
}
FOnDemandInstallCacheStats::OnFlush(EIoErrorCode::Ok, Ar.Tell());
check(ChunkHashes.Num() == Offsets.Num());
check(CurrentBlockId.IsValid());
for (int32 Idx = 0, Count = Offsets.Num(); Idx < Count; ++Idx)
{
const FCasAddr CasAddr = FCasAddr::From(ChunkHashes[Idx]);
const uint32 ChunkOffset = IntCastChecked<uint32>(Offsets[Idx]);
FCasLocation& Loc = Cas.Lookup.FindOrAdd(CasAddr);
Loc.BlockId = CurrentBlockId;
Loc.BlockOffset = ChunkOffset;
Transaction.ChunkLocation(Loc, CasAddr);
}
}
if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); Status.IsOk() == false)
{
return Status;
}
if (Chunks.IsEmpty() == false)
{
CurrentBlockId = FCasBlockId::Invalid;
}
}
return FIoStatus::Ok;
}
void FOnDemandInstallCache::CompleteRequest(FIoRequestImpl* Request, EIoErrorCode Status)
{
bool bSucceeded = false;
if (Status == EIoErrorCode::Ok && !Request->IsCancelled())
{
FChunkRequest& ChunkRequest = FChunkRequest::GetRef(*Request);
const FOnDemandChunkInfo& ChunkInfo = ChunkRequest.ChunkInfo;
FIoBuffer EncodedChunk = MoveTemp(ChunkRequest.EncodedChunk);
if (EncodedChunk.GetSize() > 0)
{
FIoChunkDecodingParams Params;
Params.CompressionFormat = ChunkInfo.CompressionFormat();
Params.EncryptionKey = ChunkInfo.EncryptionKey();
Params.BlockSize = ChunkInfo.BlockSize();
Params.TotalRawSize = ChunkInfo.RawSize();
Params.RawOffset = Request->Options.GetOffset();
Params.EncodedOffset = ChunkRequest.ChunkRange.GetOffset();
Params.EncodedBlockSize = ChunkInfo.Blocks();
Params.BlockHash = ChunkInfo.BlockHashes();
Request->CreateBuffer(ChunkRequest.RawSize);
FMutableMemoryView RawChunk = Request->GetBuffer().GetMutableView();
bSucceeded = FIoChunkEncoding::Decode(Params, EncodedChunk.GetView(), RawChunk);
UE_CLOG(!bSucceeded, LogIoStoreOnDemand, Error, TEXT("Failed to decode chunk, ChunkId='%s'"), *LexToString(Request->ChunkId));
}
}
if (bSucceeded == false)
{
if (Status == EIoErrorCode::Ok)
{
Status = EIoErrorCode::ReadError;
}
Request->SetLastBackendError(Status);
Request->SetResult(FIoBuffer());
TRACE_IOSTORE_BACKEND_REQUEST_FAILED(Request);
}
else
{
TRACE_IOSTORE_BACKEND_REQUEST_COMPLETED(Request, Request->GetBuffer().GetSize());
}
{
UE::TUniqueLock Lock(Mutex);
CompletedRequests.AddTail(Request);
}
BackendContext->WakeUpDispatcherThreadDelegate.Execute();
}
///////////////////////////////////////////////////////////////////////////////
TSharedPtr<IOnDemandInstallCache> MakeOnDemandInstallCache(
FOnDemandIoStore& IoStore,
const FOnDemandInstallCacheConfig& Config)
{
IFileManager& Ifm = IFileManager::Get();
if (Config.bDropCache)
{
UE_LOG(LogIoStoreOnDemand, Log, TEXT("Deleting install cache directory '%s'"), *Config.RootDirectory);
Ifm.DeleteDirectory(*Config.RootDirectory, false, true);
}
const bool bTree = true;
if (!Ifm.MakeDirectory(*Config.RootDirectory, bTree))
{
UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to create directory '%s'"), *Config.RootDirectory);
return TSharedPtr<IOnDemandInstallCache>();
}
return MakeShared<FOnDemandInstallCache>(Config, IoStore);
}
///////////////////////////////////////////////////////////////////////////////
#if WITH_IOSTORE_ONDEMAND_TESTS
class FTmpDirectoryScope
{
public:
explicit FTmpDirectoryScope(const FString& InDir)
: Ifm(IFileManager::Get())
, Dir(InDir)
{
const bool bTree = true;
const bool bRequireExists = false;
Ifm.DeleteDirectory(*Dir, bRequireExists, bTree);
Ifm.MakeDirectory(*Dir, bTree);
}
~FTmpDirectoryScope()
{
const bool bTree = true;
const bool bRequireExists = false;
Ifm.DeleteDirectory(*Dir, bRequireExists, bTree);
}
private:
IFileManager& Ifm;
FString Dir;
};
FCasAddr CreateCasTestAddr(uint64 Value)
{
return FCasAddr::From(reinterpret_cast<const uint8*>(&Value), sizeof(uint64));
}
TEST_CASE("IoStore::OnDemand::InstallCache::Journal", "[IoStoreOnDemand][InstallCache]")
{
const FString TestBaseDir = TEXT("TestTmpDir");
SECTION("CreateJournalFile")
{
FTmpDirectoryScope _(TestBaseDir);
const FString JournalFile = TestBaseDir / TEXT("test.jrn");
FIoStatus Status = FCasJournal::Create(JournalFile);
CHECK(Status.IsOk());
}
SECTION("SimpleTransaction")
{
FTmpDirectoryScope _(TestBaseDir);
const FString JournalFile = TestBaseDir / TEXT("test.jrn");
FIoStatus Status = FCasJournal::Create(JournalFile);
CHECK(Status.IsOk());
FCasJournal::FTransaction Transaction = FCasJournal::Begin(JournalFile);
Transaction.BlockCreated(FCasBlockId(1));
Status = FCasJournal::Commit(MoveTemp(Transaction));
CHECK(Status.IsOk());
}
SECTION("ReplayChunkLocations")
{
//Arrange
TArray<FCasAddr> ExpectedAddresses;
TArray<uint32> ExpectedBlockOffsets;
const FCasBlockId ExpectedBlockId(42);
for (int32 Idx = 1; Idx < 33; ++Idx)
{
ExpectedAddresses.Add(FCasAddr::From(reinterpret_cast<const uint8*>(&Idx), sizeof(uint32)));
ExpectedBlockOffsets.Add(Idx);
}
// Act
FTmpDirectoryScope _(TestBaseDir);
const FString JournalFile = TestBaseDir / TEXT("test.jrn");
FIoStatus Status = FCasJournal::Create(JournalFile);
CHECK(Status.IsOk());
FCasJournal::FTransaction Transaction = FCasJournal::Begin(JournalFile);
for (int32 Idx = 0; const FCasAddr& Addr : ExpectedAddresses)
{
Transaction.ChunkLocation(
FCasLocation
{
.BlockId = ExpectedBlockId,
.BlockOffset = ExpectedBlockOffsets[Idx]
},
Addr);
}
Status = FCasJournal::Commit(MoveTemp(Transaction));
CHECK(Status.IsOk());
// Assert
TArray<FCasJournal::FEntry::FChunkLocation> Locs;
Status = FCasJournal::Replay(
JournalFile,
[&Locs](const FCasJournal::FEntry& JournalEntry)
{
switch(JournalEntry.Type())
{
case FCasJournal::FEntry::EType::ChunkLocation:
{
Locs.Add(JournalEntry.ChunkLocation);
break;
}
default:
CHECK(false);
break;
};
});
CHECK(Status.IsOk());
CHECK(Locs.Num() == ExpectedAddresses.Num());
for (int32 Idx = 0; const FCasJournal::FEntry::FChunkLocation& Loc : Locs)
{
const FCasLocation ExpectedLoc = FCasLocation
{
.BlockId = ExpectedBlockId,
.BlockOffset = uint32(Idx + 1)
};
CHECK(Loc.CasLocation.BlockId == ExpectedLoc.BlockId);
CHECK(Loc.CasLocation.BlockOffset == ExpectedLoc.BlockOffset);
}
}
SECTION("ReplayBlockCreatedAndDeleted")
{
// Arrange
const FCasBlockId ExpectedBlockId(42);
// Act
FTmpDirectoryScope _(TestBaseDir);
const FString JournalFile = TestBaseDir / TEXT("test.jrn");
FIoStatus Status = FCasJournal::Create(JournalFile);
CHECK(Status.IsOk());
FCasJournal::FTransaction Tx = FCasJournal::Begin(JournalFile);
Tx.BlockCreated(ExpectedBlockId);
Tx.BlockDeleted(ExpectedBlockId);
Status = FCasJournal::Commit(MoveTemp(Tx));
CHECK(Status.IsOk());
// Assert
FCasBlockId CreatedBlockId;
FCasBlockId DeletedBlockId;
Status = FCasJournal::Replay(
JournalFile,
[&CreatedBlockId, &DeletedBlockId](const FCasJournal::FEntry& JournalEntry)
{
switch(JournalEntry.Type())
{
case FCasJournal::FEntry::EType::BlockCreated:
{
CreatedBlockId = JournalEntry.BlockOperation.BlockId;
break;
}
case FCasJournal::FEntry::EType::BlockDeleted:
{
DeletedBlockId = JournalEntry.BlockOperation.BlockId;
break;
}
default:
CHECK(false);
break;
};
});
CHECK(Status.IsOk());
CHECK(CreatedBlockId == ExpectedBlockId);
CHECK(DeletedBlockId == ExpectedBlockId);
}
SECTION("ReplayBlockAccess")
{
// Arrange
const FCasBlockId ExpectedBlockId(462);
const uint64 ExpectedTicks = FDateTime::UtcNow().GetTicks();
// Act
FTmpDirectoryScope _(TestBaseDir);
const FString JournalFile = TestBaseDir / TEXT("test.jrn");
FIoStatus Status = FCasJournal::Create(JournalFile);
CHECK(Status.IsOk());
FCasJournal::FTransaction Tx = FCasJournal::Begin(JournalFile);
Tx.BlockAccess(ExpectedBlockId, ExpectedTicks);
Status = FCasJournal::Commit(MoveTemp(Tx));
CHECK(Status.IsOk());
// Assert
FCasBlockId BlockId;
uint64 Ticks = 0;
Status = FCasJournal::Replay(
JournalFile,
[&BlockId, &Ticks](const FCasJournal::FEntry& JournalEntry)
{
switch(JournalEntry.Type())
{
case FCasJournal::FEntry::EType::BlockAccess:
{
const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation;
BlockId = Op.BlockId;
Ticks = Op.UtcTicks;
break;
}
default:
CHECK(false);
break;
};
});
CHECK(Status.IsOk());
CHECK(BlockId == ExpectedBlockId);
CHECK(Ticks == ExpectedTicks);
}
}
TEST_CASE("IoStore::OnDemand::InstallCache::Snapshot", "[IoStoreOnDemand][InstallCache]")
{
const FString TestBaseDir = TEXT("TestTmpDir");
SECTION("SaveLoadRoundtrip")
{
// Arrange
FCasSnapshot ExpectedSnapshot;
for (uint32 Id = 1; Id <= 10; ++Id)
{
ExpectedSnapshot.Blocks.Add(FCasSnapshot::FBlock
{
.BlockId = FCasBlockId(Id),
.LastAccess = FDateTime::UtcNow().GetTicks()
});
for (uint32 Idx = 1; Idx <= 10; ++Idx)
{
FCasAddr CasAddr = CreateCasTestAddr(Idx);
FCasLocation Loc = FCasLocation
{
.BlockId = FCasBlockId(Id),
.BlockOffset = Idx * 256
};
ExpectedSnapshot.ChunkLocations.Emplace(CasAddr, Loc);
}
}
ExpectedSnapshot.CurrentBlockId = FCasBlockId(1);
// Act
FTmpDirectoryScope _(TestBaseDir);
const FString SnapshotFile = TestBaseDir / TEXT("test.snp");
TIoStatusOr<int64> Status = FCasSnapshot::Save(ExpectedSnapshot, SnapshotFile);
CHECK(Status.IsOk());
const FCasSnapshot Snapshot = FCasSnapshot::Load(SnapshotFile).ConsumeValueOrDie();
// Assert
CHECK(Snapshot.Blocks.Num() == ExpectedSnapshot.Blocks.Num());
for (int32 Idx = 0; Idx < Snapshot.Blocks.Num(); ++Idx)
{
CHECK(Snapshot.Blocks[Idx].BlockId == ExpectedSnapshot.Blocks[Idx].BlockId);
CHECK(Snapshot.Blocks[Idx].LastAccess == ExpectedSnapshot.Blocks[Idx].LastAccess);
}
CHECK(Snapshot.ChunkLocations.Num() == ExpectedSnapshot.ChunkLocations.Num());
for (int32 Idx = 0; Idx < Snapshot.ChunkLocations.Num(); ++Idx)
{
CHECK(Snapshot.ChunkLocations[Idx].Get<0>() == ExpectedSnapshot.ChunkLocations[Idx].Get<0>());
CHECK(Snapshot.ChunkLocations[Idx].Get<1>() == ExpectedSnapshot.ChunkLocations[Idx].Get<1>());
}
CHECK(Snapshot.CurrentBlockId == ExpectedSnapshot.CurrentBlockId);
}
SECTION("CreateFromJournal")
{
// Arrange
FTmpDirectoryScope _(TestBaseDir);
const FString JournalFile = TestBaseDir / TEXT("test.jrn");
const FCasBlockId ExpectedCurrentBlockId(2);
FIoStatus Status = FCasJournal::Create(JournalFile);
CHECK(Status.IsOk());
FCasJournal::FTransaction Tx = FCasJournal::Begin(JournalFile);
// Add a block and some chunk locations
Tx.BlockCreated(FCasBlockId(1));
for (int32 Idx = 1; Idx <= 10; ++Idx)
{
Tx.ChunkLocation(FCasLocation
{
.BlockId = FCasBlockId(1),
.BlockOffset = 256
},
CreateCasTestAddr(uint64(Idx) << 32 | 1ull));
}
// Remove the block and the corresponding chunk locations
for (int32 Idx = 1; Idx <= 10; ++Idx)
{
Tx.ChunkLocation(FCasLocation::Invalid, CreateCasTestAddr(uint64(Idx) << 32 | 1ull));
}
Tx.BlockDeleted(FCasBlockId(1));
// Add a second block and some chunk locations
Tx.BlockCreated(ExpectedCurrentBlockId);
for (int32 Idx = 1; Idx <= 10; ++Idx)
{
Tx.ChunkLocation(FCasLocation
{
.BlockId = ExpectedCurrentBlockId,
.BlockOffset = uint32(Idx) * 256
},
CreateCasTestAddr(Idx));
}
Status = FCasJournal::Commit(MoveTemp(Tx));
CHECK(Status.IsOk());
// Act
const FCasSnapshot Snapshot = FCasSnapshot::FromJournal(JournalFile).ConsumeValueOrDie();
// Assert
CHECK(Snapshot.CurrentBlockId == ExpectedCurrentBlockId);
CHECK(Snapshot.Blocks.Num() == 1);
CHECK(Snapshot.ChunkLocations.Num() == 10);
for (int32 Idx = 1; Idx < Snapshot.ChunkLocations.Num(); ++Idx)
{
const FCasAddr Addr = CreateCasTestAddr(Idx);
const FCasSnapshot::FChunkLocation* Loc =
Algo::FindByPredicate(
Snapshot.ChunkLocations,
[&Addr](const FCasSnapshot::FChunkLocation& L) { return L.Get<0>() == Addr; });
CHECK(Loc != nullptr);
if (Loc != nullptr)
{
CHECK(Loc->Get<1>().BlockId == ExpectedCurrentBlockId);
CHECK(Loc->Get<1>().BlockOffset == uint32(Idx) * 256);
}
}
}
}
#endif // WITH_IOSTORE_ONDEMAND_TESTS
} // namespace UE::IoStore