// Copyright Epic Games, Inc. All Rights Reserved. #include "NDisplayLiveLinkSubjectReplicator.h" #include "Cluster/IDisplayClusterClusterManager.h" #include "DisplayClusterEnums.h" #include "Features/IModularFeatures.h" #include "IDisplayCluster.h" #include "ILiveLinkClient.h" #include "LiveLinkRole.h" #include "LiveLinkSubjectSettings.h" #include "Misc/CoreDelegates.h" #include "NDisplayAgentVirtualSubject.h" #include "Serialization/MemoryReader.h" #include "Serialization/MemoryWriter.h" #include "UObject/Package.h" DEFINE_LOG_CATEGORY_STATIC(LogNDisplayLiveLinkSubjectReplicator, Log, All); FNDisplayLiveLinkSubjectReplicator::~FNDisplayLiveLinkSubjectReplicator() { Deactivate(); if (LiveLinkClient) { LiveLinkClient->OnLiveLinkTicked().RemoveAll(this); LiveLinkClient->OnLiveLinkSourceRemoved().RemoveAll(this); LiveLinkClient->RemoveSource(LiveLinkSourceGuid); } //Cleanup our virtual source from LiveLink TrackedSubjects.Empty(); IModularFeatures::Get().OnModularFeatureUnregistered().RemoveAll(this); FCoreDelegates::OnFEngineLoopInitComplete.RemoveAll(this); FCoreDelegates::OnBeginFrame.RemoveAll(this); } bool FNDisplayLiveLinkSubjectReplicator::IsActive() const { return LiveLinkClient != nullptr; } FString FNDisplayLiveLinkSubjectReplicator::GetSyncId() const { static const FString SyncId = TEXT("NDisplayLiveLinkSyncObject"); return SyncId; } FString FNDisplayLiveLinkSubjectReplicator::SerializeToString() const { //Serialization is done on the controller. Payload was created on LiveLinkTicked callback FScopeLock Lock(&PayloadCriticalSection); const FString ThisFramePayload = LiveLinkPayload; return ThisFramePayload; } bool FNDisplayLiveLinkSubjectReplicator::DeserializeFromString(const FString& Str) { // New nDisplay Failover subsystem calls DeserializeFromString on every node, // including the primary one. This breaks the original logic of this replicator. // We expect this method to be called on non-primary nodes only. // This is a temporary. A failover-friendly fix will arrive later. if (IDisplayCluster::Get().GetClusterMgr()->IsPrimary()) { return true; } //Deserialization is done on each agent TArray TempBytes; TempBytes.AddUninitialized(Str.Len()); StringToBytes(Str, TempBytes.GetData(), Str.Len()); FMemoryReader Ar(TempBytes); OnDataSynchronization(Ar); return true; } void FNDisplayLiveLinkSubjectReplicator::AddReferencedObjects(FReferenceCollector& Collector) { for (auto& Subject : TrackedSubjects) { Collector.AddReferencedObject(Subject); } } void FNDisplayLiveLinkSubjectReplicator::Initialize() { if (IModularFeatures::Get().IsModularFeatureAvailable(ILiveLinkClient::ModularFeatureName)) { //Cache LiveLink client if it was available and listen for it being unregistered. LiveLinkClient = &IModularFeatures::Get().GetModularFeature(ILiveLinkClient::ModularFeatureName); IModularFeatures::Get().OnModularFeatureUnregistered().AddRaw(this, &FNDisplayLiveLinkSubjectReplicator::OnModularFeatureRemoved); if (IDisplayCluster::IsAvailable()) { if (IDisplayCluster::Get().GetClusterMgr()->IsPrimary()) { //If we're a controller, listen to LiveLinkTicked callback to bundle up the SyncObject. Subjects frames will have been processed at that point. LiveLinkClient->OnLiveLinkTicked().AddRaw(this, &FNDisplayLiveLinkSubjectReplicator::OnLiveLinkTicked); } else if (IDisplayCluster::Get().GetClusterMgr()->IsSecondary()) { //If we're a agent, listen for new subject and disables them if we're tracking that subject FCoreDelegates::OnBeginFrame.AddRaw(this, &FNDisplayLiveLinkSubjectReplicator::OnEngineBeginFrame); //Used to reinitialize ourself if we are removed from livelink LiveLinkClient->OnLiveLinkSourceRemoved().AddRaw(this, &FNDisplayLiveLinkSubjectReplicator::OnLiveLinkSourceRemoved); ReInitializeVirtualSource(); } } else { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Error, TEXT("Can't initialize LiveLink Subject Replicator for nDisplay because nDisplay is not available.")); } } else { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Error, TEXT("Can't initialize LiveLink Subject Replicator for nDisplay because LiveLink is not available.")); } } void FNDisplayLiveLinkSubjectReplicator::Activate() { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Log, TEXT("Registering sync object.")); //Register SyncObject on PreTick to have this behavior //Order of operation will be //1. When LiveLink creates snapshots for the frame, Controller will create the sync object payload //2. On PreTick, the SyncObject will be synchronized between cluster machines //3. On object processing, Agents will push received data for each subject to have the same data then Controller if (IDisplayCluster::IsAvailable()) { IDisplayClusterClusterManager* ClusterManager = IDisplayCluster::Get().GetClusterMgr(); ClusterManager->RegisterSyncObject(this, EDisplayClusterSyncGroup::PreTick); } else { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Warning, TEXT("Can't activate LiveLink Subject Replicator for nDisplay because nDisplay is not available.")); } } void FNDisplayLiveLinkSubjectReplicator::Deactivate() { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Log, TEXT("Unregistering sync object.")); if (IDisplayCluster::IsAvailable()) { if (IDisplayClusterClusterManager* ClusterManager = IDisplayCluster::Get().GetClusterMgr()) { ClusterManager->UnregisterSyncObject(this); } } } void FNDisplayLiveLinkSubjectReplicator::OnLiveLinkTicked() { TArray TempBytes; FMemoryWriter Ar(TempBytes, /*bIsPersistent=*/ true); OnDataSynchronization(Ar); FScopeLock Lock(&PayloadCriticalSection); LiveLinkPayload = BytesToString(TempBytes.GetData(), TempBytes.Num()); } void FNDisplayLiveLinkSubjectReplicator::OnEngineBeginFrame() { //Each frame, disable all subjects. When we are enabled, we are replicating ALL subjects from the controller to each agent. //Only our replicated VirtualSubjects should be enabled if (LiveLinkClient) { check(IDisplayCluster::Get().GetClusterMgr()->IsSecondary()); const bool bIncludeDisabled = false; const bool bIncludeVirtual = true; const TArray Subjects = LiveLinkClient->GetSubjects(bIncludeDisabled, bIncludeVirtual); for (const FLiveLinkSubjectKey& Subject : Subjects) { if (Subject.Source != LiveLinkSourceGuid) { LiveLinkClient->SetSubjectEnabled(Subject, false); } } //Make sure all our replicated subjects are enabled for (const UNDisplayAgentVirtualSubject* OurSubject : TrackedSubjects) { LiveLinkClient->SetSubjectEnabled(OurSubject->GetSubjectKey(), true); } } } void FNDisplayLiveLinkSubjectReplicator::OnDataSynchronization(FArchive& Ar) { //Controller is saving, agents are loading const bool bIsSaving = Ar.IsSaving(); bool bContinue = bIsSaving; EFrameType FrameType = EFrameType::DataOnly; //Fill in current tracked subject list. If a subject is not processed this frame, we remove it TArray> LastFrameSubjectKeys; for (const UNDisplayAgentVirtualSubject* Subject : TrackedSubjects) { LastFrameSubjectKeys.Add(Subject->GetAssociatedSubjectKey()); } if (bIsSaving) { if (LiveLinkClient) { /** * We're the controller, and serializing data out to clients: * - Iterate over all of the snapshots. * - Determine the type of frame this is (new static data, new subject, data only). * - Serialize a True continuation byte. * - Serialize the frame. * - Once all snapshots have been added, serialize a False continuation byte. */ { //Get all subjects except the ones that are disabled //Since we are replicating using a VirtualSubject, agents will have to remove any VirtualSubject being replicated const bool bIncludeDisabled = false; const bool bIncludeVirtuals = true; TArray ThisFramesSubjects = LiveLinkClient->GetSubjects(bIncludeDisabled, bIncludeVirtuals); for (FLiveLinkSubjectKey& SubjectKey : ThisFramesSubjects) { const FName SubjectName = SubjectKey.SubjectName; TSubclassOf SubjectRole = LiveLinkClient->GetSubjectRole_AnyThread(SubjectKey); // Check if the subject's source is pending kill. If it is, the subject itself will soon be killed and we can skip evaluating it. TArray ValidSources = LiveLinkClient->GetSources(false); if (bIncludeVirtuals) { ValidSources += LiveLinkClient->GetVirtualSources(false); } const bool bIsSourceValid = ValidSources.ContainsByPredicate([SubjectKey](const FGuid& Other) { return Other == SubjectKey.Source; }); if (!bIsSourceValid) { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("Controller could not evaluate Subject '%s'. Its source is marked as pending kill."), *SubjectName.ToString()); continue; } FLiveLinkSubjectFrameData SubjectFrameData; if (LiveLinkClient->EvaluateFrame_AnyThread(SubjectName, SubjectRole, SubjectFrameData)) { if (auto* FoundSujectDataPtr = TrackedSubjects.FindByPredicate( [SubjectKey](const UNDisplayAgentVirtualSubject* Other) { return Other->GetAssociatedSubjectKey().SubjectName == SubjectKey.SubjectName; })) { UNDisplayAgentVirtualSubject* FoundSubjectData = *FoundSujectDataPtr; if (SubjectFrameData.StaticData != FoundSubjectData->GetStaticData() || FoundSubjectData->GetAssociatedSubjectKey() != SubjectKey || FoundSubjectData->GetRole() != SubjectRole) { FrameType = EFrameType::UpdatedSubject; } else { FrameType = EFrameType::DataOnly; } } else { FrameType = EFrameType::NewSubject; } //This subject was processed LastFrameSubjectKeys.Remove(SubjectKey); Ar << bContinue; HandleFrame(Ar, FrameType, SubjectKey, SubjectRole, SubjectFrameData); } else { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("Controller could not evaluate Subject '%s'. It could have starved."), *SubjectName.ToString()); } } } } //We're done processing subjects, mark the frame as done bContinue = false; Ar << bContinue; } else { /** * We're a agent, and serializing data from the controller: * - Serialize the continuation byte * - If False, we're done. * - If True: * - Serialize the frame. * - Based on frame type, do required process. * - Repeat. */ TSubclassOf SubjectRole; FLiveLinkSubjectKey SubjectKey; FLiveLinkSubjectFrameData SubjectFrameData; while (true) { Ar << bContinue; if (bContinue) { //Process frame data and fill this subject's data HandleFrame(Ar, FrameType, SubjectKey, SubjectRole, SubjectFrameData); if (!SubjectKey.SubjectName.IsNone()) { ProcessLiveLinkData_Agent(FrameType, SubjectKey, SubjectFrameData.FrameData); LastFrameSubjectKeys.RemoveSingleSwap(SubjectKey); } } else { break; } } } // Cleanup missing subjects. Agents will need to remove VirtualSubjects that were created if (LiveLinkClient) { for (const FLiveLinkSubjectKey& ItemSubjectKey : LastFrameSubjectKeys) { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("Subject '%s' was not processed. Removing it from our list."), *ItemSubjectKey.SubjectName.ToString()); if (!bIsSaving) { const FLiveLinkSubjectKey ReplicatedSubjectKey(LiveLinkSourceGuid, ItemSubjectKey.SubjectName); LiveLinkClient->RemoveVirtualSubject(ReplicatedSubjectKey); } const int32 FoundIndex = TrackedSubjects.IndexOfByPredicate([ItemSubjectKey](const UNDisplayAgentVirtualSubject* Other) { return Other->GetAssociatedSubjectKey() == ItemSubjectKey; }); if (TrackedSubjects.IsValidIndex(FoundIndex)) { TrackedSubjects.RemoveAtSwap(FoundIndex); } } } else { TrackedSubjects.Reset(); } } void FNDisplayLiveLinkSubjectReplicator::HandleFrame(FArchive& Ar, EFrameType& FrameType, FLiveLinkSubjectKey& SubjectKey, TSubclassOf& SubjectRole, FLiveLinkSubjectFrameData& SubjectFrame) { //Archive always start with the type and subject identifier Ar << FrameType; Ar << SubjectKey; //If it's a new subject / updated one, process static data and role if (FrameType == EFrameType::NewSubject || FrameType == EFrameType::UpdatedSubject) { HandleNewSubject(Ar, SubjectKey, SubjectRole, SubjectFrame); } //End archive with Frame data Ar << SubjectFrame.FrameData; UE_LOG(LogNDisplayLiveLinkSubjectReplicator, VeryVerbose, TEXT("HandleFrame SubjectName=%s FrameType=%d"), *SubjectKey.SubjectName.ToString(), static_cast(FrameType)); } void FNDisplayLiveLinkSubjectReplicator::HandleNewSubject(FArchive& Ar, FLiveLinkSubjectKey& SubjectKey, TSubclassOf& SubjectRole, FLiveLinkSubjectFrameData& SubjectFrame) { //New or updated subjects will have static data Ar << SubjectFrame.StaticData; UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("HandleNewSubject SubjectName=%s"), *SubjectKey.SubjectName.ToString()); //Verify if we're already tracking that subject UNDisplayAgentVirtualSubject* TrackedSubject = nullptr; if (auto* FoundSujectDataPtr = TrackedSubjects.FindByPredicate( [SubjectKey](const UNDisplayAgentVirtualSubject* Other) { return Other->GetAssociatedSubjectKey().SubjectName == SubjectKey.SubjectName; })) { TrackedSubject = *FoundSujectDataPtr; UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("HandleNewSubject SubjectName=%s but was already tracked. Updating its info."), *SubjectKey.SubjectName.ToString()); } if (Ar.IsSaving()) { FString RoleClassName = SubjectRole->GetPathName(); Ar << RoleClassName; //If we're not tracking that subject yet, create it if (TrackedSubject == nullptr) { TrackedSubject = NewObject(GetTransientPackage()); TrackedSubjects.Add(TrackedSubject); } //When handling new subject, always setup role and static data TrackedSubject->SetTrackedSubjectInfo(SubjectKey, SubjectRole); TrackedSubject->GetStaticData().InitializeWith(SubjectFrame.StaticData); } else { FString RoleClassName; Ar << RoleClassName; SubjectRole = StaticLoadClass(UObject::StaticClass(), NULL, *RoleClassName, NULL, LOAD_None, NULL); check(SubjectRole); //On Agents, create the virtual subject directly in LiveLink so it's part of the system ULiveLinkVirtualSubject* ReplicatedSubject = Cast(LiveLinkClient->GetSubjectSettings(SubjectKey)); //Always disable the subject we're replicating LiveLinkClient->SetSubjectEnabled(SubjectKey, false); //If we're not tracking that subject yet, create it if (TrackedSubject == nullptr) { const FLiveLinkSubjectKey ReplicatedSubjectKey(LiveLinkSourceGuid, SubjectKey.SubjectName); if (LiveLinkClient->AddVirtualSubject(ReplicatedSubjectKey, UNDisplayAgentVirtualSubject::StaticClass())) { //Retrieve the newly created subject to add it to our tracking array const bool bIncludeDisabled = false; const bool bIncludeVirtual = true; for (const FLiveLinkSubjectKey& FoundSubjectKey : LiveLinkClient->GetSubjects(bIncludeDisabled, bIncludeVirtual)) { if (FoundSubjectKey.SubjectName == SubjectKey.SubjectName) { //If this subject has the same subject name, verify if it's the VirtualOne. Settings object will return the VirtualSubject directly if (UNDisplayAgentVirtualSubject* NewlyCreatedSubject = Cast(LiveLinkClient->GetSubjectSettings(FoundSubjectKey))) { TrackedSubject = NewlyCreatedSubject; TrackedSubjects.Add(NewlyCreatedSubject); } } } checkf(TrackedSubject, TEXT("TrackedSubject '%s' was added but could not be found afterwards."), *SubjectKey.SubjectName.ToString()); } else { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Warning, TEXT("Could not create new subject '%s'. It won't be tracked correctly on this node."), *SubjectKey.SubjectName.ToString()); } } if (TrackedSubject) { //VirtualSubjects need translators to be able to be evaluated in different output format. Special treatment required for virtual subjects since they won't/can't exist in the client if (ReplicatedSubject) { const TArray& Translators = ReplicatedSubject->GetTranslators(); TrackedSubject->UpdateTranslators(Translators); } else { if (ULiveLinkSubjectSettings* SubjectSetting = Cast(LiveLinkClient->GetSubjectSettings(SubjectKey))) { TrackedSubject->UpdateTranslators(SubjectSetting->Translators); } else { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Warning, TEXT("Replicating subject '%s' but could not find its settings. Translators won't work."), *SubjectKey.SubjectName.ToString()); } } //When handling new subject, always setup role and static data TrackedSubject->SetTrackedSubjectInfo(SubjectKey, SubjectRole); TrackedSubject->GetStaticData().InitializeWith(SubjectFrame.StaticData); } } } void FNDisplayLiveLinkSubjectReplicator::ProcessLiveLinkData_Agent(EFrameType FrameType, const FLiveLinkSubjectKey& SubjectKey, FLiveLinkFrameDataStruct& FrameData) { //Find associated tracked subject data to broadcast role and static data. Should always be found auto* FoundTrackedSubjectPtr = TrackedSubjects.FindByPredicate([SubjectKey](const UNDisplayAgentVirtualSubject* Other) { return Other->GetAssociatedSubjectKey() == SubjectKey; }); check(FoundTrackedSubjectPtr); UNDisplayAgentVirtualSubject* FoundTrackedSubject = *FoundTrackedSubjectPtr; FrameData.GetBaseData()->WorldTime = FPlatformTime::Seconds(); UE_LOG(LogNDisplayLiveLinkSubjectReplicator, VeryVerbose, TEXT("ProcessLiveLinkData SubjectUpdated=%s UpdateType=%d Timecode=%s") , *(SubjectKey.SubjectName.ToString()), static_cast(FrameType) , *(FTimecode::FromFrameNumber(FrameData.GetBaseData()->MetaData.SceneTime.Time.FrameNumber, FrameData.GetBaseData()->MetaData.SceneTime.Rate, false).ToString())); FoundTrackedSubject->UpdateFrameData(MoveTemp(FrameData)); } void FNDisplayLiveLinkSubjectReplicator::ReInitializeVirtualSource() { //if we are rebuilding our source, let go of any subject we are currently tracking for (const UNDisplayAgentVirtualSubject* Subject : TrackedSubjects) { LiveLinkClient->RemoveVirtualSubject(Subject->GetSubjectKey()); } //Start source with a clean slate TrackedSubjects.Empty(); //Create our Source to hold all VirtualSubject we'll create LiveLinkSourceGuid = LiveLinkClient->AddVirtualSubjectSource(TEXT("nDisplaySubjectReplicator")); if (!LiveLinkSourceGuid.IsValid()) { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Error, TEXT("Could not add VirtualSubject Source for nDisplay replication.")); } } void FNDisplayLiveLinkSubjectReplicator::OnLiveLinkSourceRemoved(FGuid SourceGuid) { if (LiveLinkSourceGuid.IsValid() && LiveLinkSourceGuid == SourceGuid) { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("nDisplay LiveLink Source was removed. Reinitializing ourself.")); ReInitializeVirtualSource(); } } void FNDisplayLiveLinkSubjectReplicator::OnModularFeatureRemoved(const FName& Type, IModularFeature* ModularFeature) { //If LiveLink gets unregistered, invalidate our cached client which make the replicator do nothing //To support hot reload, we would need to register for the feature to be loaded again if (Type == ILiveLinkClient::ModularFeatureName) { UE_LOG(LogNDisplayLiveLinkSubjectReplicator, Verbose, TEXT("LiveLink feature was removed.")); LiveLinkClient = nullptr; } }