// Copyright Epic Games, Inc. All Rights Reserved. #include "OnDemandInstallCache.h" #include "OnDemandHttpClient.h" #include "OnDemandIoStore.h" #include "Statistics.h" #include "Algo/Accumulate.h" #include "Algo/Find.h" #include "Async/Mutex.h" #include "Async/UniqueLock.h" #include "Async/AsyncFileHandle.h" #include "Containers/UnrealString.h" #include "GenericHash.h" #include "HAL/FileManager.h" #include "HAL/PlatformFile.h" #include "HAL/PlatformFileManager.h" #include "IO/IoContainerHeader.h" #include "IO/IoChunkId.h" #include "IO/IoChunkEncoding.h" #include "Misc/DateTime.h" #include "Misc/Paths.h" #include "Misc/PathViews.h" #include "Misc/ScopeExit.h" #include "Misc/StringBuilder.h" #include "Serialization/MemoryReader.h" #include "Serialization/LargeMemoryWriter.h" #include "Tasks/Task.h" #include "ProfilingDebugging/IoStoreTrace.h" #if WITH_IOSTORE_ONDEMAND_TESTS #include "Algo/Find.h" #include "TestHarness.h" #include "TestMacros/Assertions.h" #include #endif #ifndef UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE #define UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE (0) #endif #if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE #include "Tasks/Pipe.h" #endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE namespace UE::IoStore { /////////////////////////////////////////////////////////////////////////////// namespace CVars { static bool GIoStoreOnDemandEnableDefrag = true; static FAutoConsoleVariableRef CVar_IoStoreOnDemandEnableDefrag( TEXT("iostore.EnableDefrag"), GIoStoreOnDemandEnableDefrag, TEXT("Whether to enable defrag when purging") ); } /////////////////////////////////////////////////////////////////////////////// double ToKiB(uint64 Value) { return double(Value) / 1024.0; } /////////////////////////////////////////////////////////////////////////////// double ToMiB(uint64 Value) { return double(Value) / 1024.0 / 1024.0; } /////////////////////////////////////////////////////////////////////////////// using FUniqueFileHandle = TUniquePtr; using FSharedFileHandle = TSharedPtr; using FSharedFileOpenResult = TValueOrError; using FSharedAsyncFileHandle = TSharedPtr; using FWeakAsyncFileHandle = TWeakPtr; using FSharedFileOpenAsyncResult = TValueOrError; using FCasAddr = FHash96; /////////////////////////////////////////////////////////////////////////////// struct FCasBlockId { FCasBlockId() = default; explicit FCasBlockId(uint32 InId) : Id(InId) { } bool IsValid() const { return Id != 0; } friend inline bool operator==(FCasBlockId LHS, FCasBlockId RHS) { return LHS.Id == RHS.Id; } friend inline uint32 GetTypeHash(FCasBlockId BlockId) { return GetTypeHash(BlockId.Id); } friend FArchive& operator<<(FArchive& Ar, FCasBlockId& BlockId) { Ar << BlockId.Id; return Ar; } static const FCasBlockId Invalid; uint32 Id = 0; }; const FCasBlockId FCasBlockId::Invalid = FCasBlockId(); /////////////////////////////////////////////////////////////////////////////// struct FCasLocation { bool IsValid() const { return BlockId.IsValid() && BlockOffset != MAX_uint32; } friend inline bool operator==(FCasLocation LHS, FCasLocation RHS) { return LHS.BlockId == RHS.BlockId && LHS.BlockOffset == RHS.BlockOffset; } friend inline uint32 GetTypeHash(FCasLocation Loc) { return HashCombine(GetTypeHash(Loc.BlockId), GetTypeHash(Loc.BlockOffset)); } friend FArchive& operator<<(FArchive& Ar, FCasLocation& Loc) { Ar << Loc.BlockId; Ar << Loc.BlockOffset; return Ar; } static const FCasLocation Invalid; FCasBlockId BlockId; uint32 BlockOffset = MAX_uint32; }; const FCasLocation FCasLocation::Invalid = FCasLocation(); /////////////////////////////////////////////////////////////////////////////// struct FCasBlockInfo { uint64 FileSize = 0; int64 LastAccess = 0; uint64 RefSize = 0; }; using FCasBlockInfoMap = TMap; /////////////////////////////////////////////////////////////////////////////// struct FCas { static constexpr uint32 DeleteBlockMaxWaitTimeMs = 10000; using FLookup = TMap; using FReadHandles = TMap; using FLastAccess = TMap; using FBlockIdHandleCounts = TMap; FIoStatus Initialize(FStringView Directory, bool bDeleteExisting = false); FCasLocation FindChunk(const FIoHash& Hash) const; FCasBlockId CreateBlock(); FIoStatus DeleteBlock(FCasBlockId BlockId, TArray& OutAddrs); FString GetBlockFilename(FCasBlockId BlockId) const; FSharedFileOpenResult OpenRead(FCasBlockId BlockId); FSharedFileOpenAsyncResult OpenAsyncRead(FCasBlockId BlockId); void OnFileHandleDeleted(FCasBlockId BlockId); FUniqueFileHandle OpenWrite(FCasBlockId BlockId) const; void TrackAccess(FCasBlockId BlockId, int64 UtcTicks); void TrackAccess(FCasBlockId BlockId) { TrackAccess(BlockId, FDateTime::UtcNow().GetTicks()); } void TrackAccessIfNewer(FCasBlockId BlockId, int64 UtcTicks); uint64 GetBlockInfo(FCasBlockInfoMap& OutBlockInfo); void Compact(); FIoStatus Verify(TArray& OutAddrs); FStringView RootDirectory; FLookup Lookup; FBlockIdHandleCounts BlockIds; FLastAccess LastAccess; FReadHandles ReadHandles; FEventRef BlockReadsDoneEvent; const uint32 MaxBlockSize = 32 << 20; //TODO: Make configurable const uint32 MinBlockSize = MaxBlockSize >> 1; //TODO: Make configurable mutable UE::FMutex Mutex; }; /////////////////////////////////////////////////////////////////////////////// FIoStatus FCas::Initialize(FStringView Directory, bool bDeleteExisting) { RootDirectory = Directory; Lookup.Empty(); BlockIds.Empty(); LastAccess.Empty(); TStringBuilder<256> Path; FPathViews::Append(Path, RootDirectory, TEXT("blocks")); IFileManager& Ifm = IFileManager::Get(); if (bDeleteExisting) { bool bRequireExists = false; const bool bTree = true; if (Ifm.DeleteDirectory(Path.ToString(), bRequireExists, bTree) == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to delete CAS blocks directory '") << Path.ToString() << TEXT("'"); } } if (Ifm.DirectoryExists(Path.ToString()) == false) { const bool bTree = true; if (Ifm.MakeDirectory(Path.ToString(), bTree) == false) { FIoStatus Status = FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to create directory '") << Path.ToString() << TEXT("'"); return Status; } } return EIoErrorCode::Ok; }; FCasLocation FCas::FindChunk(const FIoHash& Hash) const { const FCasAddr* Addr = reinterpret_cast(&Hash); const uint32 TypeHash = GetTypeHash(*Addr); { UE::TUniqueLock Lock(Mutex); if (const FCasLocation* Loc = Lookup.FindByHash(TypeHash, *Addr)) { return *Loc; } } return FCasLocation{}; } FCasBlockId FCas::CreateBlock() { IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); FCasBlockId Out = FCasBlockId::Invalid; UE::TUniqueLock Lock(Mutex); for (uint32 Id = 1; Id < MAX_uint32 && !Out.IsValid(); Id++) { const FCasBlockId BlockId(Id); if (BlockIds.Contains(BlockId)) { continue; } const FString Filename = GetBlockFilename(BlockId); if (Ipf.FileExists(*Filename)) { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Unused CAS block id %u already exists on disk"), BlockId.Id); continue; } BlockIds.Add(BlockId, 0); LastAccess.FindOrAdd(BlockId, FDateTime::UtcNow().GetTicks()); Out = BlockId; } return Out; } FIoStatus FCas::DeleteBlock(FCasBlockId BlockId, TArray& OutAddrs) { IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); const FString Filename = GetBlockFilename(BlockId); UE::TDynamicUniqueLock Lock(Mutex, UE::FDeferLock()); // Wait for pending reads to flush before deleting block uint32 StartTimeCycles = FPlatformTime::Cycles(); const uint32 WaitTimeMs = 1000; for (;;) { Lock.Lock(); const int32 RequestCount = BlockIds.FindRef(BlockId); if (RequestCount) { Lock.Unlock(); if (FPlatformTime::ToMilliseconds(FPlatformTime::Cycles() - StartTimeCycles) > DeleteBlockMaxWaitTimeMs) { return FIoStatusBuilder(EIoErrorCode::Timeout) << TEXT("Timed out waiting for pending read(s) while deleting CAS block '") << Filename << TEXT("'"); } BlockReadsDoneEvent->Wait(WaitTimeMs); } else { // Leave mutex locked until it goes out of scope break; } } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Deleting CAS block '%s'"), *Filename); if (Ipf.DeleteFile(*Filename) == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to delete CAS block '") << Filename << TEXT("'"); } BlockIds.Remove(BlockId); ReadHandles.Remove(BlockId); for (auto It = Lookup.CreateIterator(); It; ++It) { if (It->Value.BlockId == BlockId) { OutAddrs.Add(It->Key); It.RemoveCurrent(); } } return FIoStatus::Ok; } FString FCas::GetBlockFilename(FCasBlockId BlockId) const { check(BlockId.IsValid()); const uint32 Id = NETWORK_ORDER32(BlockId.Id); FString Hex; BytesToHexLower(reinterpret_cast(&Id), sizeof(int32), Hex); TStringBuilder<256> Path; FPathViews::Append(Path, RootDirectory, TEXT("blocks"), Hex); Path << TEXT(".ucas"); return FString(Path.ToView()); } FSharedFileOpenResult FCas::OpenRead(FCasBlockId BlockId) { const FString Filename = GetBlockFilename(BlockId); IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); UE::TUniqueLock Lock(Mutex); FFileOpenResult Result = Ipf.OpenRead(*Filename, IPlatformFile::EOpenReadFlags::AllowWrite); if (Result.HasValue()) { BlockIds.FindOrAdd(BlockId, 0)++; FSharedFileHandle NewHandle( Result.GetValue().Release(), [this, BlockId](IFileHandle* RawHandle) { delete RawHandle; OnFileHandleDeleted(BlockId); } ); return MakeValue(MoveTemp(NewHandle)); } return MakeError(Result.StealError()); } FSharedFileOpenAsyncResult FCas::OpenAsyncRead(FCasBlockId BlockId) { UE::TUniqueLock Lock(Mutex); if (FWeakAsyncFileHandle* MaybeHandle = ReadHandles.Find(BlockId)) { if (FSharedAsyncFileHandle Handle = MaybeHandle->Pin(); Handle.IsValid()) { return MakeValue(MoveTemp(Handle)); } } IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); const FString Filename = GetBlockFilename(BlockId); FFileOpenAsyncResult HandleResult(Ipf.OpenAsyncRead(*Filename, IPlatformFile::EOpenReadFlags::AllowWrite)); if (HandleResult.HasValue()) { BlockIds.FindOrAdd(BlockId, 0)++; FSharedAsyncFileHandle NewHandle( HandleResult.GetValue().Release(), [this, BlockId](IAsyncReadFileHandle* RawHandle) { delete RawHandle; OnFileHandleDeleted(BlockId); } ); ReadHandles.FindOrAdd(BlockId, NewHandle); return MakeValue(MoveTemp(NewHandle)); } return MakeError(HandleResult.StealError()); } void FCas::OnFileHandleDeleted(FCasBlockId BlockId) { UE::TUniqueLock Lock(Mutex); const int32 Count = --BlockIds.FindChecked(BlockId); check(Count >= 0); if (Count == 0) { BlockReadsDoneEvent->Trigger(); } } FUniqueFileHandle FCas::OpenWrite(FCasBlockId BlockId) const { IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); const FString Filename = GetBlockFilename(BlockId); const bool bAppend = true; const bool bAllowRead = true; return FUniqueFileHandle(Ipf.OpenWrite(*Filename, bAppend, bAllowRead)); } void FCas::TrackAccess(FCasBlockId BlockId, int64 UtcTicks) { check(BlockId.IsValid()); UE::TUniqueLock Lock(Mutex); LastAccess.FindOrAdd(BlockId, UtcTicks); } void FCas::TrackAccessIfNewer(FCasBlockId BlockId, int64 UtcTicks) { check(BlockId.IsValid()); UE::TUniqueLock Lock(Mutex); int64& FoundTicks = LastAccess.FindOrAdd(BlockId, FDateTime::MinValue().GetTicks()); if (FoundTicks < UtcTicks) { FoundTicks = UtcTicks; } } uint64 FCas::GetBlockInfo(FCasBlockInfoMap& OutBlockInfo) { TStringBuilder<256> Path; FPathViews::Append(Path, RootDirectory, TEXT("blocks")); struct FDirectoryVisitor final : public IPlatformFile::FDirectoryVisitor { FDirectoryVisitor(IPlatformFile& PlatformFile, FCasBlockInfoMap& InBlockInfo, FLastAccess&& Access) : Ipf(PlatformFile) , BlockInfo(InBlockInfo) , LastAccess(MoveTemp(Access)) { } virtual bool Visit(const TCHAR* FilenameOrDirectory, bool bIsDirectory) override { if (bIsDirectory) { return true; } const FStringView Filename(FilenameOrDirectory); if (FPathViews::GetExtension(Filename) == TEXTVIEW("ucas") == false) { return true; } const int64 FileSize = Ipf.FileSize(FilenameOrDirectory); const FStringView IndexHex = FPathViews::GetBaseFilename(Filename); const FCasBlockId BlockId(FParse::HexNumber(WriteToString<128>(IndexHex).ToString())); if (BlockId.IsValid() == false || FileSize < 0) { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Found invalid CAS block '%s', FileSize=%lld"), FilenameOrDirectory, FileSize); return true; } if (BlockInfo.Contains(BlockId)) { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Found duplicate CAS block '%s'"), FilenameOrDirectory); return true; } const int64* UtcTicks = LastAccess.Find(BlockId); BlockInfo.Add(BlockId, FCasBlockInfo { .FileSize = uint64(FileSize), .LastAccess = UtcTicks != nullptr ? *UtcTicks : 0 }); TotalSize += uint64(FileSize); return true; } IPlatformFile& Ipf; FCasBlockInfoMap& BlockInfo; FLastAccess LastAccess; uint64 TotalSize = 0; }; FLastAccess Access; { TUniqueLock Lock(Mutex); Access = LastAccess; } IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); FDirectoryVisitor Visitor(Ipf, OutBlockInfo, MoveTemp(Access)); Ipf.IterateDirectory(Path.ToString(), Visitor); return Visitor.TotalSize; } void FCas::Compact() { UE::TUniqueLock Lock(Mutex); Lookup.Compact(); BlockIds.Compact(); ReadHandles.Compact(); LastAccess.Compact(); } FIoStatus FCas::Verify(TArray& OutAddrs) { FCasBlockInfoMap BlockInfo; const uint64 TotalSize = GetBlockInfo(BlockInfo); uint64 TotalVerifiedBytes = 0; FIoStatus Status = FIoStatus::Ok; for (auto BlockIt = BlockIds.CreateIterator(); BlockIt; ++BlockIt) { const FCasBlockId BlockId = BlockIt->Key; if (const FCasBlockInfo* Info = BlockInfo.Find(BlockId)) { TotalVerifiedBytes += Info->FileSize; continue; } const FString Filename = GetBlockFilename(BlockId); UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Missing CAS block '%s'"), *Filename); LastAccess.Remove(BlockId); BlockIt.RemoveCurrent(); Status = EIoErrorCode::NotFound; } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Verified %d CAS blocks of total %.2lf MiB"), BlockIds.Num(), ToMiB(TotalVerifiedBytes)); IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); for (const TPair& Kv : BlockInfo) { const FCasBlockId BlockId = Kv.Key; if (BlockIds.Contains(BlockId)) { continue; } const FString Filename = GetBlockFilename(BlockId); if (Ipf.DeleteFile(*Filename)) { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Deleted orphaned CAS block '%s'"), *Filename); } } TSet MissingReferencedBlocks; for (auto It = Lookup.CreateIterator(); It; ++It) { if (!BlockIds.Contains(It->Value.BlockId)) { MissingReferencedBlocks.Add(GetBlockFilename(It->Value.BlockId)); OutAddrs.Add(It->Key); It.RemoveCurrent(); Status = EIoErrorCode::NotFound; } } for (const FString& Filename : MissingReferencedBlocks) { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Lookup references missing CAS block '%s'"), *Filename); } return Status; } /////////////////////////////////////////////////////////////////////////////// struct FCasJournal { enum class EVersion : uint32 { Invalid = 0, Initial, LatestPlusOne, Latest = LatestPlusOne - 1 }; struct FHeader { static const inline uint8 MagicSequence[16] = {'C', 'A', 'S', 'J', 'O', 'U', 'R', 'N', 'A', 'L', 'H', 'E', 'A', 'D', 'E', 'R'}; bool IsValid() const; uint8 Magic[16] = {0}; EVersion Version = EVersion::Invalid; uint8 Pad[12] = {0}; }; static_assert(sizeof(FHeader) == 32); struct FFooter { static const inline uint8 MagicSequence[16] = {'C', 'A', 'S', 'J', 'O', 'U', 'R', 'N', 'A', 'L', 'F', 'O', 'O', 'T', 'E', 'R'}; bool IsValid() const; uint8 Magic[16] = {0}; }; static_assert(sizeof(FFooter) == 16); struct FEntry { enum class EType : uint8 { None = 0, ChunkLocation, BlockCreated, BlockDeleted, BlockAccess }; struct FChunkLocation { EType Type = EType::ChunkLocation; uint8 Pad[3]= {0}; FCasLocation CasLocation; FCasAddr CasAddr; }; static_assert(sizeof(FChunkLocation) == 24); struct FBlockOperation { EType Type = EType::None; uint8 Pad[3]= {0}; FCasBlockId BlockId; int64 UtcTicks = 0; uint8 Pad1[8]= {0}; }; static_assert(sizeof(FBlockOperation) == 24); union { FChunkLocation ChunkLocation; FBlockOperation BlockOperation; }; EType Type() const { return *reinterpret_cast(this); } }; static_assert(sizeof(FEntry) == 24); struct FTransaction { void ChunkLocation(const FCasLocation& Location, const FCasAddr& Addr); void BlockCreated(FCasBlockId BlockId); void BlockDeleted(FCasBlockId BlockId); void BlockAccess(FCasBlockId BlockId, int64 UtcTicks); FString JournalFile; TArray Entries; }; using FEntryHandler = TFunction; static FIoStatus Replay(const FString& JournalFile, FEntryHandler&& Handler); static FIoStatus Create(const FString& JournalFile); static FTransaction Begin(FString&& JournalFile); static FTransaction Begin(const FString& JournalFile) { return Begin(FString(JournalFile)); } static FIoStatus Commit(FTransaction&& Transaction); }; /////////////////////////////////////////////////////////////////////////////// bool FCasJournal::FHeader::IsValid() const { if (FMemory::Memcmp(&Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence)) != 0) { return false; } if (static_cast(Version) > static_cast(EVersion::Latest)) { return false; } return true; } bool FCasJournal::FFooter::IsValid() const { return FMemory::Memcmp(Magic, FFooter::MagicSequence, sizeof(FFooter::MagicSequence)) == 0; } FIoStatus FCasJournal::Replay(const FString& JournalFile, FEntryHandler&& Handler) { IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); if (Ipf.FileExists(*JournalFile) == false) { return EIoErrorCode::NotFound; } TUniquePtr FileHandle(Ipf.OpenRead(*JournalFile)); if (FileHandle.IsValid() == false) { return EIoErrorCode::FileNotOpen; } FHeader Header; if ((FileHandle->Read(reinterpret_cast(&Header), sizeof(FHeader)) == false) || (Header.IsValid() == false)) { return FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate journal header '") << JournalFile << TEXT("'"); } const int64 FileSize = FileHandle->Size(); const int64 EntryCount = (FileSize - sizeof(FHeader) - sizeof(FFooter)) / sizeof(FEntry); if (EntryCount < 0) { return EIoErrorCode::ReadError; } if (EntryCount == 0) { return EIoErrorCode::Ok; } const int64 FooterPos = FileSize - sizeof(FFooter); if (FooterPos < 0) { return FIoStatusBuilder(EIoErrorCode::CorruptToc) << TEXT("Invalid journal footer"); } const int64 EntriesPos = FileHandle->Tell(); if (FileHandle->Seek(FooterPos) == false) { return EIoErrorCode::ReadError; } FFooter Footer; if ((FileHandle->Read(reinterpret_cast(&Footer), sizeof(FFooter)) == false) || (Footer.IsValid() == false)) { return FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate journal footer '") << JournalFile << TEXT("'"); } if (FileHandle->Seek(EntriesPos) == false) { return EIoErrorCode::ReadError; } TArray Entries; Entries.SetNumZeroed(IntCastChecked(EntryCount)); if (FileHandle->Read(reinterpret_cast(Entries.GetData()), sizeof(FEntry) * EntryCount) == false) { return EIoErrorCode::ReadError; } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Replaying %" INT64_FMT " CAS journal entries of total %.2lf KiB from '%s'"), EntryCount, ToKiB(sizeof(FEntry) * EntryCount), *JournalFile); for (const FEntry& Entry : Entries) { Handler(Entry); } return EIoErrorCode::Ok; } FIoStatus FCasJournal::Create(const FString& JournalFile) { IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); Ipf.DeleteFile(*JournalFile); TUniquePtr FileHandle(Ipf.OpenWrite(*JournalFile)); if (FileHandle.IsValid() == false) { return EIoErrorCode::FileNotOpen; } FHeader Header; FMemory::Memcpy(&Header.Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence)); Header.Version = EVersion::Latest; if (FileHandle->Write(reinterpret_cast(&Header), sizeof(FHeader)) == false) { return EIoErrorCode::WriteError; } FFooter Footer; FMemory::Memcpy(&Footer.Magic, &FFooter::MagicSequence, sizeof(FFooter::MagicSequence)); if (FileHandle->Write(reinterpret_cast(&Footer), sizeof(FFooter)) == false) { return EIoErrorCode::WriteError; } return EIoErrorCode::Ok; } FCasJournal::FTransaction FCasJournal::Begin(FString&& JournalFile) { return FTransaction { .JournalFile = MoveTemp(JournalFile) }; } FIoStatus FCasJournal::Commit(FTransaction&& Transaction) { if (Transaction.Entries.IsEmpty()) { return EIoErrorCode::Ok; } IPlatformFile& Ipf = FPlatformFileManager::Get().GetPlatformFile(); // Validate header and footer { TUniquePtr FileHandle(Ipf.OpenRead(*Transaction.JournalFile)); const int64 FileSize = FileHandle.IsValid() ? FileHandle->Size() : -1; if (FileSize < sizeof(FHeader)) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::ReadError, 0); return FIoStatusBuilder(EIoErrorCode::FileOpenFailed) << TEXT("Failed to validate CAS journal file '") << Transaction.JournalFile << TEXT("'"); } FHeader Header; if ((FileHandle->Read(reinterpret_cast(&Header), sizeof(FHeader)) == false) || (Header.IsValid() == false)) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::SignatureError, 0); return FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate CAS journal header '") << Transaction.JournalFile << TEXT("'"); } const int64 FooterPos = FileSize - sizeof(FFooter); if (FileHandle->Seek(FooterPos) == false) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::SignatureError, 0); return FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate CAS journal footer '") << Transaction.JournalFile << TEXT("'"); } FFooter Footer; if ((FileHandle->Read(reinterpret_cast(&Footer), sizeof(FFooter)) == false) || (Footer.IsValid() == false)) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::SignatureError, 0); return FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate CAS journal footer '") << Transaction.JournalFile << TEXT("'"); } } // Append entries { const bool bAppend = true; TUniquePtr FileHandle(Ipf.OpenWrite(*Transaction.JournalFile, bAppend)); const int64 FileSize = FileHandle.IsValid() ? FileHandle->Size() : -1; const int64 EntriesPos = FileSize > 0 ? FileSize - sizeof(FFooter) : -1; if ((EntriesPos < 0) || (FileHandle->Seek(EntriesPos) == false)) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0); return FIoStatusBuilder(EIoErrorCode::FileOpenFailed) << TEXT("Failed to open CAS journal '") << Transaction.JournalFile << TEXT("'"); } const int64 TotalEntrySize = Transaction.Entries.Num() * sizeof(FEntry); if (FileHandle->Write( reinterpret_cast(Transaction.Entries.GetData()), TotalEntrySize) == false) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0); return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to write CAS journal entries to '") << Transaction.JournalFile << TEXT("'"); } FFooter Footer; FMemory::Memcpy(&Footer.Magic, &FFooter::MagicSequence, sizeof(FFooter::MagicSequence)); if (FileHandle->Write(reinterpret_cast(&Footer), sizeof(FFooter)) == false) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0); return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to write CAS journal footer to '") << Transaction.JournalFile << TEXT("'"); } if (FileHandle->Flush() == false) { FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::WriteError, 0); return EIoErrorCode::WriteError; } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Committed %d CAS journal entries of total %.2lf KiB to '%s'"), Transaction.Entries.Num(), ToKiB(TotalEntrySize), *Transaction.JournalFile); FOnDemandInstallCacheStats::OnJournalCommit(EIoErrorCode::Ok, TotalEntrySize); return EIoErrorCode::Ok; } } void FCasJournal::FTransaction::ChunkLocation(const FCasLocation& Location, const FCasAddr& Addr) { Entries.AddZeroed_GetRef().ChunkLocation = FEntry::FChunkLocation { .CasLocation = Location, .CasAddr = Addr }; } void FCasJournal::FTransaction::BlockCreated(FCasBlockId BlockId) { Entries.AddZeroed_GetRef().BlockOperation = FEntry::FBlockOperation { .Type = FEntry::EType::BlockCreated, .BlockId = BlockId, .UtcTicks = FDateTime::UtcNow().GetTicks() }; } void FCasJournal::FTransaction::BlockDeleted(FCasBlockId BlockId) { Entries.AddZeroed_GetRef().BlockOperation = FEntry::FBlockOperation { .Type = FEntry::EType::BlockDeleted, .BlockId = BlockId, .UtcTicks = FDateTime::UtcNow().GetTicks() }; } void FCasJournal::FTransaction::BlockAccess(FCasBlockId BlockId, int64 UtcTicks) { Entries.AddZeroed_GetRef().BlockOperation = FEntry::FBlockOperation { .Type = FEntry::EType::BlockAccess, .BlockId = BlockId, .UtcTicks = UtcTicks }; } /////////////////////////////////////////////////////////////////////////////// struct FCasSnapshot { enum class EVersion : uint32 { Invalid = 0, Initial, LatestPlusOne, Latest = LatestPlusOne - 1 }; struct FHeader { static const inline uint8 MagicSequence[16] = {'+', 'S', 'N', 'A', 'P', 'S', 'H', 'O', 'T', 'H', 'E', 'A', 'D', 'E', 'R', '+'}; bool IsValid() const; uint8 Magic[16] = {0}; EVersion Version = EVersion::Invalid; uint8 Pad[12] = {0}; }; static_assert(sizeof(FHeader) == 32); struct FFooter { static const inline uint8 MagicSequence[16] = {'+', 'S', 'N', 'A', 'P', 'S', 'H', 'O', 'T', 'F', 'O', 'O', 'T', 'E', 'R', '+'}; bool IsValid() const; uint8 Magic[16] = {0}; }; static_assert(sizeof(FFooter) == 16); struct FBlock { friend FArchive& operator<<(FArchive& Ar, FBlock& Block) { Ar << Block.BlockId; Ar << Block.LastAccess; return Ar; } FCasBlockId BlockId; int64 LastAccess; }; using FChunkLocation = TPair; static TIoStatusOr FromJournal(const FString& JournalFile); static TIoStatusOr Load(const FString& SnapshotFile, int64* OutFileSize = nullptr); static TIoStatusOr Save(const FCasSnapshot& Snapshot, const FString& SnapshotFile); static TIoStatusOr TryCreateAndResetJournal(const FString& SnapshotFile, const FString& JournalFile); TArray Blocks; TArray ChunkLocations; FCasBlockId CurrentBlockId; }; /////////////////////////////////////////////////////////////////////////////// bool FCasSnapshot::FHeader::IsValid() const { if (FMemory::Memcmp(&Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence)) != 0) { return false; } if (static_cast(Version) > static_cast(EVersion::Latest)) { return false; } return true; } bool FCasSnapshot::FFooter::IsValid() const { return FMemory::Memcmp(Magic, FFooter::MagicSequence, sizeof(FFooter::MagicSequence)) == 0; } TIoStatusOr FCasSnapshot::FromJournal(const FString& JournalFile) { TMap CasLookup; TMap LastAccess; TSet BlockIds; FCasBlockId CurrentBlockId; FIoStatus ReplayStatus = FCasJournal::Replay( JournalFile, [&CasLookup, &LastAccess, &BlockIds, &CurrentBlockId](const FCasJournal::FEntry& JournalEntry) { switch(JournalEntry.Type()) { case FCasJournal::FEntry::EType::ChunkLocation: { const FCasJournal::FEntry::FChunkLocation& ChunkLocation = JournalEntry.ChunkLocation; if (ChunkLocation.CasLocation.IsValid()) { FCasLocation& Loc = CasLookup.FindOrAdd(ChunkLocation.CasAddr); Loc = ChunkLocation.CasLocation; } else { CasLookup.Remove(ChunkLocation.CasAddr); } break; } case FCasJournal::FEntry::EType::BlockCreated: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; CurrentBlockId = Op.BlockId; BlockIds.Add(Op.BlockId); break; } case FCasJournal::FEntry::EType::BlockDeleted: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; BlockIds.Remove(Op.BlockId); if (CurrentBlockId == Op.BlockId) { CurrentBlockId = FCasBlockId::Invalid; } break; } case FCasJournal::FEntry::EType::BlockAccess: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; LastAccess.Add(Op.BlockId, Op.UtcTicks); break; } }; }); FCasSnapshot Snapshot; Snapshot.Blocks.Reserve(BlockIds.Num()); for (FCasBlockId BlockId : BlockIds) { const int64* AccessTime = LastAccess.Find(BlockId); Snapshot.Blocks.Add(FBlock { .BlockId = BlockId, .LastAccess = AccessTime != nullptr ? *AccessTime : FDateTime::UtcNow().GetTicks() }); } Snapshot.ChunkLocations = CasLookup.Array(); Snapshot.CurrentBlockId = CurrentBlockId; return Snapshot; } TIoStatusOr FCasSnapshot::Save(const FCasSnapshot& Snapshot, const FString& SnapshotFile) { IFileManager& Ifm = IFileManager::Get(); const FString TmpSnapshotFile = FPaths::ChangeExtension(SnapshotFile, TEXT(".snptmp")); TUniquePtr Ar(Ifm.CreateFileWriter(*TmpSnapshotFile)); if (Ar.IsValid() == false) { return FIoStatus(EIoErrorCode::FileNotOpen); } FHeader Header; FMemory::Memcpy(&Header.Magic, &FHeader::MagicSequence, sizeof(FHeader::MagicSequence)); Header.Version = EVersion::Latest; Ar->Serialize(reinterpret_cast(&Header), sizeof(FHeader)); if (Ar->IsError()) { Ar.Reset(); Ifm.Delete(*TmpSnapshotFile); return FIoStatus(EIoErrorCode::WriteError); } FCasSnapshot& NonConst = *const_cast(&Snapshot); *Ar << NonConst.Blocks; *Ar << NonConst.ChunkLocations; *Ar << NonConst.CurrentBlockId; if (Ar->IsError()) { Ar.Reset(); Ifm.Delete(*TmpSnapshotFile); return FIoStatus(EIoErrorCode::WriteError); } FFooter Footer; FMemory::Memcpy(&Footer.Magic, &FFooter::MagicSequence, sizeof(FFooter::MagicSequence)); Ar->Serialize(reinterpret_cast(&Footer), sizeof(FFooter)); if (Ar->IsError()) { Ar.Reset(); Ifm.Delete(*TmpSnapshotFile); return FIoStatus(EIoErrorCode::WriteError); } const int64 FileSize = Ar->TotalSize(); if (Ar->Close() == false) { Ar.Reset(); Ifm.Delete(*TmpSnapshotFile); return FIoStatus(EIoErrorCode::WriteError); } if (Ifm.Move(*SnapshotFile, *TmpSnapshotFile) == false) { Ifm.Delete(*TmpSnapshotFile); return FIoStatus(EIoErrorCode::WriteError); } return FileSize; } TIoStatusOr FCasSnapshot::Load(const FString& SnapshotFile, int64* OutFileSize) { IFileManager& Ifm = IFileManager::Get(); TUniquePtr Ar(Ifm.CreateFileReader(*SnapshotFile)); if (Ar.IsValid() == false) { return FIoStatus(EIoErrorCode::NotFound); } FHeader Header; Ar->Serialize(reinterpret_cast(&Header), sizeof(FHeader)); if (Ar->IsError() || Header.IsValid() == false) { FIoStatus Status = FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate snapshot header '") << SnapshotFile << TEXT("'"); return Status; } FCasSnapshot Snapshot; *Ar << Snapshot.Blocks; *Ar << Snapshot.ChunkLocations; *Ar << Snapshot.CurrentBlockId; FFooter Footer; Ar->Serialize(reinterpret_cast(&Footer), sizeof(FFooter)); if (Ar->IsError() || Footer.IsValid() == false) { FIoStatus Status = FIoStatusBuilder(EIoErrorCode::ReadError) << TEXT("Failed to validate snapshot footer '") << SnapshotFile << TEXT("'"); return Status; } if (OutFileSize != nullptr) { *OutFileSize = Ar->Tell(); } return Snapshot; } TIoStatusOr FCasSnapshot::TryCreateAndResetJournal(const FString& SnapshotFile, const FString& JournalFile) { IFileManager& Ifm = IFileManager::Get(); const int64 JournalFileSize = Ifm.FileSize(*JournalFile); if (JournalFileSize < 0) { return FIoStatus(EIoErrorCode::NotFound); } // Load the snapshot from the journal TIoStatusOr SnapshotStatus = FCasSnapshot::FromJournal(JournalFile); if (SnapshotStatus.IsOk() == false) { return SnapshotStatus.Status(); } // Save the snapshot int64 SnapshotSize = -1; FCasSnapshot Snapshot = SnapshotStatus.ConsumeValueOrDie(); if (TIoStatusOr Status = FCasSnapshot::Save(Snapshot, SnapshotFile); Status.IsOk()) { SnapshotSize = Status.ConsumeValueOrDie(); } else { return Status.Status(); } // Try create a new empty journal const FString TmpJournalFile = FPaths::ChangeExtension(JournalFile, TEXT(".jrntmp")); if (FIoStatus Status = FCasJournal::Create(TmpJournalFile); Status.IsOk() == false) { if (Ifm.Delete(*SnapshotFile) == false) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to delete CAS snapshot '%s'"), *SnapshotFile); } return Status; } if (Ifm.Move(*JournalFile , *TmpJournalFile) == false) { if (Ifm.Delete(*SnapshotFile) == false) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to delete CAS snapshot '%s'"), *SnapshotFile); } return FIoStatus(EIoErrorCode::WriteError); } return SnapshotSize; } /////////////////////////////////////////////////////////////////////////////// class FOnDemandInstallCache final : public IOnDemandInstallCache { using FSharedBackendContextRef = TSharedRef; using FSharedBackendContext = TSharedPtr; struct FChunkRequest { explicit FChunkRequest( FSharedAsyncFileHandle FileHandle, FIoRequestImpl* Request, FOnDemandChunkInfo&& Info, FIoOffsetAndLength Range, uint64 RequestedRawSize) : SharedFileHandle(FileHandle) , DispatcherRequest(Request) , ChunkInfo(MoveTemp(Info)) , ChunkRange(Range) , EncodedChunk(ChunkRange.GetLength()) , RawSize(RequestedRawSize) { check(DispatcherRequest != nullptr); check(ChunkInfo.IsValid()); check(Request->NextRequest == nullptr); check(Request->BackendData == nullptr); } static FChunkRequest* Get(FIoRequestImpl& Request) { return reinterpret_cast(Request.BackendData); } static FChunkRequest& GetRef(FIoRequestImpl& Request) { check(Request.BackendData); return *reinterpret_cast(Request.BackendData); } static FChunkRequest& Attach(FIoRequestImpl& Request, FChunkRequest* ChunkRequest) { check(Request.BackendData == nullptr); check(ChunkRequest != nullptr); Request.BackendData = ChunkRequest; return *ChunkRequest; } static TUniquePtr Detach(FIoRequestImpl& Request) { void* ChunkRequest = nullptr; Swap(ChunkRequest, Request.BackendData); return TUniquePtr(reinterpret_cast(ChunkRequest)); } FSharedAsyncFileHandle SharedFileHandle; TUniquePtr FileReadRequest; FIoRequestImpl* DispatcherRequest; FOnDemandChunkInfo ChunkInfo; FIoOffsetAndLength ChunkRange; FIoBuffer EncodedChunk; uint64 RawSize; }; struct FPendingChunks { static constexpr uint64 MaxPendingBytes = 4ull << 20; bool IsEmpty() const { check(Chunks.Num() == ChunkHashes.Num()); return TotalSize == 0 && Chunks.IsEmpty() && ChunkHashes.IsEmpty(); } void Append(FIoBuffer&& Chunk, const FIoHash& ChunkHash) { check(Chunks.Num() == ChunkHashes.Num()); TotalSize += Chunk.GetSize(); ChunkHashes.Add(ChunkHash); Chunks.Add(MoveTemp(Chunk)); } FIoBuffer Pop(FIoHash& OutChunkHash) { check(Chunks.Num() == ChunkHashes.Num()); check(Chunks.IsEmpty() == false); FIoBuffer Chunk = Chunks.Pop(EAllowShrinking::No); TotalSize = TotalSize - Chunk.GetSize(); OutChunkHash = ChunkHashes.Pop(EAllowShrinking::No); return Chunk; } void Reset() { Chunks.Reset(); ChunkHashes.Reset(); TotalSize = 0; } TArray Chunks; TArray ChunkHashes; uint64 TotalSize = 0; }; using FUniquePendingChunks = TUniquePtr; public: FOnDemandInstallCache(const FOnDemandInstallCacheConfig& Config, FOnDemandIoStore& IoStore); virtual ~FOnDemandInstallCache(); // IIoDispatcherBackend virtual void Initialize(FSharedBackendContextRef Context) override; virtual void Shutdown() override; virtual void ResolveIoRequests(FIoRequestList Requests, FIoRequestList& OutUnresolved) override; virtual FIoRequestImpl* GetCompletedIoRequests() override; virtual void CancelIoRequest(FIoRequestImpl* Request) override; virtual void UpdatePriorityForIoRequest(FIoRequestImpl* Request) override; virtual bool DoesChunkExist(const FIoChunkId& ChunkId) const override; virtual TIoStatusOr GetSizeForChunk(const FIoChunkId& ChunkId) const override; virtual TIoStatusOr OpenMapped(const FIoChunkId& ChunkId, const FIoReadOptions& Options) override; virtual const TCHAR* GetName() const override; // IOnDemandInstallCache virtual bool IsChunkCached(const FIoHash& ChunkHash) override; virtual FIoStatus PutChunk(FIoBuffer&& Chunk, const FIoHash& ChunkHash) override; virtual FIoStatus Purge(TSet&& ChunksToInstall) override; virtual FIoStatus PurgeAllUnreferenced(bool bDefrag, const uint64* BytesToPurge = nullptr) override; virtual FIoStatus DefragAll(const uint64* BytesToFree = nullptr) override; virtual FIoStatus Verify() override; virtual FIoStatus Flush() override; virtual FOnDemandInstallCacheUsage GetCacheUsage() override; private: FIoStatus Reinitialize(); FIoStatus InitialVerify(); uint64 AddReferencesToBlocks( const TArray& Containers, const TArray>& ChunkEntryIndices, const TSet& ChunksToInstall, FCasBlockInfoMap& BlockInfoMap, uint64& OutTotalReferencedBytes) const; FIoStatus Purge(FCasBlockInfoMap& BlockInfo, uint64 TotalBytesToPurge, uint64& OutTotalPurgedBytes); FIoStatus Defrag( const TArray& Containers, const TArray>& ChunkEntryIndices, FCasBlockInfoMap& BlockInfo, const uint64* TotalBytesToFree = nullptr); bool Resolve(FIoRequestImpl* Request); void CompleteRequest(FIoRequestImpl* Request, EIoErrorCode Status); FIoStatus FlushPendingChunks(FPendingChunks& Chunks, int64 UtcAccessTicks = 0); FIoStatus FlushPendingChunksImpl(FPendingChunks& Chunks, int64 UtcAccessTicks = 0); FString GetJournalFilename() const { return CacheDirectory / TEXT("cas.jrn"); } FString GetSnapshotFilename() const { return CacheDirectory / TEXT("cas.snp"); } FOnDemandIoStore& IoStore; FString CacheDirectory; FCas Cas; std::atomic CurrentBlock{ FCasBlockId::Invalid }; FUniquePendingChunks PendingChunks; FSharedBackendContext BackendContext; FIoRequestList CompletedRequests; UE::FMutex Mutex; uint64 MaxCacheSize{ 0 }; uint64 MaxJournalSize; #if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE UE::Tasks::FPipe ExclusivePipe{ UE_SOURCE_LOCATION }; #endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE }; /////////////////////////////////////////////////////////////////////////////// FOnDemandInstallCache::FOnDemandInstallCache(const FOnDemandInstallCacheConfig& Config, FOnDemandIoStore& InIoStore) : IoStore(InIoStore) , CacheDirectory(Config.RootDirectory) , MaxCacheSize(Config.DiskQuota) , MaxJournalSize(Config.JournalMaxSize) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Initializing install cache, MaxCacheSize=%.2lf MiB, MaxJournalSize=%.2lf KiB"), ToMiB(MaxCacheSize), ToKiB(MaxJournalSize)); const uint64 MinDiskQuota = 2 * Cas.MaxBlockSize; if (MaxCacheSize < MinDiskQuota) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to initialize install cache - disk quota must be at least %.2lf MiB"), ToMiB(MinDiskQuota)); return; } // Reserve one block of space for defragmentation overhead MaxCacheSize -= Cas.MaxBlockSize; UE_LOG(LogIoStoreOnDemand, Log, TEXT("Effective MaxCacheSize without defragmentation space is MaxCacheSize=%.2lf MiB"), ToMiB(MaxCacheSize)); FIoStatus Status = Cas.Initialize(CacheDirectory); if (Status.IsOk() == false) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to initialize install cache, reason '%s'"), *Status.ToString()); return; } // Try read the journal snapshot { const FString SnapshotFile = GetSnapshotFilename(); int64 SnapshotSize = -1; TIoStatusOr SnapshotStatus = FCasSnapshot::Load(SnapshotFile, &SnapshotSize); if (SnapshotStatus.IsOk()) { FCasSnapshot Snapshot = SnapshotStatus.ConsumeValueOrDie(); UE_LOG(LogIoStoreOnDemand, Log, TEXT("Loaded CAS snapshot '%s' %.2lf KiB with %d blocks and %d chunk locations"), *SnapshotFile, ToKiB(SnapshotSize), Snapshot.Blocks.Num(), Snapshot.ChunkLocations.Num()); Cas.Lookup.Reserve(Snapshot.ChunkLocations.Num()); for (TPair& Kv : Snapshot.ChunkLocations) { Cas.Lookup.Add(MoveTemp(Kv)); } Cas.BlockIds.Reserve(Snapshot.Blocks.Num()); Cas.LastAccess.Reserve(Snapshot.Blocks.Num()); for (const FCasSnapshot::FBlock& Block : Snapshot.Blocks) { Cas.BlockIds.Add(Block.BlockId, 0); Cas.LastAccess.Add(Block.BlockId, Block.LastAccess); } } } // Replay the journal const FString JournalFile = GetJournalFilename(); Status = FCasJournal::Replay(JournalFile, [this](const FCasJournal::FEntry& JournalEntry) { switch(JournalEntry.Type()) { case FCasJournal::FEntry::EType::ChunkLocation: { const FCasJournal::FEntry::FChunkLocation& ChunkLocation = JournalEntry.ChunkLocation; if (ChunkLocation.CasLocation.IsValid()) { FCasLocation& Loc = Cas.Lookup.FindOrAdd(ChunkLocation.CasAddr); Loc = ChunkLocation.CasLocation; } else { Cas.Lookup.Remove(ChunkLocation.CasAddr); } break; } case FCasJournal::FEntry::EType::BlockCreated: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; CurrentBlock = Op.BlockId; Cas.BlockIds.Add(Op.BlockId, 0); break; } case FCasJournal::FEntry::EType::BlockDeleted: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; Cas.BlockIds.Remove(Op.BlockId); FCasBlockId MaybeCurrentBlock = Op.BlockId; CurrentBlock.compare_exchange_strong(MaybeCurrentBlock, FCasBlockId::Invalid); break; } case FCasJournal::FEntry::EType::BlockAccess: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; Cas.TrackAccess(Op.BlockId, Op.UtcTicks); break; } }; }); // Initializing the cache for the first time if (Status.GetErrorCode() == EIoErrorCode::NotFound) { if (Status = FCasJournal::Create(JournalFile); Status.IsOk()) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Created CAS journal '%s'"), *JournalFile); // Make sure that there are no existing blocks when starting from an empty cache const bool bDeleteExisting = true; Status = Cas.Initialize(CacheDirectory, bDeleteExisting); } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to create CAS journal '%s'"), *JournalFile); } } // Verify the current state of the cache if (Status.IsOk()) { Status = InitialVerify(); } // Try to reinitialize the cache if something has gone wrong if (Status.IsOk() == false) { Status = Reinitialize(); } if (Status.IsOk()) { Cas.Compact(); } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to initialize install cache, reason '%s'"), *Status.ToString()); } } FOnDemandInstallCache::~FOnDemandInstallCache() { } void FOnDemandInstallCache::Initialize(FSharedBackendContextRef Context) { BackendContext = Context; } void FOnDemandInstallCache::Shutdown() { FCas::FLastAccess LastAccess; { TUniqueLock Lock(Cas.Mutex); LastAccess = MoveTemp(Cas.LastAccess); } const FString JournalFile = GetJournalFilename(); FCasJournal::FTransaction Transaction = FCasJournal::Begin(JournalFile); for (const TPair& Kv : LastAccess) { Transaction.BlockAccess(Kv.Key, Kv.Value); } if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); !Status.IsOk()) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to update CAS journal '%s' with block timestamp(s), reason '%s'"), *JournalFile, *Status.ToString()); } IFileManager& Ifm = IFileManager::Get(); const FString JournalFilename = GetJournalFilename(); if (Ifm.FileSize(*JournalFile) > int64(MaxJournalSize)) { const FString SnapshotFilename = GetSnapshotFilename(); TIoStatusOr SnapshotStatus = FCasSnapshot::TryCreateAndResetJournal(SnapshotFilename, JournalFilename); if (SnapshotStatus.IsOk()) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Saved CAS snapshot '%s' %.2lf KiB"), *SnapshotFilename, ToKiB(SnapshotStatus.ValueOrDie())); } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to create CAS snapshot from journal '%s', reason '%s'"), *JournalFile, *SnapshotStatus.Status().ToString()); } } } void FOnDemandInstallCache::ResolveIoRequests(FIoRequestList Requests, FIoRequestList& OutUnresolved) { while (FIoRequestImpl* Request = Requests.PopHead()) { if (Resolve(Request) == false) { OutUnresolved.AddTail(Request); } } } FIoRequestImpl* FOnDemandInstallCache::GetCompletedIoRequests() { FIoRequestImpl* FirstCompleted = nullptr; { UE::TUniqueLock Lock(Mutex); for (FIoRequestImpl& Completed : CompletedRequests) { TUniquePtr Detached = FChunkRequest::Detach(Completed); } FirstCompleted = CompletedRequests.GetHead(); CompletedRequests = FIoRequestList(); } return FirstCompleted; } void FOnDemandInstallCache::CancelIoRequest(FIoRequestImpl* Request) { check(Request != nullptr); UE::TUniqueLock Lock(Mutex); if (FChunkRequest* ChunkRequest = FChunkRequest::Get(*Request)) { if (ChunkRequest->FileReadRequest.IsValid()) { ChunkRequest->FileReadRequest->Cancel(); } } } void FOnDemandInstallCache::UpdatePriorityForIoRequest(FIoRequestImpl* Request) { } bool FOnDemandInstallCache::DoesChunkExist(const FIoChunkId& ChunkId) const { EIoErrorCode ErrorCode = EIoErrorCode::UnknownChunkID; if (FOnDemandChunkInfo ChunkInfo = IoStore.GetInstalledChunkInfo(ChunkId, ErrorCode)) { const FCasLocation CasLoc = Cas.FindChunk(ChunkInfo.Hash()); return CasLoc.IsValid(); } return false; } TIoStatusOr FOnDemandInstallCache::GetSizeForChunk(const FIoChunkId& ChunkId) const { EIoErrorCode ErrorCode = EIoErrorCode::UnknownChunkID; if (FOnDemandChunkInfo ChunkInfo = IoStore.GetInstalledChunkInfo(ChunkId, ErrorCode)) { return ChunkInfo.RawSize(); } return FIoStatus(ErrorCode); } TIoStatusOr FOnDemandInstallCache::OpenMapped(const FIoChunkId& ChunkId, const FIoReadOptions& Options) { return FIoStatus(EIoErrorCode::FileOpenFailed); } const TCHAR* FOnDemandInstallCache::GetName() const { return TEXT("OnDemandInstallCache"); } bool FOnDemandInstallCache::Resolve(FIoRequestImpl* Request) { EIoErrorCode ErrorCode = EIoErrorCode::UnknownChunkID; FOnDemandChunkInfo ChunkInfo = IoStore.GetInstalledChunkInfo(Request->ChunkId, ErrorCode); if (ChunkInfo.IsValid() == false) { if (ErrorCode == EIoErrorCode::NotInstalled) { CompleteRequest(Request, EIoErrorCode::NotInstalled); return true; } return false; } const FCasLocation CasLoc = Cas.FindChunk(ChunkInfo.Hash()); if (CasLoc.IsValid() == false) { CompleteRequest(Request, EIoErrorCode::NotInstalled); return true; } const uint64 RequestSize = FMath::Min( Request->Options.GetSize(), ChunkInfo.RawSize() - Request->Options.GetOffset()); TIoStatusOr ChunkRange = FIoChunkEncoding::GetChunkRange( ChunkInfo.RawSize(), ChunkInfo.BlockSize(), ChunkInfo.Blocks(), Request->Options.GetOffset(), RequestSize); if (ChunkRange.IsOk() == false) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to get chunk range")); CompleteRequest(Request, ChunkRange.Status().GetErrorCode()); return true; } TRACE_IOSTORE_BACKEND_REQUEST_STARTED(Request, this); Cas.TrackAccess(CasLoc.BlockId); #if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE const bool bIsLocationInCurrentBlock = CasLoc.BlockId == CurrentBlock; if (bIsLocationInCurrentBlock) { // The current block may have open writes which may cause async reads to fail // on some platforms. Schedule the reads to happen on the same pipe as writes // The internal request parameters are attached/owned by the I/O request via // the backend data parameter. The chunk request is deleted in GetCompletedRequests FChunkRequest::Attach(*Request, new FChunkRequest( FSharedAsyncFileHandle(), Request, MoveTemp(ChunkInfo), ChunkRange.ConsumeValueOrDie(), RequestSize)); ExclusivePipe.Launch(UE_SOURCE_LOCATION, [this, Request, CasLoc] { FChunkRequest& ChunkRequest = FChunkRequest::GetRef(*Request); EIoErrorCode Status = EIoErrorCode::FileOpenFailed; const FString Filename = Cas.GetBlockFilename(CasLoc.BlockId); FSharedFileOpenResult FileOpenResult = Cas.OpenRead(CasLoc.BlockId); if (FileOpenResult.IsValid()) { Status = EIoErrorCode::ReadError; TSharedPtr FileHandle = FileOpenResult.StealValue(); const int64 CasBlockOffset = CasLoc.BlockOffset + ChunkRequest.ChunkRange.GetOffset(); if (Request->IsCancelled()) { UE_LOG(LogIoStoreOnDemand, Verbose, TEXT("Cancelled request - skipped seek to offset %lld in CAS block '%s'"), CasBlockOffset, *Filename); } else if (FileHandle->Seek(CasBlockOffset)) { const bool bOk = FileHandle->Read(ChunkRequest.EncodedChunk.GetData(), ChunkRequest.EncodedChunk.GetSize()); if (bOk) { Status = EIoErrorCode::Ok; } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to read %llu bytes at offset %lld in CAS block '%s'"), ChunkRequest.EncodedChunk.GetSize(), CasBlockOffset, *Filename); } } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to seek to offset %lld in CAS block '%s'"), CasBlockOffset, *Filename); } } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open CAS block '%s' for reading, reason '%s'"), *Filename, *FileOpenResult.GetError().GetMessage()); } UE::Tasks::Launch(UE_SOURCE_LOCATION, [this, Request, Status] { CompleteRequest(Request, Status); }); }, UE::Tasks::ETaskPriority::BackgroundHigh); return true; } #endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE FSharedFileOpenAsyncResult FileOpenResult = Cas.OpenAsyncRead(CasLoc.BlockId); if (FileOpenResult.HasError()) { const FString Filename = Cas.GetBlockFilename(CasLoc.BlockId); UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open CAS block '%s' for async reading, reason '%s'"), *Filename, *FileOpenResult.GetError().GetMessage()); TUniquePtr Detached = FChunkRequest::Detach(*Request); CompleteRequest(Request, EIoErrorCode::FileOpenFailed); return true; } FSharedAsyncFileHandle FileHandle(FileOpenResult.StealValue()); // The internal request parameters are attached/owned by the I/O request via // the backend data parameter. The chunk request is deleted in GetCompletedRequests FChunkRequest& ChunkRequest = FChunkRequest::Attach(*Request, new FChunkRequest( FileHandle, Request, MoveTemp(ChunkInfo), ChunkRange.ConsumeValueOrDie(), RequestSize)); FAsyncFileCallBack Callback = [this, Request](bool bWasCancelled, IAsyncReadRequest* ReadRequest) { UE::Tasks::Launch(UE_SOURCE_LOCATION, [this, Request, bWasCancelled] { const EIoErrorCode Status = bWasCancelled ? EIoErrorCode::ReadError : EIoErrorCode::Ok; CompleteRequest(Request, Status); }); }; ChunkRequest.FileReadRequest.Reset(FileHandle->ReadRequest( CasLoc.BlockOffset + ChunkRequest.ChunkRange.GetOffset(), ChunkRequest.ChunkRange.GetLength(), EAsyncIOPriorityAndFlags::AIOP_BelowNormal, &Callback, ChunkRequest.EncodedChunk.GetData())); if (ChunkRequest.FileReadRequest.IsValid() == false) { TRACE_IOSTORE_BACKEND_REQUEST_FAILED(Request); TUniquePtr Detached = FChunkRequest::Detach(*Request); CompleteRequest(Request, EIoErrorCode::ReadError); return true; } return true; } bool FOnDemandInstallCache::IsChunkCached(const FIoHash& ChunkHash) { const FCasLocation Loc = Cas.FindChunk(ChunkHash); return Loc.IsValid(); } FIoStatus FOnDemandInstallCache::PutChunk(FIoBuffer&& Chunk, const FIoHash& ChunkHash) { if (PendingChunks.IsValid() == false) { PendingChunks = MakeUnique(); } if (PendingChunks->TotalSize > FPendingChunks::MaxPendingBytes) { if (FIoStatus Status = FlushPendingChunks(*PendingChunks); Status.IsOk() == false) { return Status; } check(PendingChunks->IsEmpty()); } PendingChunks->Append(MoveTemp(Chunk), ChunkHash); return FIoStatus::Ok; } FIoStatus FOnDemandInstallCache::Purge(TSet&& ChunksToInstall) { FCasBlockInfoMap BlockInfo; const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo); TArray Containers; TArray> ChunkEntryIndices; IoStore.GetReferencedContent(Containers, ChunkEntryIndices); check(Containers.Num() == ChunkEntryIndices.Num()); uint64 ReferencedBytes = 0; uint64 FragmentedBytes = 0; uint64 TotalReferencedBlockBytes = 0; int64 OldestBlockAccess = FDateTime::MaxValue().GetTicks(); const uint64 TotalUncachedBytes = AddReferencesToBlocks(Containers, ChunkEntryIndices, ChunksToInstall, BlockInfo, ReferencedBytes); for (const TPair& Kv : BlockInfo) { const FCasBlockInfo& Info = Kv.Value; if (Info.RefSize < Info.FileSize) { FragmentedBytes += (Info.FileSize - Info.RefSize); } if (Info.RefSize > 0) { TotalReferencedBlockBytes += Info.FileSize; } if (Info.LastAccess < OldestBlockAccess) { OldestBlockAccess = Info.LastAccess; } } FOnDemandInstallCacheStats::OnCacheUsage( MaxCacheSize, TotalCachedBytes, TotalReferencedBlockBytes, ReferencedBytes, FragmentedBytes, OldestBlockAccess); const uint64 TotalRequiredBytes = TotalCachedBytes + TotalUncachedBytes; if (TotalRequiredBytes <= MaxCacheSize) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Skipping cache purge, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBlockSize=%.2lf MiB, ReferencedSize=%.2lf MiB, FragmentedBytes=%.2lf MiB, UncachedSize=%.2lf MiB"), ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBlockBytes), ToMiB(ReferencedBytes), ToMiB(FragmentedBytes), ToMiB(TotalUncachedBytes)); return FIoStatus::Ok; } //TODO: Compute fragmentation metric and redownload chunks when this number gets too high UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purging install cache, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBlockSize=%.2lf MiB, ReferencedSize=%.2lf MiB, FragmentedBytes=%.2lf MiB, UncachedSize=%.2lf MiB"), ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBlockBytes), ToMiB(ReferencedBytes), ToMiB(FragmentedBytes), ToMiB(TotalUncachedBytes)); const uint64 TotalBytesToPurge = TotalRequiredBytes - MaxCacheSize; uint64 TotalPurgedBytes = 0; FIoStatus Status = Purge(BlockInfo, TotalBytesToPurge, TotalPurgedBytes); if (TotalPurgedBytes > 0) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purged %.2lf MiB (%.2lf%%) from install cache"), ToMiB(TotalPurgedBytes), 100.0 * (double(TotalPurgedBytes) / double(TotalCachedBytes))); } const uint64 NewCachedBytes = TotalCachedBytes - TotalPurgedBytes; UE_CLOG(NewCachedBytes > MaxCacheSize, LogIoStoreOnDemand, Warning, TEXT("Max install cache size exceeded by %.2lf MiB (%.2lf%%)"), ToMiB(NewCachedBytes - MaxCacheSize), 100.0 * (double(NewCachedBytes - MaxCacheSize) / double(MaxCacheSize))); FOnDemandInstallCacheStats::OnPurge(Status.GetErrorCode(), MaxCacheSize, NewCachedBytes, TotalBytesToPurge, TotalPurgedBytes); if (TotalPurgedBytes < TotalBytesToPurge) { if (UE::IoStore::CVars::GIoStoreOnDemandEnableDefrag) { // Attempt to defrag const uint64 DefragBytesToPurge = TotalBytesToPurge - TotalPurgedBytes; FIoStatus DefragStatus = Defrag(Containers, ChunkEntryIndices, BlockInfo, &DefragBytesToPurge); if (DefragStatus.IsOk() == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache after defrag (%s)"), TotalBytesToPurge, *DefragStatus.ToString()); } } else { return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache"), TotalBytesToPurge); } } return Status; } FIoStatus FOnDemandInstallCache::PurgeAllUnreferenced(bool bDefrag, const uint64* BytesToPurge /*= nullptr*/) { FCasBlockInfoMap BlockInfo; const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo); TArray Containers; TArray> ChunkEntryIndices; IoStore.GetReferencedContent(Containers, ChunkEntryIndices); check(Containers.Num() == ChunkEntryIndices.Num()); uint64 ReferencedBytes = 0; AddReferencesToBlocks(Containers, ChunkEntryIndices, {}, BlockInfo, ReferencedBytes); const uint64 TotalReferencedBytes = Algo::TransformAccumulate(BlockInfo, [](const TPair& Kv) { return (Kv.Value.RefSize > 0) ? Kv.Value.FileSize : uint64(0); }, uint64(0)); UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purging install cache, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBytes=%.2lf MiB"), ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBytes)); const uint64 TotalBytesToPurge = BytesToPurge ? *BytesToPurge : MaxCacheSize; uint64 TotalPurgedBytes = 0; FIoStatus Status = Purge(BlockInfo, TotalBytesToPurge, TotalPurgedBytes); if (TotalPurgedBytes > 0) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Purged %.2lf MiB (%.2lf%%) from install cache"), ToMiB(TotalPurgedBytes), 100.0 * (double(TotalPurgedBytes) / double(TotalCachedBytes))); } const uint64 NewCachedBytes = TotalCachedBytes - TotalPurgedBytes; UE_CLOG(NewCachedBytes > MaxCacheSize, LogIoStoreOnDemand, Warning, TEXT("Max install cache size exceeded by %.2lf MiB (%.2lf%%)"), ToMiB(NewCachedBytes - MaxCacheSize), 100.0 * (double(NewCachedBytes - MaxCacheSize) / double(MaxCacheSize))); if (BytesToPurge) { if (bDefrag) { // Attempt to defrag const uint64 DefragBytesToPurge = TotalBytesToPurge - TotalPurgedBytes; FIoStatus DefragStatus = Defrag(Containers, ChunkEntryIndices, BlockInfo, &DefragBytesToPurge); if (DefragStatus.IsOk() == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache after defrag (%s)"), TotalBytesToPurge, *DefragStatus.ToString()); } } else { return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge %" UINT64_FMT " from install cache"), TotalBytesToPurge); } } else if (bDefrag) { // Just do Full Defrag FIoStatus DefragStatus = Defrag(Containers, ChunkEntryIndices, BlockInfo); if (DefragStatus.IsOk() == false) { return DefragStatus; } } return Status; } FIoStatus FOnDemandInstallCache::DefragAll(const uint64* BytesToFree /*= nullptr*/) { FCasBlockInfoMap BlockInfo; const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo); TArray Containers; TArray> ChunkEntryIndices; IoStore.GetReferencedContent(Containers, ChunkEntryIndices); check(Containers.Num() == ChunkEntryIndices.Num()); uint64 ReferencedBytes = 0; AddReferencesToBlocks(Containers, ChunkEntryIndices, {}, BlockInfo, ReferencedBytes); const uint64 TotalReferencedBlockBytes = Algo::TransformAccumulate(BlockInfo, [](const TPair& Kv) { return (Kv.Value.RefSize > 0) ? Kv.Value.FileSize : uint64(0); }, uint64(0)); UE_LOG(LogIoStoreOnDemand, Log, TEXT("Defragmenting install cache, MaxCacheSize=%.2lf MiB, CacheSize=%.2lf MiB, ReferencedBlockSize=%.2lf MiB, ReferencedSize=%.2lf MiB"), ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes), ToMiB(TotalReferencedBlockBytes), ToMiB(ReferencedBytes)); return Defrag(Containers, ChunkEntryIndices, BlockInfo, BytesToFree); } FIoStatus FOnDemandInstallCache::Verify() { struct FChunkLookup { TMap AddrToIndex; }; struct FCasAddrLocation { FCasAddr Addr; FCasLocation Location; bool operator<(const FCasAddrLocation& Other) const { if (Location.BlockId == Other.Location.BlockId) { return Location.BlockOffset < Other.Location.BlockOffset; } return Location.BlockId.Id < Other.Location.BlockId.Id; } }; TArray Containers = IoStore.GetContainers(EOnDemandContainerFlags::InstallOnDemand); TArray ChunkLocations; TArray ChunkLookups; { TUniqueLock Lock(Cas.Mutex); ChunkLocations.Reserve(Cas.Lookup.Num()); for (const TPair& Kv : Cas.Lookup) { ChunkLocations.Add(FCasAddrLocation { .Addr = Kv.Key, .Location = Kv.Value }); } } ChunkLocations.Sort(); ChunkLookups.Reserve(Containers.Num()); for (int32 Idx = 0; Idx < Containers.Num(); ++Idx) { FSharedOnDemandContainer& Container = Containers[Idx]; FChunkLookup& Lookup = ChunkLookups.AddDefaulted_GetRef(); Lookup.AddrToIndex.Reserve(Container->ChunkEntries.Num()); for (int32 EntryIndex = 0; const FOnDemandChunkEntry& Entry : Container->ChunkEntries) { const FCasAddr& Addr = *reinterpret_cast(&Entry.Hash); Lookup.AddrToIndex.Add(Addr, EntryIndex++); } } auto FindChunkEntry = [&Containers, &ChunkLookups](const FCasAddr& Addr, int32& OutContainerIndex) -> int32 { OutContainerIndex = INDEX_NONE; for (int32 Idx = 0; Idx < Containers.Num(); ++Idx) { FChunkLookup& Lookup = ChunkLookups[Idx]; if (const int32* EntryIndex = Lookup.AddrToIndex.Find(Addr)) { OutContainerIndex = Idx; return *EntryIndex; } } return INDEX_NONE; }; const int32 ChunkCount = ChunkLocations.Num(); uint32 CorruptChunkCount = 0; uint32 MissingChunkCount = 0; uint32 ReadErrorCount = 0; uint64 TotalVerifiedBytes = 0; FIoBuffer Chunk(1 << 20); if (ChunkCount == 0) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Verify skipped, install cache is empty")); return FIoStatus::Ok; } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Verifying %d installed chunks..."), ChunkCount); for (int32 ChunkIndex = 0; const FCasAddrLocation& ChunkLocation : ChunkLocations) { FSharedFileOpenResult OpenResult = Cas.OpenRead(ChunkLocation.Location.BlockId); if (OpenResult.HasError()) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open block %u for reading"), ChunkLocation.Location.BlockId.Id); ReadErrorCount++; ChunkIndex++; continue; } int32 ContainerIndex = INDEX_NONE; int32 EntryIndex = FindChunkEntry(ChunkLocation.Addr, ContainerIndex); if (EntryIndex == INDEX_NONE) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to find chunk entry for CAS adress '%s'"), *LexToString(ChunkLocation.Addr)); MissingChunkCount++; ChunkIndex++; continue; } const FSharedOnDemandContainer& Container = Containers[ContainerIndex]; const FIoChunkId& ChunkId = Container->ChunkIds[EntryIndex]; const FOnDemandChunkEntry& ChunkEntry = Container->ChunkEntries[EntryIndex]; FSharedFileHandle FileHandle = OpenResult.GetValue(); const int64 ChunkSize = Align(int64(ChunkEntry.EncodedSize), FAES::AESBlockSize); TotalVerifiedBytes += ChunkSize; if (int64(Chunk.GetSize()) < ChunkSize) { Chunk = FIoBuffer(ChunkSize); } if (FileHandle->Seek(int64(ChunkLocation.Location.BlockOffset)) == false) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Chunk %d/%d SEEK FAILED, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', Block=%u, BlockOffset=%u"), ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash), ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset); ReadErrorCount++; ChunkIndex++; continue; } if (FileHandle->Read(reinterpret_cast(Chunk.GetData()), ChunkSize) == false) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Chunk %d/%d READ FAILED, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', Block=%u, BlockOffset=%u"), ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash), ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset); ReadErrorCount++; ChunkIndex++; continue; } const FIoHash ChunkHash = FIoHash::HashBuffer(Chunk.GetView().Left(ChunkSize)); if (ChunkHash == ChunkEntry.Hash) { UE_LOG(LogIoStoreOnDemand, VeryVerbose, TEXT("Chunk %d/%d OK, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', Block=%u, BlockOffset=%u"), ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash), ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset); } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Chunk %d/%d CORRUPT, Container='%s', ChunkId='%s', ChunkSize=%lld, Hash='%s', ActualHash='%s', Block=%u, BlockOffset=%u"), ChunkIndex + 1, ChunkCount, *Container->Name, *LexToString(ChunkId), ChunkSize, *LexToString(ChunkEntry.Hash), *LexToString(ChunkHash), ChunkLocation.Location.BlockId.Id, ChunkLocation.Location.BlockOffset); CorruptChunkCount++; } ChunkIndex++; } if (CorruptChunkCount > 0 || MissingChunkCount > 0 || ReadErrorCount > 0) { const FString Reason = FString::Printf(TEXT("Verify install cache failed, Corrupt=%u, Missing=%u, ReadErrors=%u"), CorruptChunkCount, MissingChunkCount, ReadErrorCount); if (CorruptChunkCount > 0 || ReadErrorCount > 0) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("%s"), *Reason); } else { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("%s"), *Reason); } if (CorruptChunkCount > 0) { return FIoStatus(EIoErrorCode::SignatureError); } if (ReadErrorCount > 0) { return FIoStatus(EIoErrorCode::ReadError); } return FIoStatus(EIoErrorCode::NotFound); } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Successfully verified %d chunk(s) of total %.2lf MiB"), ChunkCount, ToMiB(TotalVerifiedBytes)); return FIoStatus::Ok; } FIoStatus FOnDemandInstallCache::Reinitialize() { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Recreating install cache in directory '%s'"), *CacheDirectory); IFileManager& Ifm = IFileManager::Get(); const bool bTree = true; if (Ifm.DeleteDirectory(*CacheDirectory, false, bTree) == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to delete directory '") << CacheDirectory << TEXT("'"); } if (Ifm.MakeDirectory(*CacheDirectory, bTree) == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to create directory '") << CacheDirectory << TEXT("'"); } if (FIoStatus Status = Cas.Initialize(CacheDirectory); Status.IsOk() == false) { return Status; } const FString JournalFile = GetJournalFilename(); if (FIoStatus Status = FCasJournal::Create(JournalFile); Status.IsOk() == false) { return Status; } UE_LOG(LogIoStoreOnDemand, Log, TEXT("Created CAS journal '%s'"), *JournalFile); return FIoStatus::Ok; } FIoStatus FOnDemandInstallCache::InitialVerify() { // Verify the blocks on disk with the current state of the CAS { TArray RemovedChunks; if (FIoStatus Verify = Cas.Verify(RemovedChunks); Verify.IsOk() == false) { FOnDemandInstallCacheStats::OnCasVerificationError(RemovedChunks.Num()); // Remove all entries that doesn't have a valid cache block FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename()); for (const FCasAddr& Addr : RemovedChunks) { Transaction.ChunkLocation(FCasLocation::Invalid, Addr); } if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); Status.IsOk() == false) { return Status; } } } // Check if the cache is over budget { FCasBlockInfoMap BlockInfo; uint64 CacheSize = Cas.GetBlockInfo(BlockInfo); if (CacheSize > MaxCacheSize) { const uint64 TotalBytesToPurge = CacheSize - MaxCacheSize; uint64 TotalPurgedBytes = 0; UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Cache size is greater than disk quota - Purging install cache, MaxCacheSize=%.2lf MiB, TotalSize=%.2lf MiB, TotalBytesToPurge=%.2lf MiB"), ToMiB(MaxCacheSize), ToMiB(CacheSize), ToMiB(TotalBytesToPurge)); FIoStatus PurgeStatus = Purge(BlockInfo, TotalBytesToPurge, TotalPurgedBytes); if (PurgeStatus.IsOk() && TotalPurgedBytes >= TotalBytesToPurge) { UE_LOG(LogIoStoreOnDemand, Warning, TEXT("Successfully purged %.2lf MiB from install cache"), ToMiB(TotalPurgedBytes)); } else { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to purge %.2lf MiB from install cache. Actually purged %.2lf MiB from install cache"), ToMiB(TotalBytesToPurge), ToMiB(TotalPurgedBytes)); return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Failed to purge overbudget cache (%s)"), *PurgeStatus.ToString()); } } } return FIoStatus::Ok; } uint64 FOnDemandInstallCache::AddReferencesToBlocks( const TArray& Containers, const TArray>& ChunkEntryIndices, const TSet& ChunksToInstall, FCasBlockInfoMap& BlockInfoMap, uint64& OutTotalReferencedBytes) const { uint64 TotalUncachedBytes = 0; OutTotalReferencedBytes = 0; for (int32 Index = 0; FSharedOnDemandContainer Container : Containers) { const TBitArray<>& IsReferenced = ChunkEntryIndices[Index++]; for (int32 EntryIndex = 0; const FOnDemandChunkEntry& Entry : Container->ChunkEntries) { const bool bToInstall = ChunksToInstall.Contains(Entry.Hash); const bool bIsReferenced = IsReferenced[EntryIndex]; uint64 ChunkDiskSize = Align(int64(Entry.EncodedSize), FAES::AESBlockSize); if (bIsReferenced) { OutTotalReferencedBytes += ChunkDiskSize; } FCasBlockInfo* BlockInfo = nullptr; if (bToInstall || bIsReferenced) { if (FCasLocation Loc = Cas.FindChunk(Entry.Hash); Loc.IsValid()) { BlockInfo = BlockInfoMap.Find(Loc.BlockId); if (!BlockInfo) { UE_CLOG(bIsReferenced, LogIoStoreOnDemand, Error, TEXT("Failed to find CAS block info for referenced chunk, ChunkId='%s', Container='%s'"), *LexToString(Container->ChunkIds[EntryIndex]), *Container->Name); } } else { UE_CLOG(bIsReferenced, LogIoStoreOnDemand, Error, TEXT("Failed to find CAS location for referenced chunk, ChunkId='%s', Container='%s'"), *LexToString(Container->ChunkIds[EntryIndex]), *Container->Name); } } if (BlockInfo) { BlockInfo->RefSize += ChunkDiskSize; } else if (bToInstall) { TotalUncachedBytes += ChunkDiskSize; } EntryIndex++; } } return TotalUncachedBytes; } FIoStatus FOnDemandInstallCache::Purge(FCasBlockInfoMap& BlockInfo, const uint64 TotalBytesToPurge, uint64& OutTotalPurgedBytes) { BlockInfo.ValueSort([](const FCasBlockInfo& LHS, const FCasBlockInfo& RHS) { return LHS.LastAccess < RHS.LastAccess; }); OutTotalPurgedBytes = 0; for (auto It = BlockInfo.CreateIterator(); It; ++It) { const FCasBlockId BlockId = It->Key; const FCasBlockInfo& Info = It->Value; if (Info.RefSize > 0) { continue; } FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename()); TArray RemovedChunks; if (FIoStatus Status = Cas.DeleteBlock(BlockId, RemovedChunks); !Status.IsOk()) { return Status; } // This should be the only thread writing to CurrentBlock FCasBlockId MaybeCurrentBlock = BlockId; CurrentBlock.compare_exchange_strong(MaybeCurrentBlock, FCasBlockId::Invalid); OutTotalPurgedBytes += Info.FileSize; It.RemoveCurrent(); for (const FCasAddr& Addr : RemovedChunks) { Transaction.ChunkLocation(FCasLocation::Invalid, Addr); } Transaction.BlockDeleted(BlockId); if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); !Status.IsOk()) { return Status; } if (OutTotalPurgedBytes >= TotalBytesToPurge) { break; } } return FIoStatus::Ok; } FIoStatus FOnDemandInstallCache::Defrag( const TArray& Containers, const TArray>& ChunkEntryIndices, FCasBlockInfoMap& BlockInfo, const uint64* TotalBytesToFree /*= nullptr*/) { if (TotalBytesToFree && *TotalBytesToFree == 0) { return FIoStatus::Ok; } const uint64 TotalCachedBytes = Algo::TransformAccumulate(BlockInfo, [](const TPair& Kv) { return Kv.Value.FileSize; }, uint64(0)); if (TotalCachedBytes > MaxCacheSize) { // Ruh-Roh! There's not enough of the disk quota left to run a defrag! const FString ErrorMsg = FString::Printf(TEXT("Cache size is greater than disk quota - Cannot Defragment!, MaxCacheSize=%.2lf MiB, TotalCachedBytes=%.2lf MiB"), ToMiB(MaxCacheSize), ToMiB(TotalCachedBytes)); UE_LOG(LogIoStoreOnDemand, Error, TEXT("%s"), *ErrorMsg); FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::WriteError, 0); return FIoStatusBuilder(EIoErrorCode::WriteError) << ErrorMsg; } struct FDefragBlockReferencedChunk { uint32 BlockOffset = 0; uint32 EncodedSize = 0; FIoHash Hash; }; struct FDefragBlock { FCasBlockId BlockId; int64 LastAccess = 0; TArray ReferencedChunks; }; // Build the list of blocks to defrag and determine if its possible to free enough data through defragging TArray BlocksToDefrag; // Start with the least referenced blocks BlockInfo.ValueSort([](const FCasBlockInfo& LHS, const FCasBlockInfo& RHS) { return LHS.RefSize < RHS.RefSize; }); uint64 FragmentedBytes = 0; uint64 TotalBlockSize = 0; if (TotalBytesToFree) { // Partial defrag bool bPossibleToFreeBytes = false; uint64 FreedBlockBytes = 0; uint64 NewBlockBytes = 0; for (const TPair& Kv : BlockInfo) { const FCasBlockId BlockId = Kv.Key; const FCasBlockInfo& Info = Kv.Value; if (!bPossibleToFreeBytes && Info.RefSize < Info.FileSize) { // Block is fragmented FragmentedBytes += (Info.FileSize - Info.RefSize); TotalBlockSize += Info.FileSize; FreedBlockBytes += Info.FileSize; NewBlockBytes += Info.RefSize; // For now, assume that nothing will be moved to the current block BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess }); if (FreedBlockBytes >= NewBlockBytes && FreedBlockBytes - NewBlockBytes >= *TotalBytesToFree) { bPossibleToFreeBytes = true; } } else if (Info.FileSize < Cas.MinBlockSize) { // Block is too small whether or not its fragmented if (ensure(Info.RefSize <= Info.FileSize)) { FragmentedBytes += (Info.FileSize - Info.RefSize); } TotalBlockSize += Info.FileSize; BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess }); } } if (!bPossibleToFreeBytes) { FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::WriteError, 0); return FIoStatusBuilder(EIoErrorCode::WriteError) << FString::Printf(TEXT("Defrag failed - cannot free %" UINT64_FMT), *TotalBytesToFree); } } else { // Full defrag for (const TPair& Kv : BlockInfo) { const FCasBlockId BlockId = Kv.Key; const FCasBlockInfo& Info = Kv.Value; if (Info.RefSize < Info.FileSize) { // Block is fragmented FragmentedBytes += (Info.FileSize - Info.RefSize); TotalBlockSize += Info.FileSize; BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess }); } else if (Info.FileSize < Cas.MinBlockSize) { // Block is too small whether or not its fragmented if (ensure(Info.RefSize <= Info.FileSize)) { FragmentedBytes += (Info.FileSize - Info.RefSize); } TotalBlockSize += Info.FileSize; BlocksToDefrag.Add(FDefragBlock{ .BlockId = BlockId, .LastAccess = Info.LastAccess }); } } if (BlocksToDefrag.IsEmpty()) { // Already defragged UE_LOG(LogIoStoreOnDemand, Display, TEXT("Cache not fragmented.")); return FIoStatus::Ok; } } UE_LOG(LogIoStoreOnDemand, Display, TEXT("Defrag found %" UINT64_FMT " fragmented bytes of %" UINT64_FMT " total bytes in %i blocks."), FragmentedBytes, TotalBlockSize, BlocksToDefrag.Num()); // Right now, don't allow moving chunks to the current block for defrag. Its somewhat dangerous and hard to reason about. // - Currently, the slack in the current block cannot be determined without opening a write handle to the block. // - If we defrag the current block itself, then we would need additional tracking so we don't lose any chunks moved into it. // - Additionally, this would also depend on the order blocks are defragged. // This should be the only thread writing to CurrentBlock. CurrentBlock = FCasBlockId::Invalid; // Determine chunks that need to be moved for each defrag block for (int32 Index = 0; FSharedOnDemandContainer Container : Containers) { const TBitArray<>& IsReferenced = ChunkEntryIndices[Index++]; for (int32 EntryIndex = 0; const FOnDemandChunkEntry& Entry : Container->ChunkEntries) { if (bool bIsReferenced = IsReferenced[EntryIndex++]; bIsReferenced == false) { continue; } if (FCasLocation Loc = Cas.FindChunk(Entry.Hash); Loc.IsValid()) { if (FDefragBlock* DefragBlock = Algo::FindBy(BlocksToDefrag, Loc.BlockId, &FDefragBlock::BlockId)) { DefragBlock->ReferencedChunks.Add(FDefragBlockReferencedChunk { .BlockOffset = Loc.BlockOffset, .EncodedSize = Entry.EncodedSize, .Hash = Entry.Hash, }); } } } } // Move chunks to new blocks and delete old blocks FPendingChunks DefragPendingChunks; for (const FDefragBlock& DefragBlock : BlocksToDefrag) { if (DefragBlock.ReferencedChunks.IsEmpty() == false) { FSharedFileOpenResult FileOpenResult = Cas.OpenRead(DefragBlock.BlockId); if (!FileOpenResult.IsValid()) { const FString Filename = Cas.GetBlockFilename(DefragBlock.BlockId); const FString ErrorMsg = FileOpenResult.GetError().GetMessage(); UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to open CAS block '%s' for reading, reason '%s'"), *Filename, *ErrorMsg); FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::FileOpenFailed, 0); return FIoStatusBuilder(EIoErrorCode::FileOpenFailed) << ErrorMsg; } FSharedFileHandle FileHandle = FileOpenResult.StealValue(); Algo::SortBy(DefragBlock.ReferencedChunks, &FDefragBlockReferencedChunk::BlockOffset); for (const FDefragBlockReferencedChunk& ReffedChunk : DefragBlock.ReferencedChunks) { FileHandle->Seek(ReffedChunk.BlockOffset); const int64 ChunkDiskSize = Align(int64(ReffedChunk.EncodedSize), FAES::AESBlockSize); FIoBuffer Buffer(ChunkDiskSize); const bool bOk = FileHandle->Read(Buffer.GetData(), Buffer.GetSize()); if (!bOk) { FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::ReadError, 0); return EIoErrorCode::ReadError; } const FIoHash ChunkHash = FIoHash::HashBuffer(Buffer.GetView()); if (ChunkHash != ReffedChunk.Hash) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Found chunk with invalid hash while defragging block, BlockId=%u, BlockOffset=%u"), DefragBlock.BlockId.Id, ReffedChunk.BlockOffset); if (FIoStatus Status = FlushPendingChunks(DefragPendingChunks); Status.IsOk() == false) { FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0); return Status; } FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::SignatureError, 0); return EIoErrorCode::SignatureError; } if (DefragPendingChunks.TotalSize > FPendingChunks::MaxPendingBytes) { if (FIoStatus Status = FlushPendingChunks(DefragPendingChunks, DefragBlock.LastAccess); Status.IsOk() == false) { FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0); return Status; } check(DefragPendingChunks.IsEmpty()); } DefragPendingChunks.Append(MoveTemp(Buffer), ReffedChunk.Hash); } FileHandle.Reset(); if (FIoStatus Status = FlushPendingChunks(DefragPendingChunks); Status.IsOk() == false) { FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0); return Status; } check(DefragPendingChunks.IsEmpty()); } FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename()); // Flushing should overwrite the lookup info for the cas addr to point at the new block. // Can now remove the old block TArray DeletedChunkAddresses; Cas.DeleteBlock(DefragBlock.BlockId, DeletedChunkAddresses); for (const FCasAddr& Addr : DeletedChunkAddresses) { Transaction.ChunkLocation(FCasLocation::Invalid, Addr); } Transaction.BlockDeleted(DefragBlock.BlockId); if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); !Status.IsOk()) { FOnDemandInstallCacheStats::OnDefrag(Status.GetErrorCode(), 0); return Status; } } UE_LOG(LogIoStoreOnDemand, Display, TEXT("Defrag removed %" UINT64_FMT " fragmented bytes of %" UINT64_FMT " total bytes in %i blocks."), FragmentedBytes, TotalBlockSize, BlocksToDefrag.Num()); FOnDemandInstallCacheStats::OnDefrag(EIoErrorCode::Ok, FragmentedBytes); return FIoStatus::Ok; } FIoStatus FOnDemandInstallCache::Flush() { if (PendingChunks.IsValid()) { FUniquePendingChunks Chunks = MoveTemp(PendingChunks); return FlushPendingChunks(*Chunks); } Cas.Compact(); return FIoStatus::Ok; } FOnDemandInstallCacheUsage FOnDemandInstallCache::GetCacheUsage() { // If this is called from a thread other than the OnDemandIoStore tick thread // then its possible the block info and containers may not be in sync with each other // or the current state of the tick thread. // This should only be used for debugging and telemetry purposes. FCasBlockInfoMap BlockInfo; const uint64 TotalCachedBytes = Cas.GetBlockInfo(BlockInfo); TArray Containers; TArray> ChunkEntryIndices; IoStore.GetReferencedContent(Containers, ChunkEntryIndices); check(Containers.Num() == ChunkEntryIndices.Num()); uint64 ReferencedBytes = 0; AddReferencesToBlocks(Containers, ChunkEntryIndices, {}, BlockInfo, ReferencedBytes); uint64 FragmentedBytes = 0; uint64 ReferencedBlockBytes = 0; for (const TPair& Kv : BlockInfo) { const FCasBlockId BlockId = Kv.Key; const FCasBlockInfo& Info = Kv.Value; if (Info.RefSize < Info.FileSize) { FragmentedBytes += (Info.FileSize - Info.RefSize); } if (Info.RefSize > 0) { ReferencedBlockBytes += Info.FileSize; } } return FOnDemandInstallCacheUsage { .MaxSize = MaxCacheSize, .TotalSize = TotalCachedBytes, .ReferencedBlockSize = ReferencedBlockBytes, .ReferencedSize = ReferencedBytes, .FragmentedChunksSize = FragmentedBytes, }; } FIoStatus FOnDemandInstallCache::FlushPendingChunks(FPendingChunks& Chunks, int64 UtcAccessTicks) { if (Chunks.IsEmpty()) { return FIoStatus::Ok; } #if UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE UE::Tasks::TTask Task = ExclusivePipe.Launch(UE_SOURCE_LOCATION, [this, &Chunks, UtcAccessTicks] { return FlushPendingChunksImpl(Chunks, UtcAccessTicks); }, UE::Tasks::ETaskPriority::BackgroundHigh); Task.Wait(); return Task.GetResult(); #else return FlushPendingChunksImpl(Chunks, UtcAccessTicks); #endif // UE_ONDEMANDINSTALLCACHE_EXCLUSIVE_WRITE } FIoStatus FOnDemandInstallCache::FlushPendingChunksImpl(FPendingChunks& Chunks, int64 UtcAccessTicks) { ON_SCOPE_EXIT { Chunks.Reset(); }; // This should be the only thread writing to CurrentBlock FCasBlockId CurrentBlockId = CurrentBlock; while (Chunks.IsEmpty() == false) { FCasJournal::FTransaction Transaction = FCasJournal::Begin(GetJournalFilename()); if (CurrentBlockId.IsValid() == false) { CurrentBlockId = Cas.CreateBlock(); ensure(CurrentBlockId.IsValid()); CurrentBlock = CurrentBlockId; Transaction.BlockCreated(CurrentBlockId); } TUniquePtr CasFileHandle = Cas.OpenWrite(CurrentBlockId); if (CasFileHandle.IsValid() == false) { FOnDemandInstallCacheStats::OnFlush(EIoErrorCode::WriteError, 0); return FIoStatusBuilder(EIoErrorCode::FileOpenFailed) << TEXT("Failed to open cache block file '") << Cas.GetBlockFilename(CurrentBlockId) << TEXT("'"); } const int64 CasBlockOffset = CasFileHandle->Tell(); FLargeMemoryWriter Ar(Chunks.TotalSize); TArray ChunkHashes; TArray Offsets; while (Chunks.IsEmpty() == false) { if (CasBlockOffset > 0 && CasBlockOffset + Ar.Tell() + Chunks.Chunks[0].GetSize() > Cas.MaxBlockSize) { break; } FIoBuffer Chunk = Chunks.Pop(ChunkHashes.AddDefaulted_GetRef()); Offsets.Add(CasBlockOffset + Ar.Tell()); Ar.Serialize(Chunk.GetData(), Chunk.GetSize()); } if (Ar.Tell() > 0) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Writing %.2lf MiB to CAS block %u"), ToMiB(Ar.Tell()), CurrentBlockId.Id); if (CasFileHandle->Write(Ar.GetData(), Ar.Tell()) == false) { return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to serialize chunks to cache block"); } if (UtcAccessTicks) { Cas.TrackAccessIfNewer(CurrentBlockId, UtcAccessTicks); } else { Cas.TrackAccess(CurrentBlockId); } if (CasFileHandle->Flush() == false) { FOnDemandInstallCacheStats::OnFlush(EIoErrorCode::WriteError, Ar.Tell()); return FIoStatusBuilder(EIoErrorCode::WriteError) << TEXT("Failed to flush cache block to disk"); } FOnDemandInstallCacheStats::OnFlush(EIoErrorCode::Ok, Ar.Tell()); check(ChunkHashes.Num() == Offsets.Num()); check(CurrentBlockId.IsValid()); for (int32 Idx = 0, Count = Offsets.Num(); Idx < Count; ++Idx) { const FCasAddr CasAddr = FCasAddr::From(ChunkHashes[Idx]); const uint32 ChunkOffset = IntCastChecked(Offsets[Idx]); FCasLocation& Loc = Cas.Lookup.FindOrAdd(CasAddr); Loc.BlockId = CurrentBlockId; Loc.BlockOffset = ChunkOffset; Transaction.ChunkLocation(Loc, CasAddr); } } if (FIoStatus Status = FCasJournal::Commit(MoveTemp(Transaction)); Status.IsOk() == false) { return Status; } if (Chunks.IsEmpty() == false) { CurrentBlockId = FCasBlockId::Invalid; } } return FIoStatus::Ok; } void FOnDemandInstallCache::CompleteRequest(FIoRequestImpl* Request, EIoErrorCode Status) { bool bSucceeded = false; if (Status == EIoErrorCode::Ok && !Request->IsCancelled()) { FChunkRequest& ChunkRequest = FChunkRequest::GetRef(*Request); const FOnDemandChunkInfo& ChunkInfo = ChunkRequest.ChunkInfo; FIoBuffer EncodedChunk = MoveTemp(ChunkRequest.EncodedChunk); if (EncodedChunk.GetSize() > 0) { FIoChunkDecodingParams Params; Params.CompressionFormat = ChunkInfo.CompressionFormat(); Params.EncryptionKey = ChunkInfo.EncryptionKey(); Params.BlockSize = ChunkInfo.BlockSize(); Params.TotalRawSize = ChunkInfo.RawSize(); Params.RawOffset = Request->Options.GetOffset(); Params.EncodedOffset = ChunkRequest.ChunkRange.GetOffset(); Params.EncodedBlockSize = ChunkInfo.Blocks(); Params.BlockHash = ChunkInfo.BlockHashes(); Request->CreateBuffer(ChunkRequest.RawSize); FMutableMemoryView RawChunk = Request->GetBuffer().GetMutableView(); bSucceeded = FIoChunkEncoding::Decode(Params, EncodedChunk.GetView(), RawChunk); UE_CLOG(!bSucceeded, LogIoStoreOnDemand, Error, TEXT("Failed to decode chunk, ChunkId='%s'"), *LexToString(Request->ChunkId)); } } if (bSucceeded == false) { if (Status == EIoErrorCode::Ok) { Status = EIoErrorCode::ReadError; } Request->SetLastBackendError(Status); Request->SetResult(FIoBuffer()); TRACE_IOSTORE_BACKEND_REQUEST_FAILED(Request); } else { TRACE_IOSTORE_BACKEND_REQUEST_COMPLETED(Request, Request->GetBuffer().GetSize()); } { UE::TUniqueLock Lock(Mutex); CompletedRequests.AddTail(Request); } BackendContext->WakeUpDispatcherThreadDelegate.Execute(); } /////////////////////////////////////////////////////////////////////////////// TSharedPtr MakeOnDemandInstallCache( FOnDemandIoStore& IoStore, const FOnDemandInstallCacheConfig& Config) { IFileManager& Ifm = IFileManager::Get(); if (Config.bDropCache) { UE_LOG(LogIoStoreOnDemand, Log, TEXT("Deleting install cache directory '%s'"), *Config.RootDirectory); Ifm.DeleteDirectory(*Config.RootDirectory, false, true); } const bool bTree = true; if (!Ifm.MakeDirectory(*Config.RootDirectory, bTree)) { UE_LOG(LogIoStoreOnDemand, Error, TEXT("Failed to create directory '%s'"), *Config.RootDirectory); return TSharedPtr(); } return MakeShared(Config, IoStore); } /////////////////////////////////////////////////////////////////////////////// #if WITH_IOSTORE_ONDEMAND_TESTS class FTmpDirectoryScope { public: explicit FTmpDirectoryScope(const FString& InDir) : Ifm(IFileManager::Get()) , Dir(InDir) { const bool bTree = true; const bool bRequireExists = false; Ifm.DeleteDirectory(*Dir, bRequireExists, bTree); Ifm.MakeDirectory(*Dir, bTree); } ~FTmpDirectoryScope() { const bool bTree = true; const bool bRequireExists = false; Ifm.DeleteDirectory(*Dir, bRequireExists, bTree); } private: IFileManager& Ifm; FString Dir; }; FCasAddr CreateCasTestAddr(uint64 Value) { return FCasAddr::From(reinterpret_cast(&Value), sizeof(uint64)); } TEST_CASE("IoStore::OnDemand::InstallCache::Journal", "[IoStoreOnDemand][InstallCache]") { const FString TestBaseDir = TEXT("TestTmpDir"); SECTION("CreateJournalFile") { FTmpDirectoryScope _(TestBaseDir); const FString JournalFile = TestBaseDir / TEXT("test.jrn"); FIoStatus Status = FCasJournal::Create(JournalFile); CHECK(Status.IsOk()); } SECTION("SimpleTransaction") { FTmpDirectoryScope _(TestBaseDir); const FString JournalFile = TestBaseDir / TEXT("test.jrn"); FIoStatus Status = FCasJournal::Create(JournalFile); CHECK(Status.IsOk()); FCasJournal::FTransaction Transaction = FCasJournal::Begin(JournalFile); Transaction.BlockCreated(FCasBlockId(1)); Status = FCasJournal::Commit(MoveTemp(Transaction)); CHECK(Status.IsOk()); } SECTION("ReplayChunkLocations") { //Arrange TArray ExpectedAddresses; TArray ExpectedBlockOffsets; const FCasBlockId ExpectedBlockId(42); for (int32 Idx = 1; Idx < 33; ++Idx) { ExpectedAddresses.Add(FCasAddr::From(reinterpret_cast(&Idx), sizeof(uint32))); ExpectedBlockOffsets.Add(Idx); } // Act FTmpDirectoryScope _(TestBaseDir); const FString JournalFile = TestBaseDir / TEXT("test.jrn"); FIoStatus Status = FCasJournal::Create(JournalFile); CHECK(Status.IsOk()); FCasJournal::FTransaction Transaction = FCasJournal::Begin(JournalFile); for (int32 Idx = 0; const FCasAddr& Addr : ExpectedAddresses) { Transaction.ChunkLocation( FCasLocation { .BlockId = ExpectedBlockId, .BlockOffset = ExpectedBlockOffsets[Idx] }, Addr); } Status = FCasJournal::Commit(MoveTemp(Transaction)); CHECK(Status.IsOk()); // Assert TArray Locs; Status = FCasJournal::Replay( JournalFile, [&Locs](const FCasJournal::FEntry& JournalEntry) { switch(JournalEntry.Type()) { case FCasJournal::FEntry::EType::ChunkLocation: { Locs.Add(JournalEntry.ChunkLocation); break; } default: CHECK(false); break; }; }); CHECK(Status.IsOk()); CHECK(Locs.Num() == ExpectedAddresses.Num()); for (int32 Idx = 0; const FCasJournal::FEntry::FChunkLocation& Loc : Locs) { const FCasLocation ExpectedLoc = FCasLocation { .BlockId = ExpectedBlockId, .BlockOffset = uint32(Idx + 1) }; CHECK(Loc.CasLocation.BlockId == ExpectedLoc.BlockId); CHECK(Loc.CasLocation.BlockOffset == ExpectedLoc.BlockOffset); } } SECTION("ReplayBlockCreatedAndDeleted") { // Arrange const FCasBlockId ExpectedBlockId(42); // Act FTmpDirectoryScope _(TestBaseDir); const FString JournalFile = TestBaseDir / TEXT("test.jrn"); FIoStatus Status = FCasJournal::Create(JournalFile); CHECK(Status.IsOk()); FCasJournal::FTransaction Tx = FCasJournal::Begin(JournalFile); Tx.BlockCreated(ExpectedBlockId); Tx.BlockDeleted(ExpectedBlockId); Status = FCasJournal::Commit(MoveTemp(Tx)); CHECK(Status.IsOk()); // Assert FCasBlockId CreatedBlockId; FCasBlockId DeletedBlockId; Status = FCasJournal::Replay( JournalFile, [&CreatedBlockId, &DeletedBlockId](const FCasJournal::FEntry& JournalEntry) { switch(JournalEntry.Type()) { case FCasJournal::FEntry::EType::BlockCreated: { CreatedBlockId = JournalEntry.BlockOperation.BlockId; break; } case FCasJournal::FEntry::EType::BlockDeleted: { DeletedBlockId = JournalEntry.BlockOperation.BlockId; break; } default: CHECK(false); break; }; }); CHECK(Status.IsOk()); CHECK(CreatedBlockId == ExpectedBlockId); CHECK(DeletedBlockId == ExpectedBlockId); } SECTION("ReplayBlockAccess") { // Arrange const FCasBlockId ExpectedBlockId(462); const uint64 ExpectedTicks = FDateTime::UtcNow().GetTicks(); // Act FTmpDirectoryScope _(TestBaseDir); const FString JournalFile = TestBaseDir / TEXT("test.jrn"); FIoStatus Status = FCasJournal::Create(JournalFile); CHECK(Status.IsOk()); FCasJournal::FTransaction Tx = FCasJournal::Begin(JournalFile); Tx.BlockAccess(ExpectedBlockId, ExpectedTicks); Status = FCasJournal::Commit(MoveTemp(Tx)); CHECK(Status.IsOk()); // Assert FCasBlockId BlockId; uint64 Ticks = 0; Status = FCasJournal::Replay( JournalFile, [&BlockId, &Ticks](const FCasJournal::FEntry& JournalEntry) { switch(JournalEntry.Type()) { case FCasJournal::FEntry::EType::BlockAccess: { const FCasJournal::FEntry::FBlockOperation& Op = JournalEntry.BlockOperation; BlockId = Op.BlockId; Ticks = Op.UtcTicks; break; } default: CHECK(false); break; }; }); CHECK(Status.IsOk()); CHECK(BlockId == ExpectedBlockId); CHECK(Ticks == ExpectedTicks); } } TEST_CASE("IoStore::OnDemand::InstallCache::Snapshot", "[IoStoreOnDemand][InstallCache]") { const FString TestBaseDir = TEXT("TestTmpDir"); SECTION("SaveLoadRoundtrip") { // Arrange FCasSnapshot ExpectedSnapshot; for (uint32 Id = 1; Id <= 10; ++Id) { ExpectedSnapshot.Blocks.Add(FCasSnapshot::FBlock { .BlockId = FCasBlockId(Id), .LastAccess = FDateTime::UtcNow().GetTicks() }); for (uint32 Idx = 1; Idx <= 10; ++Idx) { FCasAddr CasAddr = CreateCasTestAddr(Idx); FCasLocation Loc = FCasLocation { .BlockId = FCasBlockId(Id), .BlockOffset = Idx * 256 }; ExpectedSnapshot.ChunkLocations.Emplace(CasAddr, Loc); } } ExpectedSnapshot.CurrentBlockId = FCasBlockId(1); // Act FTmpDirectoryScope _(TestBaseDir); const FString SnapshotFile = TestBaseDir / TEXT("test.snp"); TIoStatusOr Status = FCasSnapshot::Save(ExpectedSnapshot, SnapshotFile); CHECK(Status.IsOk()); const FCasSnapshot Snapshot = FCasSnapshot::Load(SnapshotFile).ConsumeValueOrDie(); // Assert CHECK(Snapshot.Blocks.Num() == ExpectedSnapshot.Blocks.Num()); for (int32 Idx = 0; Idx < Snapshot.Blocks.Num(); ++Idx) { CHECK(Snapshot.Blocks[Idx].BlockId == ExpectedSnapshot.Blocks[Idx].BlockId); CHECK(Snapshot.Blocks[Idx].LastAccess == ExpectedSnapshot.Blocks[Idx].LastAccess); } CHECK(Snapshot.ChunkLocations.Num() == ExpectedSnapshot.ChunkLocations.Num()); for (int32 Idx = 0; Idx < Snapshot.ChunkLocations.Num(); ++Idx) { CHECK(Snapshot.ChunkLocations[Idx].Get<0>() == ExpectedSnapshot.ChunkLocations[Idx].Get<0>()); CHECK(Snapshot.ChunkLocations[Idx].Get<1>() == ExpectedSnapshot.ChunkLocations[Idx].Get<1>()); } CHECK(Snapshot.CurrentBlockId == ExpectedSnapshot.CurrentBlockId); } SECTION("CreateFromJournal") { // Arrange FTmpDirectoryScope _(TestBaseDir); const FString JournalFile = TestBaseDir / TEXT("test.jrn"); const FCasBlockId ExpectedCurrentBlockId(2); FIoStatus Status = FCasJournal::Create(JournalFile); CHECK(Status.IsOk()); FCasJournal::FTransaction Tx = FCasJournal::Begin(JournalFile); // Add a block and some chunk locations Tx.BlockCreated(FCasBlockId(1)); for (int32 Idx = 1; Idx <= 10; ++Idx) { Tx.ChunkLocation(FCasLocation { .BlockId = FCasBlockId(1), .BlockOffset = 256 }, CreateCasTestAddr(uint64(Idx) << 32 | 1ull)); } // Remove the block and the corresponding chunk locations for (int32 Idx = 1; Idx <= 10; ++Idx) { Tx.ChunkLocation(FCasLocation::Invalid, CreateCasTestAddr(uint64(Idx) << 32 | 1ull)); } Tx.BlockDeleted(FCasBlockId(1)); // Add a second block and some chunk locations Tx.BlockCreated(ExpectedCurrentBlockId); for (int32 Idx = 1; Idx <= 10; ++Idx) { Tx.ChunkLocation(FCasLocation { .BlockId = ExpectedCurrentBlockId, .BlockOffset = uint32(Idx) * 256 }, CreateCasTestAddr(Idx)); } Status = FCasJournal::Commit(MoveTemp(Tx)); CHECK(Status.IsOk()); // Act const FCasSnapshot Snapshot = FCasSnapshot::FromJournal(JournalFile).ConsumeValueOrDie(); // Assert CHECK(Snapshot.CurrentBlockId == ExpectedCurrentBlockId); CHECK(Snapshot.Blocks.Num() == 1); CHECK(Snapshot.ChunkLocations.Num() == 10); for (int32 Idx = 1; Idx < Snapshot.ChunkLocations.Num(); ++Idx) { const FCasAddr Addr = CreateCasTestAddr(Idx); const FCasSnapshot::FChunkLocation* Loc = Algo::FindByPredicate( Snapshot.ChunkLocations, [&Addr](const FCasSnapshot::FChunkLocation& L) { return L.Get<0>() == Addr; }); CHECK(Loc != nullptr); if (Loc != nullptr) { CHECK(Loc->Get<1>().BlockId == ExpectedCurrentBlockId); CHECK(Loc->Get<1>().BlockOffset == uint32(Idx) * 256); } } } } #endif // WITH_IOSTORE_ONDEMAND_TESTS } // namespace UE::IoStore