diff --git a/nodemanager/Version.h b/nodemanager/Version.h index d222c72..04d4319 100644 --- a/nodemanager/Version.h +++ b/nodemanager/Version.h @@ -638,6 +638,18 @@ namespace hpc "Improve GPU instance name readability in metric info", } }, + { "2.5.0.0", + { + "Update building environment (from cpprest2.8 on ubuntu1404 with system gnu libs to cpprest2.10 on ubuntu1804) to fix memory leak issue", + "Fix a bug that heartbeat thread may be stuck due to deadlock", + } + }, + { "2.5.1.0", + { + "Fix a bug that task statistics may be incorrect", + "Decrease node resync and service location requests when the connection to scheduler is poor", + } + }, }; return versionHistory; diff --git a/nodemanager/core/Monitor.cpp b/nodemanager/core/Monitor.cpp index 7e8eac4..90ee51a 100644 --- a/nodemanager/core/Monitor.cpp +++ b/nodemanager/core/Monitor.cpp @@ -22,11 +22,8 @@ Monitor::Monitor(const std::string& nodeName, const std::string& netName, int in : name(nodeName), networkName(netName), lock(PTHREAD_RWLOCK_INITIALIZER), intervalSeconds(interval), isCollected(false) { - Logger::Info("Initializing GPU driver."); - std::string output; - this->gpuInitRet = System::ExecuteCommandOut(output, "nvidia-smi -pm 1"); - Logger::Info("Initialize GPU ret code {0}", this->gpuInitRet); - + InitializeGpuDriver(); + this->collectors["\\Processor\\% Processor Time"] = std::make_shared([this] (const std::string& instanceName) { if (instanceName == "_Total") @@ -631,6 +628,17 @@ void Monitor::Run() } } +void Monitor::InitializeGpuDriver() +{ + Logger::Info("Check nvidia-smi and enable persistence mode for GPU."); + std::string output; + this->gpuInitRet = System::ExecuteCommandOut(output, "nvidia-smi -pm 1 2>/dev/null"); + if (this->gpuInitRet != 0) + { + Logger::Warn("GPU metrics will not be collected."); + } +} + void Monitor::InitializeMetadataRequester() { web::http::client::http_client_config config; diff --git a/nodemanager/core/Monitor.h b/nodemanager/core/Monitor.h index f2b98f4..6509bf3 100644 --- a/nodemanager/core/Monitor.h +++ b/nodemanager/core/Monitor.h @@ -63,6 +63,8 @@ namespace hpc int gpuInitRet; System::GpuInfoList gpuInfo; + void InitializeGpuDriver(); + pthread_rwlock_t lock; int intervalSeconds; diff --git a/nodemanager/core/NamingClient.cpp b/nodemanager/core/NamingClient.cpp index 8a73536..ed45929 100644 --- a/nodemanager/core/NamingClient.cpp +++ b/nodemanager/core/NamingClient.cpp @@ -13,13 +13,21 @@ using namespace hpc::utils; void NamingClient::InvalidateCache() { auto instance = GetInstance(NodeManagerConfig::GetNamingServiceUri()); - if (instance) { + // Avoid updating cache too frequently to decrease scheduler pressure + const int ImmunTimeSeconds = 30; + if ((std::chrono::system_clock::now() - instance->lastClearTime).count() < ImmunTimeSeconds) + { + Logger::Debug("ResolveServiceLocation> Skipped cache clearing."); + return; + } + WriterLock writerLock(&instance->lock); Logger::Debug("ResolveServiceLocation> Cleared."); instance->serviceLocations.clear(); + instance->lastClearTime = std::chrono::system_clock::now(); } } @@ -59,7 +67,7 @@ std::string NamingClient::GetServiceLocation(const std::string& serviceName, ppl } } - Logger::Info("ResolveServiceLocation> Resolved serviceLocation {0} for {1}", location->second, serviceName); + Logger::Debug("ResolveServiceLocation> Resolved serviceLocation {0} for {1}", location->second, serviceName); return location->second; } @@ -75,7 +83,7 @@ void NamingClient::RequestForServiceLocation(const std::string& serviceName, std { selected %= this->namingServicesUri.size(); uri = this->namingServicesUri[selected++] + serviceName; - Logger::Debug("ResolveServiceLocation> Fetching from {0}", uri); + Logger::Info("ResolveServiceLocation> Fetching from {0}", uri); auto client = HttpHelper::GetHttpClient(uri); auto request = HttpHelper::GetHttpRequest(methods::GET); diff --git a/nodemanager/core/NamingClient.h b/nodemanager/core/NamingClient.h index f70d5a0..2960ee7 100644 --- a/nodemanager/core/NamingClient.h +++ b/nodemanager/core/NamingClient.h @@ -5,6 +5,7 @@ #include #include #include "../utils/WriterLock.h" +#include namespace hpc { @@ -47,6 +48,7 @@ namespace hpc std::vector namingServicesUri; pplx::cancellation_token_source cts; pthread_rwlock_t lock; + std::chrono::time_point lastClearTime; }; } } diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index 81fcb5c..5d99b89 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -94,18 +94,21 @@ const ProcessStatistics& Process::GetStatisticsFromCGroup() std::istringstream statIn(stat); WriterLock writerLock(&this->lock); + + this->statistics.UserTimeMs = 0; statIn >> this->statistics.UserTimeMs; this->statistics.UserTimeMs *= 10; + this->statistics.KernelTimeMs = 0; statIn >> this->statistics.KernelTimeMs; this->statistics.KernelTimeMs *= 10; + this->statistics.WorkingSetKb = 0; statIn >> this->statistics.WorkingSetKb; Logger::Debug(this->jobId, this->taskId, this->requeueCount, "WorkingSet {0}", this->statistics.WorkingSetKb); this->statistics.WorkingSetKb /= 1024; this->statistics.ProcessIds.clear(); - int id; while (statIn >> id) { diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 8e85804..621e951 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -606,7 +606,7 @@ void RemoteExecutor::StartHeartbeat() [this]() { this->UpdateStatistics(); return this->jobTaskTable.ToJson(); }, [this](int retryCount) { NamingClient::InvalidateCache(); - if (retryCount > 2) + if (retryCount > 5) { this->jobTaskTable.RequestResync(); } @@ -617,7 +617,7 @@ void RemoteExecutor::StartHeartbeat() void RemoteExecutor::UpdateStatistics() { - Logger::Debug(0, 0, 0, "Updating tasks' statistics."); + Logger::Info("Update tasks' statistics."); auto* table = JobTaskTable::GetInstance(); if (table != nullptr) @@ -625,6 +625,7 @@ void RemoteExecutor::UpdateStatistics() auto tasks = table->GetAllTasks(); for (const auto& taskInfo : tasks) { + ReaderLock readerLock(&this->lock); auto p = this->processes.find(taskInfo->ProcessKey); if (p != this->processes.end()) { @@ -686,7 +687,7 @@ void RemoteExecutor::StartRegister() [this]() { return this->monitor.GetRegisterInfo(); }, [this](int retryCount) { NamingClient::InvalidateCache(); - if (retryCount > 2) + if (retryCount > 5) { this->jobTaskTable.RequestResync(); } @@ -742,6 +743,7 @@ pplx::task RemoteExecutor::Metric(std::string&& callbackUri) NodeManagerConfig::SaveMetricUri(callbackUri); // callbackUri is like udp://server:port/api/nodeguid/metricreported + Logger::Info("Start reporting metrics to {0}", callbackUri); this->StartMetric(); } diff --git a/nodemanager/core/Reporter.h b/nodemanager/core/Reporter.h index 5ebc9c4..f45133f 100644 --- a/nodemanager/core/Reporter.h +++ b/nodemanager/core/Reporter.h @@ -28,19 +28,20 @@ namespace hpc if (this->getReportUri) { pthread_create(&this->threadId, nullptr, ReportingThread, this); + Logger::Debug("Started the thread {0} for Reporter {1}", this->threadId, this->name); } } void Stop() { - Logger::Debug("Stopping the thread of Reporter {0}", this->name); + Logger::Debug("Stopping the thread {0} for Reporter {1}", this->threadId, this->name); this->isRunning = false; this->cts.cancel(); if (this->threadId != 0) { while (this->inRequest) usleep(1); pthread_join(this->threadId, nullptr); - Logger::Debug("Stopped the thread of Reporter {0}", this->name); + Logger::Debug("Stopped the thread {0} for Reporter {1}", this->threadId, this->name); } } diff --git a/nodemanager/filters/OnJobTaskStart.sh b/nodemanager/filters/OnJobTaskStart.sh index d326b70..5d5b306 100755 --- a/nodemanager/filters/OnJobTaskStart.sh +++ b/nodemanager/filters/OnJobTaskStart.sh @@ -1 +1 @@ -sed -s 's/123/456/g' +cat \ No newline at end of file diff --git a/nodemanager/filters/OnTaskStart.sh b/nodemanager/filters/OnTaskStart.sh index d326b70..5d5b306 100755 --- a/nodemanager/filters/OnTaskStart.sh +++ b/nodemanager/filters/OnTaskStart.sh @@ -1 +1 @@ -sed -s 's/123/456/g' +cat \ No newline at end of file diff --git a/nodemanager/scripts/CleanupAllTasks.sh b/nodemanager/scripts/CleanupAllTasks.sh index 7452ada..c4d0e7d 100644 --- a/nodemanager/scripts/CleanupAllTasks.sh +++ b/nodemanager/scripts/CleanupAllTasks.sh @@ -4,7 +4,7 @@ echo -docker version > /dev/nul +docker version >/dev/null 2>&1 if [ $? -eq 0 ]; then echo "Cleaning up docker containers..." containers=$(docker ps -a -q -f name=^/$(GetContainerName)) diff --git a/nodemanager/scripts/Statistics.sh b/nodemanager/scripts/Statistics.sh index 07e4a8c..c8d0a02 100644 --- a/nodemanager/scripts/Statistics.sh +++ b/nodemanager/scripts/Statistics.sh @@ -13,8 +13,8 @@ isDockerTask=$(CheckDockerEnvFileExist $taskFolder) userTime10Ms=0 kernelTime10Ms=0 -processes="" workingSetBytes=0 +processes="" function GetCpuStatFile { @@ -22,10 +22,10 @@ function GetCpuStatFile GetGroupFile "$groupName" cpuacct cpuacct.stat } -function GetCpuacctTasksFile +function GetCpusetTasksFile { local groupName=$1 - GetGroupFile "$groupName" cpuacct tasks + GetGroupFile "$groupName" cpuset tasks } function GetMemoryMaxusageFile @@ -44,18 +44,21 @@ if $CGInstalled && ! $cgDisabled; then fi statFile=$(GetCpuStatFile "$groupName") - tasksFile=$(GetCpuacctTasksFile "$groupName") workingSetFile=$(GetMemoryMaxusageFile "$groupName") + tasksFile=$(GetCpusetTasksFile "$groupName") - cut -d" " -f2 "$statFile" - cat "$workingSetFile" - tr "\\n" " " < "$tasksFile" - - echo -else - echo $userTime10Ms - echo $kernelTime10Ms - echo $workingSetBytes - echo $processes - echo + read ignore tempUserTime < <(sed -n 1p "$statFile") + [ -z "$tempUserTime" ] || userTime10Ms=$tempUserTime + read ignore tempKernelTime < <(sed -n 2p "$statFile") + [ -z "$tempKernelTime" ] || kernelTime10Ms=$tempKernelTime + tempWorkingSet=`cat "$workingSetFile"` + [ -z "$tempWorkingSet" ] || workingSetBytes=$tempWorkingSet + tempProcesses=`cat "$tasksFile"` + [ -z "$tempProcesses" ] || processes=$tempProcesses fi + +echo $userTime10Ms +echo $kernelTime10Ms +echo $workingSetBytes +echo $processes +echo