EvanCui 2015-04-14 19:52:08 +08:00
Родитель 9f967d0faa 2d4008c855
Коммит a0cdf3cd2b
21 изменённых файлов: 514 добавлений и 205 удалений

Просмотреть файл

@ -48,6 +48,8 @@
<Add directory="../../spdlog/include" />
</Compiler>
<Unit filename="Readme.txt" />
<Unit filename="Version.cpp" />
<Unit filename="Version.h" />
<Unit filename="arguments/EndJobArgs.cpp" />
<Unit filename="arguments/EndJobArgs.h" />
<Unit filename="arguments/EndTaskArgs.cpp" />
@ -58,6 +60,7 @@
<Unit filename="arguments/StartJobAndTaskArgs.h" />
<Unit filename="arguments/StartTaskArgs.cpp" />
<Unit filename="arguments/StartTaskArgs.h" />
<Unit filename="common/ErrorCodes.h" />
<Unit filename="core/IRemoteExecutor.h" />
<Unit filename="core/JobTaskTable.cpp" />
<Unit filename="core/JobTaskTable.h" />

Просмотреть файл

@ -24,7 +24,7 @@
<map>
<cpprest/json.h>
1422965830 /home/evanc/casablanca/Release/include/cpprest/json.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/json.h
<memory>
<string>
<sstream>
@ -34,7 +34,7 @@
"cpprest/details/basic_types.h"
"cpprest/asyncrt_utils.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/details/basic_types.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/details/basic_types.h
<string>
<fstream>
<iostream>
@ -53,13 +53,15 @@
1427351029 /home/evanc/casablanca/Release/include/cpprest/details/nosal.h
1422965830 /home/evanc/casablanca/Release/include/cpprest/details/SafeInt3.hpp
<assert.h>
1427351029 /home/evanc/casablanca/Release/include/cpprest/details/SafeInt3.hpp
<cstddef>
<cstdlib>
<intrin.h>
<stdint.h>
<assert.h>
<stdlib.h>
1422965830 /home/evanc/casablanca/Release/include/cpprest/asyncrt_utils.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/asyncrt_utils.h
<string>
<vector>
<cstdint>
@ -155,12 +157,12 @@
1423402903 /home/evanc/whpc-linux-communicator/NodeManager/Test.h
<cpprest/http_listener.h>
1422965830 /home/evanc/casablanca/Release/include/cpprest/http_listener.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/http_listener.h
<limits>
<functional>
"cpprest/http_msg.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/http_msg.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/http_msg.h
<map>
<memory>
<string>
@ -178,11 +180,11 @@
"cpprest/details/http_constants.dat"
"cpprest/details/http_constants.dat"
1422965830 /home/evanc/casablanca/Release/include/cpprest/uri.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/uri.h
"cpprest/base_uri.h"
"cpprest/uri_builder.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/base_uri.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/base_uri.h
<map>
<memory>
<string>
@ -191,13 +193,13 @@
"cpprest/asyncrt_utils.h"
"cpprest/details/basic_types.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/uri_builder.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/uri_builder.h
<sstream>
<string>
<vector>
"cpprest/base_uri.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/http_headers.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/http_headers.h
<map>
<memory>
<string>
@ -205,11 +207,11 @@
<system_error>
"cpprest/asyncrt_utils.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/streams.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/streams.h
"cpprest/astreambuf.h"
<iosfwd>
1422965830 /home/evanc/casablanca/Release/include/cpprest/astreambuf.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/astreambuf.h
<ios>
<memory>
<cstring>
@ -218,7 +220,7 @@
"cpprest/details/basic_types.h"
"cpprest/asyncrt_utils.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/containerstream.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/containerstream.h
<vector>
<queue>
<algorithm>
@ -227,7 +229,7 @@
"cpprest/astreambuf.h"
"cpprest/streams.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/details/http_constants.dat
1427351029 /home/evanc/casablanca/Release/include/cpprest/details/http_constants.dat
1423734326 source:/home/evanc/whpc-linux-communicator/NodeManager/RemoteCommunicator.cpp
<sstream>
@ -416,86 +418,88 @@
"../common.h"
"../details/log_msg.h"
1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/main.cpp
1428909167 source:/home/evanc/whpc-linux-communicator/nodemanager/main.cpp
<iostream>
<cpprest/http_listener.h>
<cpprest/json.h>
"utils/Logger.h"
"core/RemoteCommunicator.h"
"core/RemoteExecutor.h"
"Version.h"
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h
1428488455 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h
<syslog.h>
<stdio.h>
<stdarg.h>
<iostream>
<spdlog/spdlog.h>
"String.h"
1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h
1427722564 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h
<cpprest/http_listener.h>
<cpprest/json.h>
"../utils/Logger.h"
"IRemoteExecutor.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h
1427722694 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h
"../arguments/StartJobAndTaskArgs.h"
"../arguments/StartTaskArgs.h"
"../arguments/EndJobArgs.h"
"../arguments/EndTaskArgs.h"
1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h
1428574742 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h
"IRemoteExecutor.h"
"JobTaskTable.h"
"Monitor.h"
"Process.h"
"Reporter.h"
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.cpp
"EndJobArgs.h"
"../utils/JsonHelper.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.h
<cpprest/json.h>
1427272086 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h
1427383873 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h
<vector>
<map>
<cpprest/json.h>
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.cpp
"EndTaskArgs.h"
"../utils/JsonHelper.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h
<cpprest/json.h>
1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp
"ProcessStartInfo.h"
"../utils/JsonHelper.h"
1426473053 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h
<string>
<vector>
<map>
<cpprest/json.h>
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.cpp
1427383873 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.cpp
"StartJobAndTaskArgs.h"
"../utils/JsonHelper.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h
1427383873 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h
<cpprest/json.h>
"ProcessStartInfo.h"
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.cpp
"StartTaskArgs.h"
"../utils/JsonHelper.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.h
<cpprest/json.h>
"ProcessStartInfo.h"
1427724395 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Process.cpp
1428574597 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Process.cpp
<memory.h>
<sys/wait.h>
<sys/resource.h>
@ -503,8 +507,9 @@
"Process.h"
"../utils/Logger.h"
"../utils/String.h"
"../common/ErrorCodes.h"
1428041425 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h
1428574604 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h
<string>
<sstream>
<vector>
@ -518,6 +523,7 @@
"../utils/String.h"
"../utils/Logger.h"
"../utils/System.h"
"../common/ErrorCodes.h"
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/String.h
<string>
@ -525,19 +531,20 @@
<sstream>
<algorithm>
1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.cpp
1428486600 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.cpp
<sstream>
"RemoteCommunicator.h"
"../utils/String.h"
"../utils/System.h"
"../arguments/StartJobAndTaskArgs.h"
1427383873 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h
1428574577 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h
<string>
"String.h"
"Logger.h"
"../common/ErrorCodes.h"
1427192138 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.cpp
1428574636 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.cpp
<cpprest/http_client.h>
<memory>
"RemoteExecutor.h"
@ -545,14 +552,15 @@
"../utils/ReaderLock.h"
"../utils/Logger.h"
"../utils/System.h"
"../common/ErrorCodes.h"
1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.cpp
"JsonHelper.h"
1426595924 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.cpp
"Logger.h"
1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/String.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/String.cpp
<sstream>
"String.h"
@ -571,39 +579,39 @@
"String.h"
"Logger.h"
1426602483 source:/home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.cpp
1428485409 source:/home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.cpp
"JobTaskTable.h"
"../utils/WriterLock.h"
"../utils/ReaderLock.h"
1426602464 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h
1428485409 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h
<map>
<cpprest/json.h>
"../data/TaskInfo.h"
"../data/JobInfo.h"
"../data/NodeInfo.h"
1426597393 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h
1428485409 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h
<cpprest/json.h>
<vector>
<string>
<memory>
1424929911 /home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.h
<cpprest/json.h>
<map>
"TaskInfo.h"
1424947474 /home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.h
<cpprest/json.h>
<string>
<map>
"JobInfo.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.h
<pthread.h>
1422965830 /home/evanc/casablanca/Release/include/cpprest/http_client.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/http_client.h
<wrl.h>
<msxml6.h>
<memory>
@ -618,45 +626,45 @@
"cpprest/oauth1.h"
"cpprest/oauth2.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/details/web_utilities.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/details/web_utilities.h
"cpprest/asyncrt_utils.h"
1422965830 /home/evanc/casablanca/Release/include/cpprest/oauth1.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/oauth1.h
"cpprest/http_msg.h"
"cpprest/details/http_constants.dat"
"cpprest/details/http_constants.dat"
1422965830 /home/evanc/casablanca/Release/include/cpprest/oauth2.h
1427351029 /home/evanc/casablanca/Release/include/cpprest/oauth2.h
"cpprest/http_msg.h"
"cpprest/details/http_constants.dat"
1424940673 source:/home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.cpp
"JobInfo.h"
1424948178 source:/home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.cpp
"NodeInfo.h"
"../utils/System.h"
1426597885 source:/home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.cpp
"TaskInfo.h"
"../utils/JsonHelper.h"
"../utils/String.h"
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.cpp
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.cpp
"ReaderLock.h"
1424929892 /home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.h
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.h
<pthread.h>
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.cpp
"WriterLock.h"
1427272469 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h
1427383873 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h
<cpprest/json.h>
<map>
"../utils/System.h"
1427272649 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.cpp
1428571893 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.cpp
<pthread.h>
"Monitor.h"
"../utils/ReaderLock.h"
@ -664,12 +672,23 @@
"../utils/Logger.h"
"../utils/System.h"
1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h
1428578115 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h
<cpprest/json.h>
<functional>
1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.cpp
1428573779 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.cpp
<cpprest/http_client.h>
"Reporter.h"
"../utils/Logger.h"
1428574569 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h
1428909113 source:/home/evanc/whpc-linux-communicator/nodemanager/Version.cpp
"Version.h"
1429007613 /home/evanc/whpc-linux-communicator/nodemanager/Version.h
<string>
<map>
<vector>
<iostream>

1
nodemanager/Version.cpp Normal file
Просмотреть файл

@ -0,0 +1 @@
#include "Version.h"

87
nodemanager/Version.h Normal file
Просмотреть файл

@ -0,0 +1,87 @@
#ifndef VERSION_H_INCLUDED
#define VERSION_H_INCLUDED
#include <string>
#include <map>
#include <vector>
#include <iostream>
namespace hpc
{
class Version
{
public:
static const std::map<std::string, std::vector<std::string>>& GetVersionHistory()
{
static std::map<std::string, std::vector<std::string>> versionHistory =
{
{ "1.1.1.1",
{
"Node manager main functionality support",
"Added version support",
"Fixed network card reversed order issue",
"Added trace",
"Added error codes definition",
"Fixed a potential node error issue",
}
},
{ "1.1.1.2",
{
"Fixed a long running issue because of callback failure",
"Added version history support",
}
},
{ "1.1.1.3",
{
"Fixed a long running issue because of callback contract mismatch",
}
},
{ "1.1.1.4",
{
"Retry when create cgroup failed",
"Return the exit code and error message when PrepareTask",
"Record the output to message in Process",
}
},
{ "1.1.1.5",
{
"Print out version history",
}
},
};
return versionHistory;
}
static const std::string& GetVersion()
{
auto& h = GetVersionHistory();
auto it = --h.end();
return it->first;
}
static void PrintVersionHistory()
{
auto& h = GetVersionHistory();
for (auto& v : h)
{
std::cout << v.first << std::endl;
std::cout << "================================================================" << std::endl;
int number = 0;
for (auto& m : v.second)
{
number++;
std::cout << number << ". " << m << std::endl;
}
std::cout << std::endl;
}
}
private:
};
}
#endif // VERSION_H_INCLUDED

Просмотреть файл

@ -0,0 +1,20 @@
#ifndef ERRORCODES_H
#define ERRORCODES_H
namespace hpc
{
namespace common
{
enum class ErrorCodes
{
EndJobExitCode = -101,
EndTaskExitCode = -102,
BuildScriptError = -103,
DefaultExitCode = -999,
GetHostNameError = -104,
PopenError = -105,
};
}
}
#endif // ERRORCODES_H

Просмотреть файл

@ -91,7 +91,7 @@ void Monitor::Run()
while (true)
{
time_t t;
time(&t);
time(&t);
long int cpuCurrent = cpuLast + 1, idleCurrent = idleLast;
@ -100,7 +100,7 @@ void Monitor::Run()
long int idleDiff = idleCurrent - idleLast;
float cpuUsage = (float)(100.0f * (totalDiff - idleDiff) / totalDiff);
cpuLast = cpuCurrent;
idleLast = idleCurrent;
idleLast = idleCurrent;
unsigned long available, total;
System::Memory(available, total);
@ -108,9 +108,15 @@ void Monitor::Run()
float totalMemoryMb = (float)total / 1024.0f;
long int networkCurrent = 0;
System::NetworkUsage(networkCurrent, "eth0");
int ret = System::NetworkUsage(networkCurrent, "eth0");
if (ret != 0)
{
Logger::Error("Error occurred while collecting network usage {0}", ret);
}
float networkUsage = (float)(networkCurrent - networkLast) / this->intervalSeconds;
networkLast = networkCurrent;
networkLast = networkCurrent;
// ip address;
std::string ipAddress = System::GetIpAddress(IpAddressVersion::V4, "eth0");
@ -140,6 +146,7 @@ void Monitor::Run()
this->socketCount = sockets;
this->distroInfo = distro;
this->networkInfo = std::move(netInfo);
}
this->isCollected = true;
@ -152,8 +159,9 @@ void* Monitor::MonitoringThread(void* arg)
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
Monitor* m = static_cast<Monitor*>(arg);
Logger::Info("Monitoring thread created. Interval {0}", m->intervalSeconds);
m->Run();
pthread_exit(nullptr);

Просмотреть файл

@ -5,13 +5,17 @@
#include "Process.h"
#include "../utils/Logger.h"
#include "../utils/String.h"
#include "../utils/String.h"
#include "../common/ErrorCodes.h"
using namespace hpc::core;
using namespace hpc::utils;
using namespace hpc::utils;
using namespace hpc::common;
Process::Process(
int taskId,
Process::Process(
int jobId,
int taskId,
int requeueCount,
const std::string& cmdLine,
const std::string& standardOut,
const std::string& standardErr,
@ -19,8 +23,8 @@ Process::Process(
const std::string& workDir,
std::vector<long>&& cpuAffinity,
std::map<std::string, std::string>&& envi,
const std::function<Callback> completed) :
taskId(taskId),
const std::function<Callback> completed) :
jobId(jobId), taskId(taskId), requeueCount(requeueCount), taskExecutionId(String::Join("_", taskId, requeueCount)),
commandLine(cmdLine), stdOutFile(standardOut), stdErrFile(standardErr), stdInFile(standardIn),
workDirectory(workDir), affinity(cpuAffinity), environments(envi), callback(completed), processId(0)
{
@ -36,7 +40,7 @@ pplx::task<pid_t> Process::Start()
{
pthread_create(&this->threadId, nullptr, ForkThread, this);
Logger::Debug("Created thread {0}", this->threadId);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Created thread {0}", this->threadId);
return pplx::task<pid_t>(this->started);
}
@ -45,13 +49,13 @@ void Process::Kill(int forcedExitCode)
{
if (forcedExitCode != 0x0FFFFFFF)
{
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Setting forced ExitCode {0}", forcedExitCode);
this->SetExitCode(forcedExitCode);
}
if (this->processId != 0)
{
this->ExecuteCommand("/bin/bash", "EndTask.sh", this->taskId, this->processId);
this->processId = 0;
this->ExecuteCommand("/bin/bash", "EndTask.sh", this->taskExecutionId, this->processId);
}
this->GetStatisticsFromCGroup();
@ -70,11 +74,11 @@ void Process::OnCompleted()
}
catch (const std::exception& ex)
{
Logger::Error("Exception happened when callback, ex = {0}", ex.what());
Logger::Error(this->jobId, this->taskId, this->requeueCount, "Exception happened when callback, ex = {0}", ex.what());
}
catch (...)
{
Logger::Error("Unknown exception happened when callback");
Logger::Error(this->jobId, this->taskId, this->requeueCount, "Unknown exception happened when callback");
}
}
@ -87,7 +91,7 @@ void* Process::ForkThread(void* arg)
if (ret != 0)
{
p->message << "Task " << p->taskId << ": error when create task folder, ret " << ret << std::endl;
Logger::Error("Task {0}: error when create task folder, ret {1}", p->taskId, ret);
Logger::Error(p->jobId, p->taskId, p->requeueCount, "error when create task folder, ret {0}", ret);
// TODO fetch the errno.
p->SetExitCode(ret);
@ -99,14 +103,14 @@ void* Process::ForkThread(void* arg)
if (path.empty())
{
p->message << "Error when build script." << std::endl;
Logger::Error("Error when build script.");
Logger::Error(p->jobId, p->taskId, p->requeueCount, "Error when build script.");
// TODO fetch the errno.
p->SetExitCode(-1);
p->SetExitCode((int)ErrorCodes::BuildScriptError);
goto Final;
}
if (!p->ExecuteCommand("/bin/bash", "PrepareTask.sh", p->taskId, p->GetAffinity()))
if (!p->ExecuteCommand("/bin/bash", "PrepareTask.sh", p->taskExecutionId, p->GetAffinity()))
{
goto Final;
}
@ -120,7 +124,7 @@ void* Process::ForkThread(void* arg)
p->message << "Failed to fork(), pid = " << p->processId << ", errno = " << errno
<< ", msg = " << errorMessage << std::endl;
Logger::Error("Failed to fork(), pid = {0}, errno = {1}, msg = {2}", p->processId, errno, errorMessage);
Logger::Error(p->jobId, p->taskId, p->requeueCount, "Failed to fork(), pid = {0}, errno = {1}, msg = {2}", p->processId, errno, errorMessage);
p->SetExitCode(errno);
goto Final;
@ -131,14 +135,15 @@ void* Process::ForkThread(void* arg)
p->Run(path);
}
else
{
{
assert(p->processId > 0);
p->started.set(p->processId);
assert(p->processId > 0);
p->Monitor();
}
Final:
p->processId = 0;
p->ExecuteCommand("/bin/bash", "CleanupTask.sh", p->taskId, p->processId);
p->ExecuteCommand("/bin/bash", "CleanupTask.sh", p->taskExecutionId, p->processId);
p->ExecuteCommand("rm -rf", p->taskFolder);
p->OnCompleted();
@ -147,33 +152,39 @@ Final:
void Process::Monitor()
{
Logger::Debug("Monitor the forked process {0}", this->processId);
assert(this->processId > 0);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Monitor the forked process {0}", this->processId);
int status;
rusage usage;
assert(this->processId > 0);
pid_t waitedPid = wait4(this->processId, &status, 0, &usage);
assert(this->processId > 0);
if (waitedPid == -1)
{
Logger::Error("wait4 for process {0} error {1}", this->processId, errno);
Logger::Error(this->jobId, this->taskId, this->requeueCount, "wait4 for process {0} error {1}", this->processId, errno);
this->message << "wait4 for process " << this->processId << " error " << errno << std::endl;
this->SetExitCode(errno);
return;
}
assert(this->processId > 0);
if (waitedPid != this->processId)
{
Logger::Error("Process {0}: waited {1}, errno {2}", this->processId, waitedPid, errno);
Logger::Error(this->jobId, this->taskId, this->requeueCount,
"Process {0}: waited {1}, errno {2}", this->processId, waitedPid, errno);
assert(false);
int tmp;
if (WIFEXITED(status)) Logger::Info("Process {0}: WIFEXITED", this->processId);
if ((tmp = WEXITSTATUS(status))) Logger::Info("Process {0}: WEXITSTATUS: {1}", this->processId, tmp);
if (WIFSIGNALED(status)) Logger::Info("Process {0}: WIFSIGNALED", this->processId);
if ((tmp = WTERMSIG(status))) Logger::Info("Process {0}: WTERMSIG: {1}", this->processId, tmp);
if (WCOREDUMP(status)) Logger::Info("Process {0}: Core dumped.", this->processId);
if (WIFSTOPPED(status)) Logger::Info("Process {0}: WIFSTOPPED", this->processId);
if (WSTOPSIG(status)) Logger::Info("Process {0}: WSTOPSIG", this->processId);
if (WIFCONTINUED(status)) Logger::Info("Process {0}: WIFCONTINUED", this->processId);
if (WIFEXITED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFEXITED", this->processId);
if ((tmp = WEXITSTATUS(status))) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WEXITSTATUS: {1}", this->processId, tmp);
if (WIFSIGNALED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFSIGNALED", this->processId);
if ((tmp = WTERMSIG(status))) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WTERMSIG: {1}", this->processId, tmp);
if (WCOREDUMP(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: Core dumped.", this->processId);
if (WIFSTOPPED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFSTOPPED", this->processId);
if (WSTOPSIG(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WSTOPSIG", this->processId);
if (WIFCONTINUED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFCONTINUED", this->processId);
this->message << "Process " << this->processId << ": waited " << waitedPid << ", errno " << errno << std::endl;
this->SetExitCode(errno);
@ -183,6 +194,8 @@ void Process::Monitor()
if (WIFEXITED(status))
{
Logger::Info(this->jobId, this->taskId, this->requeueCount,
"Process {0}: exite code {1}", this->processId, WEXITSTATUS(status));
this->SetExitCode(WEXITSTATUS(status));
std::string output;
@ -200,8 +213,8 @@ void Process::Monitor()
}
else
{
Logger::Error("wait4 for process {0} status {1}", this->processId, status);
this->SetExitCode(-1);
Logger::Error(this->jobId, this->taskId, this->requeueCount, "wait4 for process {0} status {1}", this->processId, status);
this->SetExitCode(status);
this->message << "wait4 for process " << this->processId << " status " << status << std::endl;
}
@ -210,7 +223,7 @@ void Process::Monitor()
this->userTime = usage.ru_utime;
this->kernelTime = usage.ru_stime;
Logger::Debug("Process {0}: Monitor ended", this->processId);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Process {0}: Monitor ended", this->processId);
// TODO: Number of process;
// TODO: ProcessIds
// TODO: WorkingSet
@ -221,13 +234,11 @@ void Process::Run(const std::string& path)
std::vector<char> pathBuffer(path.cbegin(), path.cend());
pathBuffer.push_back('\0');
std::string taskIdString = String::Join("", this->taskId);
char* const args[] =
{
const_cast<char* const>("/bin/bash"),
const_cast<char* const>("StartTask.sh"),
const_cast<char* const>(taskIdString.c_str()),
const_cast<char* const>(this->taskExecutionId.c_str()),
&pathBuffer[0],
nullptr
};
@ -252,7 +263,7 @@ int Process::CreateTaskFolder()
{
char folder[256];
sprintf(folder, "/tmp/nodemanager_task_%d.XXXXXX", this->taskId);
sprintf(folder, "/tmp/nodemanager_task_%d_%d.XXXXXX", this->taskId, this->requeueCount);
char* p = mkdtemp(folder);

Просмотреть файл

@ -15,6 +15,7 @@
#include "../utils/String.h"
#include "../utils/Logger.h"
#include "../utils/System.h"
#include "../common/ErrorCodes.h"
using namespace hpc::utils;
@ -27,8 +28,10 @@ namespace hpc
public:
typedef void Callback(int, std::string&&, timeval userTime, timeval kernelTime);
Process(
int taskId,
Process(
int jobId,
int taskId,
int requeueCount,
const std::string& cmdLine,
const std::string& standardOut,
const std::string& standardErr,
@ -68,8 +71,12 @@ namespace hpc
if (ret != 0)
{
std::string cmdLine = String::Join(" ", cmd, args...);
this->message << "Task " << this->taskId << ": '" << cmdLine << "' failed. exitCode " << ret << "\r\n";
Logger::Error("Task {0}: '{1}' failed. exitCode {2}, output {3}.", this->taskId, cmdLine, ret, output);
this->message
<< "Task " << this->taskId << ": '" << cmdLine
<< "' failed. exitCode " << ret << ". output "
<< output << std::endl;
Logger::Error(this->jobId, this->taskId, this->requeueCount, "'{0}' failed. exitCode {1}, output {2}.", cmdLine, ret, output);
this->SetExitCode(ret);
@ -79,7 +86,7 @@ namespace hpc
return true;
}
static void* ForkThread(void*);
static void* ForkThread(void*);
std::string GetAffinity();
void Run(const std::string& path);
@ -90,13 +97,16 @@ namespace hpc
std::ostringstream stdOut;
std::ostringstream stdErr;
std::ostringstream message;
int exitCode = -1;
int exitCode = (int)hpc::common::ErrorCodes::DefaultExitCode;
bool exitCodeSet = false;
timeval userTime = { 0, 0 };
timeval kernelTime = { 0, 0 };
std::string taskFolder;
const int taskId;
const int jobId;
const int taskId;
const int requeueCount;
const std::string taskExecutionId;
const std::string commandLine;
std::string stdOutFile;
std::string stdErrFile;

Просмотреть файл

@ -105,8 +105,10 @@ void RemoteCommunicator::HandlePost(http_request request)
{
request.extract_json().then([processor, callback = std::move(callbackUri)](pplx::task<json::value> t)
{
// todo: throw exception instead of using the return value.
return processor->second(t.get(), callback);
auto j = t.get();
Logger::Debug("Json: {0}", j.serialize());
return processor->second(j, callback);
})
.then([request, this](pplx::task<json::value> t)
{
@ -130,7 +132,6 @@ void RemoteCommunicator::HandlePost(http_request request)
json::value RemoteCommunicator::StartJobAndTask(const json::value& val, const std::string& callbackUri)
{
Logger::Info("Json: {0}", val.serialize());
auto args = StartJobAndTaskArgs::FromJson(val);
return this->executor.StartJobAndTask(std::move(args), callbackUri);
}

Просмотреть файл

@ -6,6 +6,7 @@
#include "../utils/ReaderLock.h"
#include "../utils/Logger.h"
#include "../utils/System.h"
#include "../common/ErrorCodes.h"
using namespace web::http;
using namespace web;
@ -13,6 +14,7 @@ using namespace hpc::core;
using namespace hpc::utils;
using namespace hpc::arguments;
using namespace hpc::data;
using namespace hpc::common;
RemoteExecutor::RemoteExecutor(const std::string& networkName)
: monitor(System::GetNodeName(), networkName, MetricReportInterval), lock(PTHREAD_RWLOCK_INITIALIZER)
@ -37,13 +39,13 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
if (taskInfo->TaskRequeueCount < args.StartInfo.TaskRequeueCount)
{
Logger::Info("Task {0}: Change requeue count from {1} to {2}", args.TaskId, taskInfo->TaskRequeueCount, args.StartInfo.TaskRequeueCount);
Logger::Info(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, "Change requeue count from {0} to {1}", taskInfo->TaskRequeueCount, args.StartInfo.TaskRequeueCount);
taskInfo->TaskRequeueCount = args.StartInfo.TaskRequeueCount;
}
if (args.StartInfo.CommandLine.empty())
{
Logger::Info("Job {0}, task {1} MPI non-master task found, skip creating the process.", args.JobId, args.TaskId);
Logger::Info(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, "MPI non-master task found, skip creating the process.");
}
else
{
@ -51,7 +53,9 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
isNewEntry)
{
auto process = std::shared_ptr<Process>(new Process(
args.TaskId,
taskInfo->JobId,
taskInfo->TaskId,
taskInfo->TaskRequeueCount,
std::move(args.StartInfo.CommandLine),
std::move(args.StartInfo.StdOutText),
std::move(args.StartInfo.StdErrText),
@ -67,7 +71,8 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
{
if (taskInfo->Exited)
{
Logger::Debug("Task {0}: Ended already by EndTask.", taskInfo->TaskId);
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"Ended already by EndTask.");
}
else
{
@ -77,26 +82,32 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
taskInfo->KernelProcessorTime = kernelTime.tv_sec * 1000000 + kernelTime.tv_usec;
taskInfo->UserProcessorTime = userTime.tv_sec * 1000000 + userTime.tv_usec;
auto jsonBody = taskInfo->ToJson();
Logger::Debug("Task {0}: Callback to {1} with {2}", taskInfo->TaskId, callbackUri, jsonBody);
auto jsonBody = taskInfo->ToCompletionEventArgJson();
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"Callback to {0} with {1}", callbackUri, jsonBody);
client::http_client_config config;
config.set_validate_certificates(false);
client::http_client client(callbackUri, config);
client.request(methods::POST, "", jsonBody).then([&callbackUri, this, taskInfo](http_response response)
{
Logger::Info("Task {0}: Callback to {1} response code {2}", taskInfo->TaskId, callbackUri, response.status_code());
Logger::Info(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"Callback to {0} response code {1}", callbackUri, response.status_code());
}).wait();
}
// this won't remove the task entry added later as attempt id doesn't match
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
}
catch (const std::exception& ex)
{
Logger::Error("Exception when sending back task result. {0}", ex.what());
Logger::Error(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"Exception when sending back task result. {0}", ex.what());
}
// this won't remove the task entry added later as attempt id doesn't match
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
Logger::Debug("Task {0}: attemptId {1}, erasing process", taskInfo->TaskId, taskInfo->GetAttemptId());
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"attemptId {0}, erasing process", taskInfo->GetAttemptId());
// Process will be deleted here.
this->processes.erase(taskInfo->GetAttemptId());
@ -104,17 +115,19 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
this->processes[taskInfo->GetAttemptId()] = process;
process->Start().then([this] (pid_t pid)
process->Start().then([this, taskInfo] (pid_t pid)
{
if (pid > 0)
{
Logger::Debug("Process started {0}", pid);
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"Process started {0}", pid);
}
});
}
else
{
Logger::Warn("The task {0} has started already.", args.TaskId);
Logger::Warn(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount,
"The task has started already.");
// Found the original process.
// TODO: assert the job task table call is the same.
}
@ -127,17 +140,40 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args)
{
ReaderLock readerLock(&this->lock);
Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: starting");
auto jobInfo = this->jobTaskTable.RemoveJob(args.JobId);
json::value jsonBody;
if (jobInfo)
{
for (auto& taskPair : jobInfo->Tasks)
{
this->TerminateTask(taskPair.first);
this->TerminateTask(taskPair.first, (int)ErrorCodes::EndJobExitCode);
auto taskInfo = taskPair.second;
if (taskInfo)
{
taskInfo->Exited = true;
taskInfo->ExitCode = (int)ErrorCodes::EndJobExitCode;
Logger::Debug(args.JobId, taskPair.first, taskInfo->TaskRequeueCount, "EndJob: starting");
}
else
{
Logger::Warn(args.JobId, taskPair.first, this->UnknowId,
"EndJob: Task is already finished");
assert(false);
}
}
jsonBody = jobInfo->ToJson();
Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: ended {0}", jsonBody);
}
else
{
Logger::Warn(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Job is already finished");
}
return jsonBody;
@ -146,19 +182,27 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args)
json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args)
{
ReaderLock readerLock(&this->lock);
Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: starting");
auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);
this->TerminateTask(args.TaskId);
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
this->TerminateTask(args.TaskId, (int)ErrorCodes::EndTaskExitCode);
json::value jsonBody;
if (taskInfo)
{
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
taskInfo->Exited = true;
taskInfo->ExitCode = -1;
taskInfo->ExitCode = (int)ErrorCodes::EndTaskExitCode;
jsonBody = taskInfo->ToJson();
Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: ended {0}", jsonBody);
}
else
{
Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "EndTask: Task is already finished");
}
return jsonBody;
@ -180,13 +224,13 @@ json::value RemoteExecutor::Metric(const std::string& callbackUri)
return json::value();
}
bool RemoteExecutor::TerminateTask(int taskId)
bool RemoteExecutor::TerminateTask(int taskId, int exitCode)
{
auto p = this->processes.find(taskId);
if (p != this->processes.end())
{
p->second->Kill(-1);
p->second->Kill(exitCode);
return true;
}

Просмотреть файл

@ -29,8 +29,9 @@ namespace hpc
private:
std::string LoadReportUri(const std::string& fileName);
void SaveReportUri(const std::string& fileName, const std::string& uri);
bool TerminateTask(int taskId);
bool TerminateTask(int taskId, int exitCode);
const int UnknowId = 999;
const int NodeInfoReportInterval = 30;
const int MetricReportInterval = 2;
const std::string NodeInfoUriFileName = "NodeInfoReportUri";

Просмотреть файл

@ -10,13 +10,10 @@ using namespace web::http;
Reporter::Reporter(const std::string& uri, int interval, std::function<json::value()> fetcher)
: reportUri(uri), intervalSeconds(interval), valueFetcher(fetcher),
isRunning(true), inRequest(false)
isRunning(true)
{
if (!uri.empty())
{
http_client_config config;
config.set_validate_certificates(false);
this->client = std::shared_ptr<http_client>(new http_client(uri, config));
pthread_create(&this->threadId, nullptr, ReportingThread, this);
}
}
@ -28,16 +25,15 @@ Reporter::~Reporter()
this->isRunning = false;
this->cts.cancel();
if (this->threadId != 0)
{
{
while (this->inRequest) usleep(1);
pthread_cancel(this->threadId);
pthread_join(this->threadId, nullptr);
Logger::Debug("Destructed Reporter {0}", this->reportUri);
}
while (this->inRequest) usleep(1);
}
pplx::task<void> Reporter::Report()
void Reporter::Report()
{
const std::string& uri = this->reportUri;
if (!uri.empty())
@ -46,7 +42,7 @@ pplx::task<void> Reporter::Report()
if (jsonBody.is_null())
{
Logger::Info("Skipped reporting to {0} because json is null", uri);
return pplx::task_from_result();
return;
}
if (this->intervalSeconds > 10)
@ -54,25 +50,32 @@ pplx::task<void> Reporter::Report()
Logger::Info("---------> Report to {0} with {1}", uri, jsonBody);
}
http_client_config config;
config.set_validate_certificates(false);
http_client client(uri, config);
try
{
return this->client->request(methods::POST, "", jsonBody, this->cts.get_token()).then([&uri, this](http_response response)
client.request(methods::POST, "", jsonBody, this->cts.get_token()).then([&uri, this](http_response response)
{
if (this->intervalSeconds > 10)
{
Logger::Debug("---------> Reported to {0} response code {1}", uri, response.status_code());
}
});
}).wait();
}
catch (std::exception& ex)
{
Logger::Error("Reporting exception occurred {0}, {1}", ex.what(), jsonBody);
return pplx::task_from_result();
catch (const http_exception& httpEx)
{
Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->reportUri, httpEx.what());
}
}
else
{
return pplx::task_from_result();
catch (const std::exception& ex)
{
Logger::Error("Exception occurred when report to {0}, ex {1}", this->reportUri, ex.what());
}
catch (...)
{
Logger::Error("Unknown error occurred when report to {0}", this->reportUri);
}
}
}
@ -84,32 +87,10 @@ void* Reporter::ReportingThread(void * arg)
Reporter* r = static_cast<Reporter*>(arg);
while (r->isRunning)
{
{
r->inRequest = true;
r->Report().then([r](auto t)
{
try
{
t.wait();
}
catch (const http_exception& httpEx)
{
Logger::Warn("HttpException occurred when report to {0}, ex {1}", r->reportUri, httpEx.what());
}
catch (const std::exception& ex)
{
Logger::Error("Exception occurred when report to {0}, ex {1}", r->reportUri, ex.what());
//exit(-1);
}
catch (...)
{
Logger::Error("Unknown error occurred when report to {0}", r->reportUri);
//exit(-1);
}
r->inRequest = false;
});
r->Report();
r->inRequest = false;
sleep(r->intervalSeconds);
}

Просмотреть файл

@ -16,7 +16,7 @@ namespace hpc
Reporter(const std::string& uri, int interval, std::function<json::value()> fetcher);
~Reporter();
pplx::task<void> Report();
void Report();
protected:
private:
static void* ReportingThread(void* arg);
@ -28,8 +28,7 @@ namespace hpc
pthread_t threadId;
pplx::cancellation_token_source cts;
bool isRunning;
bool inRequest;
std::shared_ptr<http::client::http_client> client;
bool inRequest = false;
};
}
}

Просмотреть файл

@ -20,7 +20,14 @@ json::value TaskInfo::ToJson() const
j["NumberOfProcesses"] = this->NumberOfProcesses;
j["PrimaryTask"] = this->IsPrimaryTask;
j["Message"] = JsonHelper<std::string>::ToJson(this->Message);
j["ProcessIds"] = JsonHelper<std::string>::ToJson(String::Join<','>(this->ProcessIds));
j["ProcessIds"] = JsonHelper<std::string>::ToJson(String::Join<','>(this->ProcessIds));
return j;
}
json::value TaskInfo::ToCompletionEventArgJson() const
{
json::value j = this->ToJson();
json::value jobIdArg;
jobIdArg["JobId"] = this->JobId;

Просмотреть файл

@ -23,6 +23,7 @@ namespace hpc
TaskInfo(TaskInfo&& t) = default;
web::json::value ToJson() const;
web::json::value ToCompletionEventArgJson() const;
const std::string& NodeName;

Просмотреть файл

@ -1,19 +1,30 @@
#include <iostream>
#include <iostream>
#include <string>
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include "utils/Logger.h"
#include "core/RemoteCommunicator.h"
#include "core/RemoteExecutor.h"
#include "core/RemoteExecutor.h"
#include "Version.h"
using namespace std;
using namespace hpc::core;
using namespace hpc::utils;
using namespace hpc::utils;
using namespace hpc;
int main(int argc, char* argv[])
{
if (argc > 1)
{
if (string("-v") == argv[1])
Version::PrintVersionHistory();
return 0;
}
int main()
{
std::cout << "Node manager started." << std::endl;
Logger::Info("Log system works.");
Logger::Info("Log system works.");
Logger::Info("Version: {0}", Version::GetVersion());
const std::string networkName = "eth0";
RemoteExecutor executor(networkName);

Просмотреть файл

@ -15,29 +15,36 @@ if $CGInstalled; then
tasks=$CGroupRoot/cpuset/$groupName/tasks
freezerState=$CGroupRoot/freezer/$groupName/freezer.state
[ ! -f $tasks ] && echo "task is ended" && exit 1
[ ! -f $freezerState ] && echo "task is ended" && exit 1
[ ! -f $tasks ] && echo "$tasks doesn't exist" && exit -200
[ ! -f $freezerState ] && echo "$freezerState doesn't exist" && exit -201
# freeze the task
echo FROZEN > $freezerState
while ! grep -Fxq FROZEN $freezerState; do
maxLoop=20
while [ -f $freezerState ] && ! grep -Fxq FROZEN $freezerState && [ $maxLoop -gt 0 ]
do
sleep .1
((maxLoop--))
done
# kill all tasks
for pid in $(cat $tasks); do
for pid in $(cat $tasks);
do
[ -d /proc/$pid ] && kill -TERM $pid
done
# resume tasks
echo THAWED > $freezerState
while ! grep -Fxq THAWED $freezerState; do
maxLoop=20
while [ -f $freezerState ] && ! grep -Fxq THAWED $freezerState && [ $maxLoop -gt 0 ]
do
sleep .1
((maxLoop--))
done
else
kill -s TERM $(pstree -l -p $taskId | grep "([[:digit:]]*)" -o | tr -d '()')
kill -s TERM $(pstree -l -p $processId | grep "([[:digit:]]*)" -o | tr -d '()')
fi
exit 0

Просмотреть файл

@ -8,10 +8,67 @@
taskId=$1
if $CGInstalled; then
groupName=$(GetCGroupName $taskId)
group=$CGroupSubSys:$groupName
cgcreate -g $group
echo "$2" > $CGroupRoot/cpuset/$groupName/cpuset.cpus
echo 0 > $CGroupRoot/cpuset/$groupName/cpuset.mems
groupName=$(GetCGroupName $taskId)
group=$CGroupSubSys:$groupName
maxLoop=3
while [ $maxLoop -gt 0 ]
do
cgcreate -g $group
ec=$?
if [ $ec -eq 0 ]
then
break
fi
echo "Failed to create cgroup $group, error code $ec, retry after .5 seconds"
((maxLoop--))
sleep .5
done
if [ $ec -ne 0 ]
then
exit $ec
fi
maxLoop=3
while [ $maxLoop -gt 0 ]
do
echo "$2" > $CGroupRoot/cpuset/$groupName/cpuset.cpus
ec=$?
if [ $ec -eq 0 ]
then
break
fi
echo "Failed to set cpus for $group, error code $ec, retry after .5 seconds"
((maxLoop--))
sleep .5
done
if [ $ec -ne 0 ]
then
exit $ec
fi
maxLoop=3
while [ $maxLoop -gt 0 ]
do
echo 0 > $CGroupRoot/cpuset/$groupName/cpuset.mems
ec=$?
if [ $ec -eq 0 ]
then
break
fi
echo "Failed to set mems for $group, error code $ec, retry after .5 seconds"
((maxLoop--))
sleep .5
done
if [ $ec -ne 0 ]
then
exit $ec
fi
fi

Просмотреть файл

@ -5,7 +5,9 @@
#include <stdio.h>
#include <stdarg.h>
#include <iostream>
#include <spdlog/spdlog.h>
#include <spdlog/spdlog.h>
#include "String.h"
namespace hpc
{
@ -50,6 +52,34 @@ namespace hpc
Log(LogLevel::Debug, fmt, args...);
}
template <typename ...Args>
static void Info(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Info, f.c_str(), args...);
}
template <typename ...Args>
static void Error(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Error, f.c_str(), args...);
}
template <typename ...Args>
static void Warn(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Warning, f.c_str(), args...);
}
template <typename ...Args>
static void Debug(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Debug, f.c_str(), args...);
}
template <typename ...Args>
static void Log(LogLevel level, const char* fmt, Args ...args)
{

Просмотреть файл

@ -12,8 +12,10 @@
#include "System.h"
#include "String.h"
#include "Logger.h"
#include "../common/ErrorCodes.h"
using namespace hpc::utils;
using namespace hpc::common;
std::vector<System::NetInfo> System::GetNetworkInfo()
{
@ -183,8 +185,9 @@ void System::CPU(int &cores, int &sockets)
fs.close();
}
void System::NetworkUsage(long int &network, const std::string& netName)
int System::NetworkUsage(long int &network, const std::string& netName)
{
int ret = 1;
std::ifstream fs("/proc/net/dev", std::ios::in);
std::string name;
int receive, send;
@ -193,14 +196,21 @@ void System::NetworkUsage(long int &network, const std::string& netName)
fs.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
std::string tmp = netName + ":";
while (name != tmp)
while (name != tmp && fs.good())
{
fs >> name >> receive >> send;
fs.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
network = receive + send;
if (name == tmp)
{
network = receive + send;
ret = 0;
}
fs.close();
return ret;
}
const std::string& System::GetNodeName()
@ -212,7 +222,7 @@ const std::string& System::GetNodeName()
if (-1 == gethostname(buffer, 255))
{
Logger::Error("gethostname failed with errno {0}", errno);
exit(-1);
exit((int)ErrorCodes::GetHostNameError);
}
nodeName = buffer;

Просмотреть файл

@ -4,7 +4,8 @@
#include <string>
#include "String.h"
#include "Logger.h"
#include "Logger.h"
#include "../common/ErrorCodes.h"
namespace hpc
{
@ -26,7 +27,7 @@ namespace hpc
static void CPUUsage(long int &total, long int &idle);
static void Memory(unsigned long &available, unsigned long &total);
static void CPU(int &cores, int &sockets);
static void NetworkUsage(long int &network, const std::string& netName);
static int NetworkUsage(long int &network, const std::string& netName);
static const std::string& GetNodeName();
static bool IsCGroupInstalled();
@ -41,7 +42,7 @@ namespace hpc
FILE* stream = popen(command.c_str(), "r");
std::ostringstream result;
int exitCode = -1;
int exitCode = (int)hpc::common::ErrorCodes::PopenError;
if (stream)
{