October3d55/M/MultiThread/Source/MultiThreadLibrary/Private/ThreadLibrary.cpp

283 lines
7.1 KiB
C++

// Copyright UnexGames 2025. All Rights Reserved.
#include "ThreadLibrary.h"
#include "Engine/EngineTypes.h"
#include "Kismet/GameplayStatics.h"
#include "Engine/World.h"
#include "Engine/LatentActionManager.h"
void UThreadLibrary::CreateAndExecuteTaskOnThread(UObject* WorldContextObject, FLatentActionInfo LatentInfo, UThreadBase*& Task, EtaskExecutionBranches& Out, ETaskExecutionType ExecutionType)
{
if (nullptr == WorldContextObject)
{
FFrame::KismetExecutionMessage(TEXT("Invalid WorldContextObject. Cannot execute."), ELogVerbosity::Error);
return;
}
if (ExecutionType == ETaskExecutionType::ThreadPool)
{
FFrame::KismetExecutionMessage(TEXT(" Can't use Thread Pool there."), ELogVerbosity::Error);
return;
}
if (UWorld* World = WorldContextObject->GetWorld())
{
FLatentActionManager& LatentActionManager = World->GetLatentActionManager();
FThreadTaskWithBody* Action = LatentActionManager.FindExistingAction<FThreadTaskWithBody>(LatentInfo.CallbackTarget, LatentInfo.UUID);
if (Action && Action->IsRunning())
{
FFrame::KismetExecutionMessage(TEXT(" This node is already running."), ELogVerbosity::Error);
return;
}
else {
Action = new FThreadTaskWithBody(WorldContextObject, Out, LatentInfo, UThreadTasks::StaticClass(), ExecutionType, Task);
LatentActionManager.AddNewAction(LatentInfo.CallbackTarget, LatentInfo.UUID, Action);
}
}
}
void UThreadLibrary::ExecuteTaskOnThreadPool(UObject* WorldContextObject, FLatentActionInfo LatentInfo, UThreadBase*& Task, EtaskExecutionBranches& Out, UThreadPool* ThreadPool)
{
ETaskExecutionType ExecType = ETaskExecutionType::ThreadPool;
if (nullptr == WorldContextObject)
{
FFrame::KismetExecutionMessage(TEXT(" Invalid WorldContextObject. Cannot execute."), ELogVerbosity::Error);
return;
}
if (ThreadPool && ThreadPool->GetThreadsNum() <= 0)
{
FFrame::KismetExecutionMessage(TEXT(" Invalid Thread Pool"), ELogVerbosity::Error);
return;
}
if (UWorld* World = WorldContextObject->GetWorld())
{
FLatentActionManager& LatentActionManager = World->GetLatentActionManager();
FThreadTaskWithBody* Action = LatentActionManager.FindExistingAction<FThreadTaskWithBody>(LatentInfo.CallbackTarget, LatentInfo.UUID);
if (Action && Action->IsRunning())
{
FFrame::KismetExecutionMessage(TEXT(" This node is already running."), ELogVerbosity::Error);
return;
}
else {
Action = new FThreadTaskWithBody(WorldContextObject, Out, LatentInfo, UThreadTasks::StaticClass(), ExecType, Task);
LatentActionManager.AddNewAction(LatentInfo.CallbackTarget, LatentInfo.UUID, Action);
}
}
}
void UThreadLibrary::CreateAndExecuteTaskLoopNrOnThread(UObject* WorldContextObject, FLatentActionInfo LatentInfo, int LoopNumber, float LoopInterval, EtaskExecutionBranches& Out)
{
if (nullptr == WorldContextObject)
{
FFrame::KismetExecutionMessage(TEXT("Invalid WorldContextObject. Cannot execute."), ELogVerbosity::Error);
return;
}
if (UWorld* World = WorldContextObject->GetWorld())
{
FLatentActionManager& LatentActionManager = World->GetLatentActionManager();
FThreadTasksLoopAction* Action = LatentActionManager.FindExistingAction<FThreadTasksLoopAction>(LatentInfo.CallbackTarget, LatentInfo.UUID);
if (Action && Action->IsRunning())
{
FFrame::KismetExecutionMessage(TEXT(" This node is already running."), ELogVerbosity::Error);
return;
}
else
{
Action = new FThreadTasksLoopAction(WorldContextObject, Out, LatentInfo, UThreadTasksLoop::StaticClass(), LoopNumber, LoopInterval);
LatentActionManager.AddNewAction(LatentInfo.CallbackTarget, LatentInfo.UUID, Action);
}
}
}
FString UThreadLibrary::GetCurrentThreadName(int32 threadID)
{
FString ThreadName = FThreadManager::Get().GetThreadName((uint32)threadID);
return(ThreadName);
}
int32 UThreadLibrary::GetCurrentThreadID()
{
uint32 ThreadID = FPlatformTLS::GetCurrentThreadId();
return(ThreadID);
}
void UThreadLibrary::Sleep(float Seconds)
{
if (!IsInGameThread())
{
FPlatformProcess::Sleep(Seconds);
}
}
int32 UThreadLibrary::GetThreadStackSize()
{
return((int32)FPlatformProcess::GetStackSize());
}
int32 UThreadLibrary::GetNumberOfCores()
{
return FPlatformMisc::NumberOfCores();
}
int32 UThreadLibrary::GetNumberOfCoresIncludingHyperthreads()
{
return FPlatformMisc::NumberOfCoresIncludingHyperthreads();
}
bool UThreadLibrary::IsInGameThread()
{
if (GIsGameThreadIdInitialized)
{
const uint32 CurrentThreadId = FPlatformTLS::GetCurrentThreadId();
return CurrentThreadId == GGameThreadId;
}
return true;
}
UThreadMutex* UThreadLibrary::CreateMutex(UObject* WorldContextObject)
{
UThreadUtility::MutexIndex++;
const FString Name = "ThreadMutex" + FString::FromInt(UThreadUtility::MutexIndex);
return NewObject<UThreadMutex>(WorldContextObject, FName(*Name), RF_Transient);
}
UThreadPool* UThreadLibrary::CreateThreadPool(UObject* WorldContextObject, int32 NumQueuedThreads, int32 StackSize, EThreadPoolPriority ThreadPriority, FString Name)
{
if (nullptr == WorldContextObject)
{
FFrame::KismetExecutionMessage(TEXT("Invalid WorldContextObject."), ELogVerbosity::Error);
return nullptr;
}
if (NumQueuedThreads <= 0)
{
FFrame::KismetExecutionMessage(TEXT("NumQueuedThreads must be >= 1."), ELogVerbosity::Error);
return nullptr;
}
if (NumQueuedThreads <= 0)
{
FFrame::KismetExecutionMessage(TEXT("StackSize must be >= 1."), ELogVerbosity::Error);
return nullptr;
}
UThreadPool* ThreadPool;
UThreadUtility::ThreadPoolIndex++;
const FString PoolName = "PoolThread" + FString::FromInt(UThreadUtility::ThreadPoolIndex);
ThreadPool = NewObject<UThreadPool>(WorldContextObject, FName(*PoolName), RF_Transient);
if (ThreadPool)
{
EThreadPriority LocalThreadPriority = EThreadPriority::TPri_Normal;
switch (ThreadPriority)
{
case EThreadPoolPriority::AboveNormal:
LocalThreadPriority = EThreadPriority::TPri_AboveNormal;
break;
case EThreadPoolPriority::BelowNormal:
LocalThreadPriority = EThreadPriority::TPri_BelowNormal;
break;
case EThreadPoolPriority::Highest:
LocalThreadPriority = EThreadPriority::TPri_Highest;
break;
case EThreadPoolPriority::Lowest:
LocalThreadPriority = EThreadPriority::TPri_Lowest;
break;
case EThreadPoolPriority::Normal:
LocalThreadPriority = EThreadPriority::TPri_Normal;
break;
case EThreadPoolPriority::SlightlyBelowNormal:
LocalThreadPriority = EThreadPriority::TPri_SlightlyBelowNormal;
break;
case EThreadPoolPriority::TimeCritical:
LocalThreadPriority = EThreadPriority::TPri_TimeCritical;
break;
default:
LocalThreadPriority = EThreadPriority::TPri_Normal;
}
const bool bResult = ThreadPool->Create((uint32)NumQueuedThreads, Name, LocalThreadPriority, (uint32)StackSize);
if (bResult)
{
return ThreadPool;
}
else {
FFrame::KismetExecutionMessage(TEXT("Thread Pool could not be created."), ELogVerbosity::Error);
}
}
if (ThreadPool)
{
ThreadPool->ConditionalBeginDestroy();
}
return nullptr;
}
void UThreadLibrary::DestroyThreadPoolImmediately(UThreadPool* ThreadPool)
{
UThreadUtility::RemoveFromRoot(ThreadPool);
if (ThreadPool->Obj.IsValid())
{
ThreadPool->Obj.Reset();
}
}