// Copyright Epic Games, Inc. All Rights Reserved. #include "DeviceNativeTask.h" #include "Device.h" #include "Transform/BlobTransform.h" #include "Job/TempHashService.h" #include std::atomic DeviceNativeTask::GTaskId; ////////////////////////////////////////////////////////////////////////// int64_t DeviceNativeTask::GetNewTaskId() { return GTaskId++; } DeviceNativeTask::DeviceNativeTask(int32 TaskPriority, FString TaskName) : TaskId(GetNewTaskId()) , Priority(TaskPriority) , Name(TaskName) { Future = Promise.get_future(); } DeviceNativeTask::DeviceNativeTask(DeviceNativeTaskPtrVec& PrevTasks, int32 TaskPriority, FString TaskName) : DeviceNativeTask(TaskPriority, TaskName) { Prev = PrevTasks; } DeviceNativeTask::~DeviceNativeTask() { } cti::continuable DeviceNativeTask::GenericExecAsync(ENamedThreads::Type CallingThread, ENamedThreads::Type ReturnThread) { return cti::make_continuable([this, CallingThread, ReturnThread](auto&& ThisPromise) { AsyncTask(CallingThread, [this, CallingThread, ReturnThread, ThisPromise = std::forward(ThisPromise)]() mutable { std::exception_ptr Ex = nullptr; int32 RetVal = -1; try { RetVal = Exec(); } catch (const std::exception& exc) { ThisPromise.set_exception(std::make_exception_ptr(std::exception(exc))); } if (CallingThread == ReturnThread || (ReturnThread & ENamedThreads::AnyThread)) { if (!Ex) ThisPromise.set_value(RetVal); else ThisPromise.set_exception(Ex); } /// Otherwise, get on the thread /// TODO: There's a better way of doing this in the continuable library. Do that later on AsyncTask(ReturnThread, [this, ReturnThread, RetVal, Ex, ThisPromise = std::forward(ThisPromise)]() mutable { if (!Ex) ThisPromise.set_value(RetVal); else ThisPromise.set_exception(Ex); }); }); }); } cti::continuable DeviceNativeTask::PreExecAsync(ENamedThreads::Type CallingThread, ENamedThreads::Type ReturnThread) { check(IsInGameThread()); if (CallingThread == ENamedThreads::GameThread) { PreExec(); return cti::make_ready_continuable(0); } return GenericExecAsync(CallingThread, ReturnThread); } cti::continuable DeviceNativeTask::ExecAsync(ENamedThreads::Type CallingThread, ENamedThreads::Type ReturnThread) { ThreadId = std::hash{}(std::this_thread::get_id()); return GenericExecAsync(CallingThread, ReturnThread); } ENamedThreads::Type DeviceNativeTask::GetExecutionThread() const { return ENamedThreads::AnyThread; } FString DeviceNativeTask::GetDebugName() const { return FString::Printf(TEXT("%s [%" INT64_FMT ", %d]"), *GetName(), GetTaskId(), GetPriority()); } FString DeviceNativeTask::GetName() const { return Name; } void DeviceNativeTask::Terminate() { /// Recursively try to terminate the dependencies that we're waiting on for (DeviceNativeTaskPtr& PrevTasks : Prev) { if (PrevTasks->GetFuture().wait_for(std::chrono::seconds(0)) != std::future_status::ready) { PrevTasks->Terminate(); bTerminate = true; return; } } /// If we get to here, it means that this native task is the offending task. /// Must terminate self in this case! bTerminate = true; } bool DeviceNativeTask::IsHigherPriorityThan(const DeviceNativeTask& RHS) const { if (BatchId == RHS.BatchId) { if (Priority != RHS.Priority) return Priority > RHS.Priority; return TaskId < RHS.TaskId; } return BatchId < RHS.BatchId; } void DeviceNativeTask::SetPriority(DeviceNativeTask* Ref) { int32 existingPriority = Priority; Priority = Ref->Priority + 1; AdjustPrevPriority(Ref, this); } void DeviceNativeTask::SetPriority(int32 TaskPriority, bool AdjustPrev) { int32 existingPriority = Priority; Priority = TaskPriority; /// Adjust priorities of the PrevTasks dependencies if we've been asked to do so OR /// if the current TaskPriority is higher (LESS THAN) the TaskPriority that we had before /// meaning that the TaskPriority of this particular task has increased! if (AdjustPrev || existingPriority > Priority) AdjustPrevPriority(nullptr, this); } #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) void DeviceNativeTask::CheckQueued(DeviceNativeTask* Ref) { if (Ref && Ref != this) { check(this->IsHigherPriorityThan(*Ref)); } for (DeviceNativeTaskPtr PrevTasks : Prev) { PrevTasks->CheckQueued(Ref); check(_batchId == PrevTasks->_batchId && PrevTasks->IsHigherPriorityThan(*this)); if (Ref) { check(PrevTasks->_batchId == Ref->_batchId && PrevTasks->IsHigherPriorityThan(*Ref)); } check(!PrevTasks->IsCulled()); } } void DeviceNativeTask::DebugVerifyDepsQueued(const std::vector& Queue, const DeviceNativeTaskPtrVec& AllItems) { for (auto PrevTasks : Prev) { if (!PrevTasks->IsDone()) { auto Iter = std::find(Queue.begin(), Queue.end(), PrevTasks); if (Iter == Queue.end()) { auto Iter2 = std::find(AllItems.begin(), AllItems.end(), PrevTasks); check(Iter2 != AllItems.end()); size_t Index = std::distance(AllItems.begin(), Iter2); check(false); } PrevTasks->DebugVerifyDepsQueued(Queue, AllItems); } } } #endif void DeviceNativeTask::Reset() { // Reset the promise and future Promise = std::promise(); Future = Promise.get_future(); bIsDone = false; } void DeviceNativeTask::WaitSelf() { Future.wait(); } void DeviceNativeTask::Wait() { #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); #endif while (!WaitFor(1)) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); check(!bTerminate); } } bool DeviceNativeTask::WaitSelfFor(int32 Seconds) { std::future_status Status = Future.wait_for(std::chrono::seconds(Seconds)); if (Status == std::future_status::timeout) return false; return true; } void DeviceNativeTask::CheckCyclicalDependency(std::unordered_set& TransferChain) { for (DeviceNativeTaskPtr PrevTasks : Prev) { DeviceNativeTask* PrevTask = PrevTasks.get(); check(TransferChain.find(PrevTask) == TransferChain.end()); for (auto Iter = TransferChain.begin(); Iter != TransferChain.end(); Iter++) { DeviceNativeTask* pp = *Iter; check(IsHigherPriorityThan(*pp)); } auto myChain = TransferChain; myChain.insert(PrevTask); PrevTask->CheckCyclicalDependency(TransferChain); } } FString DeviceNativeTask::DebugWaitChain() { FString debugWaitChain = GetDebugName(); for (DeviceNativeTaskPtr& PrevTasks : Prev) { std::future_status Status = PrevTasks->GetFuture().wait_for(std::chrono::seconds(0)); if (Status == std::future_status::timeout) { debugWaitChain += FString::Printf(TEXT(" -> %s"), *PrevTasks->DebugWaitChain()); } } return debugWaitChain; } bool DeviceNativeTask::WaitFor(int32 Seconds) { check(!IsInGameThread()); for (auto& PrevTask : Prev) { /// Culled jobs should never be added even though their promise has been resolved if (!PrevTask->IsCulled()) { check(!PrevTask->IsCulled()); /// Must be higher TaskPriority than this particular task check(PrevTask->IsHigherPriorityThan(*this)); UE_LOG(LogDevice, VeryVerbose, TEXT("[%u] Begin Wait::%s"), PrevTask->GetTaskId(), *PrevTask->GetName()); std::future_status Status = PrevTask->GetFuture().wait_for(std::chrono::seconds(Seconds)); if (Status == std::future_status::timeout) { FString WaitChain = DebugWaitChain(); UE_LOG(LogDevice, VeryVerbose, TEXT("[%u] Timed out wait for task::%s. Chain: %s"), PrevTask->GetTaskId(), *PrevTask->GetName(), *WaitChain); return false; } UE_LOG(LogDevice, VeryVerbose, TEXT("[%u] End Wait::%s"), PrevTask->GetTaskId(), *PrevTask->GetName()); } } return true; } bool DeviceNativeTask::DebugCompleteCheck() { std::future_status Status = Future.wait_for(std::chrono::seconds(0)); check(Status == std::future_status::ready); return true; } void DeviceNativeTask::FixPriorities() { AdjustPrevPriority(this, this); #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); #endif } void DeviceNativeTask::AdjustPrevPriority(DeviceNativeTask* Ref, DeviceNativeTask* Parent, DeviceNativeTask* PrevTasks) { if (Ref->BatchId == PrevTasks->BatchId && Ref->IsHigherPriorityThan(*PrevTasks)) PrevTasks->SetPriority(Ref); if (Parent && Parent != Ref && Parent->BatchId == PrevTasks->BatchId && Parent->IsHigherPriorityThan(*PrevTasks)) PrevTasks->SetPriority(Parent); for (auto p : Prev) { p->AdjustPrevPriority(Ref, this); } } void DeviceNativeTask::AdjustPrevPriority(DeviceNativeTask* Ref, DeviceNativeTask* Parent) { if (Ref == nullptr) Ref = this; //AdjustPrevPriority(Ref, this); if (BatchId == Ref->BatchId && Ref->IsHigherPriorityThan(*this)) this->SetPriority(Ref); if (Parent && Parent != Ref && BatchId == Parent->BatchId && Parent->IsHigherPriorityThan(*this)) this->SetPriority(Parent); for (auto PrevTasks : Prev) { PrevTasks->AdjustPrevPriority(Ref, this); } } void DeviceNativeTask::SetPrev(const DeviceNativeTaskPtrVec& PrevTasks) { check(Prev.empty()); Prev = PrevTasks; #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) std::unordered_set TransferChain; CheckCyclicalDependency(TransferChain); #endif AdjustPrevPriority(this, this); #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); #endif } void DeviceNativeTask::AddPrev(DeviceNativeTaskPtr PrevTasks) { if (PrevTasks->IsCulled()) return; PrevTasks->AdjustPrevPriority(this, this); Prev.push_back(PrevTasks); #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); #endif PrevTasks->AdjustPrevPriority(this, this); #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); #endif #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) std::unordered_set TransferChain; CheckCyclicalDependency(TransferChain); #endif } void DeviceNativeTask::AddPrev(const DeviceNativeTaskPtrVec& Tasks) { if (Tasks.empty()) return; DeviceNativeTaskPtrVec nonCulledTasks; nonCulledTasks.reserve(Tasks.size()); for (auto t : Tasks) { if (!t->IsDone()) { t->AdjustPrevPriority(this, this); nonCulledTasks.push_back(t); } } if (!nonCulledTasks.empty()) Prev.insert(Prev.end(), nonCulledTasks.begin(), nonCulledTasks.end()); #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); std::unordered_set TransferChain; CheckCyclicalDependency(TransferChain); #endif } void DeviceNativeTask::PreExec() { /// Nothing to be done if already in game thread. The Device has already waited for our /// stuff to be ready before calling PreExec here if (IsInGameThread()) return; /// No dependency ... if (Prev.empty()) return; Device* thisDevice = GetTargetDevice(); ENamedThreads::Type thisThreadId = GetExecutionThread(); bool wait = false; /// or dependency on the same device, then we're good for (size_t i = 0; i < Prev.size() && !wait; i++) { if (Prev[i]) { if (Prev[i]->GetTargetDevice() != thisDevice) wait = true; else { /// If we have a PrevTasks task or if its from a different device then we need to wait for it /// to finish, but first we check if we're running on the same thread because that'll /// block this ENamedThreads::Type prevThreadId = Prev[i]->GetExecutionThread(); if (prevThreadId == thisThreadId && !(prevThreadId & ENamedThreads::AnyThread) && !(thisThreadId & ENamedThreads::AnyThread)) { /// Invalid condition FString errorStr = FString::Printf(TEXT("Error where previous device task and this device task are on different devices and yet have the same thread id. This will cause them to block. ThreadId: %d"), thisThreadId); UE_LOG(LogDevice, Error, TEXT("%s"), *errorStr); throw std::runtime_error(TCHAR_TO_UTF8(*errorStr)); } else wait = true; } } } if (wait) Wait(); } void DeviceNativeTask::SetPromise(int32 Value) { ErrorCode = Value; bIsDone = true; Promise.set_value(Value); } void DeviceNativeTask::PostExec() { SetPromise(ErrorCode); } cti::continuable DeviceNativeTask::WaitAsync(ENamedThreads::Type ReturnThread /* = ENamedThreads::UnusedAnchor */) { #if (DEVICE_NATIVE_TASKS_CHECK_CYCLES == 1) CheckQueued(this); #endif return cti::make_continuable([this, ReturnThread](auto&& ThisPromise) mutable { Util::OnBackgroundThread([this, ReturnThread, FWD_PROMISE(ThisPromise)]() mutable { Wait(); /// If no thread is specified, then just return whatever if (ReturnThread == ENamedThreads::UnusedAnchor) { ThisPromise.set_value(0); return; } /// Otherwise, we resolve the promise from the correct thread Util::OnThread(ReturnThread, [=, FWD_PROMISE(ThisPromise)]() mutable { ThisPromise.set_value(0); }); }); }); } ////////////////////////////////////////////////////////////////////////// DeviceNativeTask_Generic::DeviceNativeTask_Generic(Device* TargetDevice, FString TaskName, ENamedThreads::Type TaskThreadId /* = ENamedThreads::AnyThread */, bool IsTaskAsync /* = false */, int32 TaskPriority /* = JobPriority::kNormal */) : DeviceNativeTask(TaskPriority, TaskName) , Dev(TargetDevice) , ThreadId(TaskThreadId) { DeviceNativeTask::bIsAsync = IsTaskAsync; } DeviceNativeTask_Generic::DeviceNativeTask_Generic(DeviceNativeTaskPtrVec& PrevTasks, Device* TargetDevice, FString TaskName, ENamedThreads::Type TaskThreadId /* = ENamedThreads::AnyThread */, bool IsTaskAsync /* = false */, int32 TaskPriority /* = JobPriority::kNormal */) : DeviceNativeTask(PrevTasks, TaskPriority, TaskName) , Dev(TargetDevice) , ThreadId(TaskThreadId) { DeviceNativeTask::bIsAsync = IsTaskAsync; } ////////////////////////////////////////////////////////////////////////// DeviceNativeTask_Lambda::DeviceNativeTask_Lambda(DeviceNativeTask_Lambda_Func LambdaCallback, Device* Dev, FString TaskName, ENamedThreads::Type TaskThreadId /* = ENamedThreads::AnyThread */, bool IsTaskAsync /* = false */, int32 TaskPriority /* = JobPriority::kNormal */) : DeviceNativeTask_Generic(Dev, TaskName, TaskThreadId, IsTaskAsync, TaskPriority) , Callback(LambdaCallback) { DeviceNativeTask::bExecOnly = true; } int32 DeviceNativeTask_Lambda::Exec() { check(Callback); ErrorCode = Callback(); SetPromise(ErrorCode); return ErrorCode; } std::shared_ptr DeviceNativeTask_Lambda::Create(Device* device, int32 TaskPriority, FString TaskName, DeviceNativeTask_Lambda_Func callback) { return Create(device, ENamedThreads::AnyThread, false, TaskPriority, TaskName, callback); } std::shared_ptr DeviceNativeTask_Lambda::Create(Device* device, ENamedThreads::Type threadId, bool isAsync, int32 TaskPriority, FString TaskName, DeviceNativeTask_Lambda_Func callback) { auto task = std::make_shared(callback, device, TaskName, threadId, isAsync, TaskPriority); device->AddNativeTask(task); return std::static_pointer_cast(task); } //cti::continuable DeviceNativeTask_Lambda::Create_Promise(Device* device, int32 TaskPriority, DeviceNativeTask_Lambda_Func callback) //{ // return Create_Promise(device, ENamedThreads::AnyThread, false, TaskPriority, callback); //} // //cti::continuable DeviceNativeTask_Lambda::Create_Promise(Device* device, ENamedThreads::Type threadId, // bool isAsync, int32 TaskPriority, DeviceNativeTask_Lambda_Func callback) //{ // auto task = std::make_shared(callback, device, threadId, isAsync, TaskPriority); // device->AddNativeTask(task); // // return cti::make_continuable([task](auto&& promise) // { // Util::OnBackgroundThread([task, FWD_PROMISE(promise)]() mutable // { // /// The task must not be running on the same thread as the one that the promise is waiting on // /// otherwise, it'll lead to an application freeze! // int32 retVal = task->Future().get(); // promise.set_value(retVal); // }); // }); //} //////////////////////////////////////////////////////////////////////////