// Copyright UnexGames 2025. All Rights Reserved. #include "ThreadLibrary.h" #include "Engine/EngineTypes.h" #include "Kismet/GameplayStatics.h" #include "Engine/World.h" #include "Engine/LatentActionManager.h" void UThreadLibrary::CreateAndExecuteTaskOnThread(UObject* WorldContextObject, FLatentActionInfo LatentInfo, UThreadBase*& Task, EtaskExecutionBranches& Out, ETaskExecutionType ExecutionType) { if (nullptr == WorldContextObject) { FFrame::KismetExecutionMessage(TEXT("Invalid WorldContextObject. Cannot execute."), ELogVerbosity::Error); return; } if (ExecutionType == ETaskExecutionType::ThreadPool) { FFrame::KismetExecutionMessage(TEXT(" Can't use Thread Pool there."), ELogVerbosity::Error); return; } if (UWorld* World = WorldContextObject->GetWorld()) { FLatentActionManager& LatentActionManager = World->GetLatentActionManager(); FThreadTaskWithBody* Action = LatentActionManager.FindExistingAction(LatentInfo.CallbackTarget, LatentInfo.UUID); if (Action && Action->IsRunning()) { FFrame::KismetExecutionMessage(TEXT(" This node is already running."), ELogVerbosity::Error); return; } else { Action = new FThreadTaskWithBody(WorldContextObject, Out, LatentInfo, UThreadTasks::StaticClass(), ExecutionType, Task); LatentActionManager.AddNewAction(LatentInfo.CallbackTarget, LatentInfo.UUID, Action); } } } void UThreadLibrary::ExecuteTaskOnThreadPool(UObject* WorldContextObject, FLatentActionInfo LatentInfo, UThreadBase*& Task, EtaskExecutionBranches& Out, UThreadPool* ThreadPool) { ETaskExecutionType ExecType = ETaskExecutionType::ThreadPool; if (nullptr == WorldContextObject) { FFrame::KismetExecutionMessage(TEXT(" Invalid WorldContextObject. Cannot execute."), ELogVerbosity::Error); return; } if (ThreadPool && ThreadPool->GetThreadsNum() <= 0) { FFrame::KismetExecutionMessage(TEXT(" Invalid Thread Pool"), ELogVerbosity::Error); return; } if (UWorld* World = WorldContextObject->GetWorld()) { FLatentActionManager& LatentActionManager = World->GetLatentActionManager(); FThreadTaskWithBody* Action = LatentActionManager.FindExistingAction(LatentInfo.CallbackTarget, LatentInfo.UUID); if (Action && Action->IsRunning()) { FFrame::KismetExecutionMessage(TEXT(" This node is already running."), ELogVerbosity::Error); return; } else { Action = new FThreadTaskWithBody(WorldContextObject, Out, LatentInfo, UThreadTasks::StaticClass(), ExecType, Task); LatentActionManager.AddNewAction(LatentInfo.CallbackTarget, LatentInfo.UUID, Action); } } } void UThreadLibrary::CreateAndExecuteTaskLoopNrOnThread(UObject* WorldContextObject, FLatentActionInfo LatentInfo, int LoopNumber, float LoopInterval, EtaskExecutionBranches& Out) { if (nullptr == WorldContextObject) { FFrame::KismetExecutionMessage(TEXT("Invalid WorldContextObject. Cannot execute."), ELogVerbosity::Error); return; } if (UWorld* World = WorldContextObject->GetWorld()) { FLatentActionManager& LatentActionManager = World->GetLatentActionManager(); FThreadTasksLoopAction* Action = LatentActionManager.FindExistingAction(LatentInfo.CallbackTarget, LatentInfo.UUID); if (Action && Action->IsRunning()) { FFrame::KismetExecutionMessage(TEXT(" This node is already running."), ELogVerbosity::Error); return; } else { Action = new FThreadTasksLoopAction(WorldContextObject, Out, LatentInfo, UThreadTasksLoop::StaticClass(), LoopNumber, LoopInterval); LatentActionManager.AddNewAction(LatentInfo.CallbackTarget, LatentInfo.UUID, Action); } } } FString UThreadLibrary::GetCurrentThreadName(int32 threadID) { FString ThreadName = FThreadManager::Get().GetThreadName((uint32)threadID); return(ThreadName); } int32 UThreadLibrary::GetCurrentThreadID() { uint32 ThreadID = FPlatformTLS::GetCurrentThreadId(); return(ThreadID); } void UThreadLibrary::Sleep(float Seconds) { if (!IsInGameThread()) { FPlatformProcess::Sleep(Seconds); } } int32 UThreadLibrary::GetThreadStackSize() { return((int32)FPlatformProcess::GetStackSize()); } int32 UThreadLibrary::GetNumberOfCores() { return FPlatformMisc::NumberOfCores(); } int32 UThreadLibrary::GetNumberOfCoresIncludingHyperthreads() { return FPlatformMisc::NumberOfCoresIncludingHyperthreads(); } bool UThreadLibrary::IsInGameThread() { if (GIsGameThreadIdInitialized) { const uint32 CurrentThreadId = FPlatformTLS::GetCurrentThreadId(); return CurrentThreadId == GGameThreadId; } return true; } UThreadMutex* UThreadLibrary::CreateMutex(UObject* WorldContextObject) { UThreadUtility::MutexIndex++; const FString Name = "ThreadMutex" + FString::FromInt(UThreadUtility::MutexIndex); return NewObject(WorldContextObject, FName(*Name), RF_Transient); } UThreadPool* UThreadLibrary::CreateThreadPool(UObject* WorldContextObject, int32 NumQueuedThreads, int32 StackSize, EThreadPoolPriority ThreadPriority, FString Name) { if (nullptr == WorldContextObject) { FFrame::KismetExecutionMessage(TEXT("Invalid WorldContextObject."), ELogVerbosity::Error); return nullptr; } if (NumQueuedThreads <= 0) { FFrame::KismetExecutionMessage(TEXT("NumQueuedThreads must be >= 1."), ELogVerbosity::Error); return nullptr; } if (NumQueuedThreads <= 0) { FFrame::KismetExecutionMessage(TEXT("StackSize must be >= 1."), ELogVerbosity::Error); return nullptr; } UThreadPool* ThreadPool; UThreadUtility::ThreadPoolIndex++; const FString PoolName = "PoolThread" + FString::FromInt(UThreadUtility::ThreadPoolIndex); ThreadPool = NewObject(WorldContextObject, FName(*PoolName), RF_Transient); if (ThreadPool) { EThreadPriority LocalThreadPriority = EThreadPriority::TPri_Normal; switch (ThreadPriority) { case EThreadPoolPriority::AboveNormal: LocalThreadPriority = EThreadPriority::TPri_AboveNormal; break; case EThreadPoolPriority::BelowNormal: LocalThreadPriority = EThreadPriority::TPri_BelowNormal; break; case EThreadPoolPriority::Highest: LocalThreadPriority = EThreadPriority::TPri_Highest; break; case EThreadPoolPriority::Lowest: LocalThreadPriority = EThreadPriority::TPri_Lowest; break; case EThreadPoolPriority::Normal: LocalThreadPriority = EThreadPriority::TPri_Normal; break; case EThreadPoolPriority::SlightlyBelowNormal: LocalThreadPriority = EThreadPriority::TPri_SlightlyBelowNormal; break; case EThreadPoolPriority::TimeCritical: LocalThreadPriority = EThreadPriority::TPri_TimeCritical; break; default: LocalThreadPriority = EThreadPriority::TPri_Normal; } const bool bResult = ThreadPool->Create((uint32)NumQueuedThreads, Name, LocalThreadPriority, (uint32)StackSize); if (bResult) { return ThreadPool; } else { FFrame::KismetExecutionMessage(TEXT("Thread Pool could not be created."), ELogVerbosity::Error); } } if (ThreadPool) { ThreadPool->ConditionalBeginDestroy(); } return nullptr; } void UThreadLibrary::DestroyThreadPoolImmediately(UThreadPool* ThreadPool) { UThreadUtility::RemoveFromRoot(ThreadPool); if (ThreadPool->Obj.IsValid()) { ThreadPool->Obj.Reset(); } }