This commit is contained in:
chezhang 2021-06-30 21:38:03 +08:00
Родитель 5f95b862b3 98c508be5f
Коммит 5e5f3ea021
12 изменённых файлов: 73 добавлений и 32 удалений

Просмотреть файл

@ -638,6 +638,18 @@ namespace hpc
"Improve GPU instance name readability in metric info",
}
},
{ "2.5.0.0",
{
"Update building environment (from cpprest2.8 on ubuntu1404 with system gnu libs to cpprest2.10 on ubuntu1804) to fix memory leak issue",
"Fix a bug that heartbeat thread may be stuck due to deadlock",
}
},
{ "2.5.1.0",
{
"Fix a bug that task statistics may be incorrect",
"Decrease node resync and service location requests when the connection to scheduler is poor",
}
},
};
return versionHistory;

Просмотреть файл

@ -22,11 +22,8 @@ Monitor::Monitor(const std::string& nodeName, const std::string& netName, int in
: name(nodeName), networkName(netName), lock(PTHREAD_RWLOCK_INITIALIZER), intervalSeconds(interval),
isCollected(false)
{
Logger::Info("Initializing GPU driver.");
std::string output;
this->gpuInitRet = System::ExecuteCommandOut(output, "nvidia-smi -pm 1");
Logger::Info("Initialize GPU ret code {0}", this->gpuInitRet);
InitializeGpuDriver();
this->collectors["\\Processor\\% Processor Time"] = std::make_shared<MetricCollectorBase>([this] (const std::string& instanceName)
{
if (instanceName == "_Total")
@ -631,6 +628,17 @@ void Monitor::Run()
}
}
void Monitor::InitializeGpuDriver()
{
Logger::Info("Check nvidia-smi and enable persistence mode for GPU.");
std::string output;
this->gpuInitRet = System::ExecuteCommandOut(output, "nvidia-smi -pm 1 2>/dev/null");
if (this->gpuInitRet != 0)
{
Logger::Warn("GPU metrics will not be collected.");
}
}
void Monitor::InitializeMetadataRequester()
{
web::http::client::http_client_config config;

Просмотреть файл

@ -63,6 +63,8 @@ namespace hpc
int gpuInitRet;
System::GpuInfoList gpuInfo;
void InitializeGpuDriver();
pthread_rwlock_t lock;
int intervalSeconds;

Просмотреть файл

@ -13,13 +13,21 @@ using namespace hpc::utils;
void NamingClient::InvalidateCache()
{
auto instance = GetInstance(NodeManagerConfig::GetNamingServiceUri());
if (instance)
{
// Avoid updating cache too frequently to decrease scheduler pressure
const int ImmunTimeSeconds = 30;
if ((std::chrono::system_clock::now() - instance->lastClearTime).count() < ImmunTimeSeconds)
{
Logger::Debug("ResolveServiceLocation> Skipped cache clearing.");
return;
}
WriterLock writerLock(&instance->lock);
Logger::Debug("ResolveServiceLocation> Cleared.");
instance->serviceLocations.clear();
instance->lastClearTime = std::chrono::system_clock::now();
}
}
@ -59,7 +67,7 @@ std::string NamingClient::GetServiceLocation(const std::string& serviceName, ppl
}
}
Logger::Info("ResolveServiceLocation> Resolved serviceLocation {0} for {1}", location->second, serviceName);
Logger::Debug("ResolveServiceLocation> Resolved serviceLocation {0} for {1}", location->second, serviceName);
return location->second;
}
@ -75,7 +83,7 @@ void NamingClient::RequestForServiceLocation(const std::string& serviceName, std
{
selected %= this->namingServicesUri.size();
uri = this->namingServicesUri[selected++] + serviceName;
Logger::Debug("ResolveServiceLocation> Fetching from {0}", uri);
Logger::Info("ResolveServiceLocation> Fetching from {0}", uri);
auto client = HttpHelper::GetHttpClient(uri);
auto request = HttpHelper::GetHttpRequest(methods::GET);

Просмотреть файл

@ -5,6 +5,7 @@
#include <cpprest/http_client.h>
#include <functional>
#include "../utils/WriterLock.h"
#include <chrono>
namespace hpc
{
@ -47,6 +48,7 @@ namespace hpc
std::vector<std::string> namingServicesUri;
pplx::cancellation_token_source cts;
pthread_rwlock_t lock;
std::chrono::time_point<std::chrono::system_clock> lastClearTime;
};
}
}

Просмотреть файл

@ -94,18 +94,21 @@ const ProcessStatistics& Process::GetStatisticsFromCGroup()
std::istringstream statIn(stat);
WriterLock writerLock(&this->lock);
this->statistics.UserTimeMs = 0;
statIn >> this->statistics.UserTimeMs;
this->statistics.UserTimeMs *= 10;
this->statistics.KernelTimeMs = 0;
statIn >> this->statistics.KernelTimeMs;
this->statistics.KernelTimeMs *= 10;
this->statistics.WorkingSetKb = 0;
statIn >> this->statistics.WorkingSetKb;
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "WorkingSet {0}", this->statistics.WorkingSetKb);
this->statistics.WorkingSetKb /= 1024;
this->statistics.ProcessIds.clear();
int id;
while (statIn >> id)
{

Просмотреть файл

@ -606,7 +606,7 @@ void RemoteExecutor::StartHeartbeat()
[this]() { this->UpdateStatistics(); return this->jobTaskTable.ToJson(); },
[this](int retryCount) {
NamingClient::InvalidateCache();
if (retryCount > 2)
if (retryCount > 5)
{
this->jobTaskTable.RequestResync();
}
@ -617,7 +617,7 @@ void RemoteExecutor::StartHeartbeat()
void RemoteExecutor::UpdateStatistics()
{
Logger::Debug(0, 0, 0, "Updating tasks' statistics.");
Logger::Info("Update tasks' statistics.");
auto* table = JobTaskTable::GetInstance();
if (table != nullptr)
@ -625,6 +625,7 @@ void RemoteExecutor::UpdateStatistics()
auto tasks = table->GetAllTasks();
for (const auto& taskInfo : tasks)
{
ReaderLock readerLock(&this->lock);
auto p = this->processes.find(taskInfo->ProcessKey);
if (p != this->processes.end())
{
@ -686,7 +687,7 @@ void RemoteExecutor::StartRegister()
[this]() { return this->monitor.GetRegisterInfo(); },
[this](int retryCount) {
NamingClient::InvalidateCache();
if (retryCount > 2)
if (retryCount > 5)
{
this->jobTaskTable.RequestResync();
}
@ -742,6 +743,7 @@ pplx::task<json::value> RemoteExecutor::Metric(std::string&& callbackUri)
NodeManagerConfig::SaveMetricUri(callbackUri);
// callbackUri is like udp://server:port/api/nodeguid/metricreported
Logger::Info("Start reporting metrics to {0}", callbackUri);
this->StartMetric();
}

Просмотреть файл

@ -28,19 +28,20 @@ namespace hpc
if (this->getReportUri)
{
pthread_create(&this->threadId, nullptr, ReportingThread, this);
Logger::Debug("Started the thread {0} for Reporter {1}", this->threadId, this->name);
}
}
void Stop()
{
Logger::Debug("Stopping the thread of Reporter {0}", this->name);
Logger::Debug("Stopping the thread {0} for Reporter {1}", this->threadId, this->name);
this->isRunning = false;
this->cts.cancel();
if (this->threadId != 0)
{
while (this->inRequest) usleep(1);
pthread_join(this->threadId, nullptr);
Logger::Debug("Stopped the thread of Reporter {0}", this->name);
Logger::Debug("Stopped the thread {0} for Reporter {1}", this->threadId, this->name);
}
}

Просмотреть файл

@ -1 +1 @@
sed -s 's/123/456/g'
cat

Просмотреть файл

@ -1 +1 @@
sed -s 's/123/456/g'
cat

Просмотреть файл

@ -4,7 +4,7 @@
echo
docker version > /dev/nul
docker version >/dev/null 2>&1
if [ $? -eq 0 ]; then
echo "Cleaning up docker containers..."
containers=$(docker ps -a -q -f name=^/$(GetContainerName))

Просмотреть файл

@ -13,8 +13,8 @@ isDockerTask=$(CheckDockerEnvFileExist $taskFolder)
userTime10Ms=0
kernelTime10Ms=0
processes=""
workingSetBytes=0
processes=""
function GetCpuStatFile
{
@ -22,10 +22,10 @@ function GetCpuStatFile
GetGroupFile "$groupName" cpuacct cpuacct.stat
}
function GetCpuacctTasksFile
function GetCpusetTasksFile
{
local groupName=$1
GetGroupFile "$groupName" cpuacct tasks
GetGroupFile "$groupName" cpuset tasks
}
function GetMemoryMaxusageFile
@ -44,18 +44,21 @@ if $CGInstalled && ! $cgDisabled; then
fi
statFile=$(GetCpuStatFile "$groupName")
tasksFile=$(GetCpuacctTasksFile "$groupName")
workingSetFile=$(GetMemoryMaxusageFile "$groupName")
tasksFile=$(GetCpusetTasksFile "$groupName")
cut -d" " -f2 "$statFile"
cat "$workingSetFile"
tr "\\n" " " < "$tasksFile"
echo
else
echo $userTime10Ms
echo $kernelTime10Ms
echo $workingSetBytes
echo $processes
echo
read ignore tempUserTime < <(sed -n 1p "$statFile")
[ -z "$tempUserTime" ] || userTime10Ms=$tempUserTime
read ignore tempKernelTime < <(sed -n 2p "$statFile")
[ -z "$tempKernelTime" ] || kernelTime10Ms=$tempKernelTime
tempWorkingSet=`cat "$workingSetFile"`
[ -z "$tempWorkingSet" ] || workingSetBytes=$tempWorkingSet
tempProcesses=`cat "$tasksFile"`
[ -z "$tempProcesses" ] || processes=$tempProcesses
fi
echo $userTime10Ms
echo $kernelTime10Ms
echo $workingSetBytes
echo $processes
echo