From 6f53f808180ccf06de7e89b9e215ca1ba9f43240 Mon Sep 17 00:00:00 2001 From: evanc Date: Sat, 4 Apr 2015 09:48:49 -0700 Subject: [PATCH 01/13] Fix a crash when end task --- nodemanager/core/RemoteExecutor.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index eb52b0e..1e66667 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -150,11 +150,13 @@ json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args) auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId); this->TerminateTask(args.TaskId); - this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId()); json::value jsonBody; + if (taskInfo) { + this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId()); + taskInfo->Exited = true; taskInfo->ExitCode = -1; From e55e7ac533542b8f94958d5504da2194e419c185 Mon Sep 17 00:00:00 2001 From: evanc Date: Tue, 7 Apr 2015 21:03:29 -0700 Subject: [PATCH 02/13] Fix the infinite loop --- nodemanager/scripts/EndTask.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nodemanager/scripts/EndTask.sh b/nodemanager/scripts/EndTask.sh index 0356c2d..e3c4f85 100755 --- a/nodemanager/scripts/EndTask.sh +++ b/nodemanager/scripts/EndTask.sh @@ -21,7 +21,8 @@ if $CGInstalled; then # freeze the task echo FROZEN > $freezerState - while ! grep -Fxq FROZEN $freezerState; do + while [ -f $freezerState ] && ! grep -Fxq FROZEN $freezerState; do + echo FROZEN > $freezerState sleep .1 done @@ -33,7 +34,8 @@ if $CGInstalled; then # resume tasks echo THAWED > $freezerState - while ! grep -Fxq THAWED $freezerState; do + while [ -f $freezerState ] && ! grep -Fxq THAWED $freezerState; do + echo THAWED > $freezerState sleep .1 done else From f3e275ca64da9eb81178a08a89293693c73a3566 Mon Sep 17 00:00:00 2001 From: evanc Date: Tue, 7 Apr 2015 22:10:31 -0700 Subject: [PATCH 03/13] let process have unique id --- nodemanager/core/Process.cpp | 4 ++-- nodemanager/core/Process.h | 4 ++-- nodemanager/core/RemoteExecutor.cpp | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index 77f7012..bae9b60 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -11,7 +11,7 @@ using namespace hpc::core; using namespace hpc::utils; Process::Process( - int taskId, + long long taskId, const std::string& cmdLine, const std::string& standardOut, const std::string& standardErr, @@ -252,7 +252,7 @@ int Process::CreateTaskFolder() { char folder[256]; - sprintf(folder, "/tmp/nodemanager_task_%d.XXXXXX", this->taskId); + sprintf(folder, "/tmp/nodemanager_task_%lld.XXXXXX", this->taskId); char* p = mkdtemp(folder); diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index a7e0a80..3e17fcf 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -28,7 +28,7 @@ namespace hpc typedef void Callback(int, std::string&&, timeval userTime, timeval kernelTime); Process( - int taskId, + long long taskId, const std::string& cmdLine, const std::string& standardOut, const std::string& standardErr, @@ -96,7 +96,7 @@ namespace hpc timeval kernelTime = { 0, 0 }; std::string taskFolder; - const int taskId; + const long long taskId; const std::string commandLine; std::string stdOutFile; std::string stdErrFile; diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 1e66667..cb2fc2d 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -51,7 +51,7 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c isNewEntry) { auto process = std::shared_ptr(new Process( - args.TaskId, + taskInfo->GetAttemptId(), std::move(args.StartInfo.CommandLine), std::move(args.StartInfo.StdOutText), std::move(args.StartInfo.StdErrText), From f894bea83cf96fc4037827ecca841f6c280cc2b9 Mon Sep 17 00:00:00 2001 From: evanc Date: Wed, 8 Apr 2015 06:39:42 -0700 Subject: [PATCH 04/13] Let process run at unique ID --- nodemanager/NodeManager.depend | 5 +- nodemanager/core/Process.cpp | 72 +++++++++++++------------ nodemanager/core/Process.h | 15 ++++-- nodemanager/core/RemoteCommunicator.cpp | 7 +-- nodemanager/core/RemoteExecutor.cpp | 31 +++++++---- nodemanager/scripts/EndTask.sh | 2 - nodemanager/utils/Logger.h | 32 ++++++++++- 7 files changed, 107 insertions(+), 57 deletions(-) diff --git a/nodemanager/NodeManager.depend b/nodemanager/NodeManager.depend index 5b6ec8d..1580dd4 100755 --- a/nodemanager/NodeManager.depend +++ b/nodemanager/NodeManager.depend @@ -424,12 +424,13 @@ "core/RemoteCommunicator.h" "core/RemoteExecutor.h" -1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h +1428488455 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h + "String.h" 1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h @@ -549,7 +550,7 @@ 1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.cpp "JsonHelper.h" -1426595924 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.cpp "Logger.h" 1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/String.cpp diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index bae9b60..07119fd 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -10,8 +10,10 @@ using namespace hpc::core; using namespace hpc::utils; -Process::Process( - long long taskId, +Process::Process( + int jobId, + int taskId, + int requeueCount, const std::string& cmdLine, const std::string& standardOut, const std::string& standardErr, @@ -19,8 +21,8 @@ Process::Process( const std::string& workDir, std::vector&& cpuAffinity, std::map&& envi, - const std::function completed) : - taskId(taskId), + const std::function completed) : + jobId(jobId), taskId(taskId), requeueCount(requeueCount), taskExecutionId(String::Join("_", taskId, requeueCount)), commandLine(cmdLine), stdOutFile(standardOut), stdErrFile(standardErr), stdInFile(standardIn), workDirectory(workDir), affinity(cpuAffinity), environments(envi), callback(completed), processId(0) { @@ -36,7 +38,7 @@ pplx::task Process::Start() { pthread_create(&this->threadId, nullptr, ForkThread, this); - Logger::Debug("Created thread {0}", this->threadId); + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Created thread {0}", this->threadId); return pplx::task(this->started); } @@ -50,8 +52,7 @@ void Process::Kill(int forcedExitCode) if (this->processId != 0) { - this->ExecuteCommand("/bin/bash", "EndTask.sh", this->taskId, this->processId); - this->processId = 0; + this->ExecuteCommand("/bin/bash", "EndTask.sh", this->taskExecutionId, this->processId); } this->GetStatisticsFromCGroup(); @@ -70,11 +71,11 @@ void Process::OnCompleted() } catch (const std::exception& ex) { - Logger::Error("Exception happened when callback, ex = {0}", ex.what()); + Logger::Error(this->jobId, this->taskId, this->requeueCount, "Exception happened when callback, ex = {0}", ex.what()); } catch (...) { - Logger::Error("Unknown exception happened when callback"); + Logger::Error(this->jobId, this->taskId, this->requeueCount, "Unknown exception happened when callback"); } } @@ -87,7 +88,7 @@ void* Process::ForkThread(void* arg) if (ret != 0) { p->message << "Task " << p->taskId << ": error when create task folder, ret " << ret << std::endl; - Logger::Error("Task {0}: error when create task folder, ret {1}", p->taskId, ret); + Logger::Error(p->jobId, p->taskId, p->requeueCount, "error when create task folder, ret {0}", ret); // TODO fetch the errno. p->SetExitCode(ret); @@ -99,14 +100,14 @@ void* Process::ForkThread(void* arg) if (path.empty()) { p->message << "Error when build script." << std::endl; - Logger::Error("Error when build script."); + Logger::Error(p->jobId, p->taskId, p->requeueCount, "Error when build script."); // TODO fetch the errno. p->SetExitCode(-1); goto Final; } - if (!p->ExecuteCommand("/bin/bash", "PrepareTask.sh", p->taskId, p->GetAffinity())) + if (!p->ExecuteCommand("/bin/bash", "PrepareTask.sh", p->taskExecutionId, p->GetAffinity())) { goto Final; } @@ -120,7 +121,7 @@ void* Process::ForkThread(void* arg) p->message << "Failed to fork(), pid = " << p->processId << ", errno = " << errno << ", msg = " << errorMessage << std::endl; - Logger::Error("Failed to fork(), pid = {0}, errno = {1}, msg = {2}", p->processId, errno, errorMessage); + Logger::Error(p->jobId, p->taskId, p->requeueCount, "Failed to fork(), pid = {0}, errno = {1}, msg = {2}", p->processId, errno, errorMessage); p->SetExitCode(errno); goto Final; @@ -131,14 +132,15 @@ void* Process::ForkThread(void* arg) p->Run(path); } else - { + { + assert(p->processId > 0); p->started.set(p->processId); + assert(p->processId > 0); p->Monitor(); } Final: - p->processId = 0; - p->ExecuteCommand("/bin/bash", "CleanupTask.sh", p->taskId, p->processId); + p->ExecuteCommand("/bin/bash", "CleanupTask.sh", p->taskExecutionId, p->processId); p->ExecuteCommand("rm -rf", p->taskFolder); p->OnCompleted(); @@ -147,33 +149,39 @@ Final: void Process::Monitor() { - Logger::Debug("Monitor the forked process {0}", this->processId); + assert(this->processId > 0); + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Monitor the forked process {0}", this->processId); int status; rusage usage; + assert(this->processId > 0); pid_t waitedPid = wait4(this->processId, &status, 0, &usage); + assert(this->processId > 0); if (waitedPid == -1) { - Logger::Error("wait4 for process {0} error {1}", this->processId, errno); + Logger::Error(this->jobId, this->taskId, this->requeueCount, "wait4 for process {0} error {1}", this->processId, errno); this->message << "wait4 for process " << this->processId << " error " << errno << std::endl; this->SetExitCode(errno); return; } + assert(this->processId > 0); if (waitedPid != this->processId) { - Logger::Error("Process {0}: waited {1}, errno {2}", this->processId, waitedPid, errno); + Logger::Error(this->jobId, this->taskId, this->requeueCount, + "Process {0}: waited {1}, errno {2}", this->processId, waitedPid, errno); + assert(false); int tmp; - if (WIFEXITED(status)) Logger::Info("Process {0}: WIFEXITED", this->processId); - if ((tmp = WEXITSTATUS(status))) Logger::Info("Process {0}: WEXITSTATUS: {1}", this->processId, tmp); - if (WIFSIGNALED(status)) Logger::Info("Process {0}: WIFSIGNALED", this->processId); - if ((tmp = WTERMSIG(status))) Logger::Info("Process {0}: WTERMSIG: {1}", this->processId, tmp); - if (WCOREDUMP(status)) Logger::Info("Process {0}: Core dumped.", this->processId); - if (WIFSTOPPED(status)) Logger::Info("Process {0}: WIFSTOPPED", this->processId); - if (WSTOPSIG(status)) Logger::Info("Process {0}: WSTOPSIG", this->processId); - if (WIFCONTINUED(status)) Logger::Info("Process {0}: WIFCONTINUED", this->processId); + if (WIFEXITED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFEXITED", this->processId); + if ((tmp = WEXITSTATUS(status))) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WEXITSTATUS: {1}", this->processId, tmp); + if (WIFSIGNALED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFSIGNALED", this->processId); + if ((tmp = WTERMSIG(status))) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WTERMSIG: {1}", this->processId, tmp); + if (WCOREDUMP(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: Core dumped.", this->processId); + if (WIFSTOPPED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFSTOPPED", this->processId); + if (WSTOPSIG(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WSTOPSIG", this->processId); + if (WIFCONTINUED(status)) Logger::Info(this->jobId, this->taskId, this->requeueCount, "Process {0}: WIFCONTINUED", this->processId); this->message << "Process " << this->processId << ": waited " << waitedPid << ", errno " << errno << std::endl; this->SetExitCode(errno); @@ -200,7 +208,7 @@ void Process::Monitor() } else { - Logger::Error("wait4 for process {0} status {1}", this->processId, status); + Logger::Error(this->jobId, this->taskId, this->requeueCount, "wait4 for process {0} status {1}", this->processId, status); this->SetExitCode(-1); this->message << "wait4 for process " << this->processId << " status " << status << std::endl; @@ -210,7 +218,7 @@ void Process::Monitor() this->userTime = usage.ru_utime; this->kernelTime = usage.ru_stime; - Logger::Debug("Process {0}: Monitor ended", this->processId); + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Process {0}: Monitor ended", this->processId); // TODO: Number of process; // TODO: ProcessIds // TODO: WorkingSet @@ -221,13 +229,11 @@ void Process::Run(const std::string& path) std::vector pathBuffer(path.cbegin(), path.cend()); pathBuffer.push_back('\0'); - std::string taskIdString = String::Join("", this->taskId); - char* const args[] = { const_cast("/bin/bash"), const_cast("StartTask.sh"), - const_cast(taskIdString.c_str()), + const_cast(this->taskExecutionId.c_str()), &pathBuffer[0], nullptr }; @@ -252,7 +258,7 @@ int Process::CreateTaskFolder() { char folder[256]; - sprintf(folder, "/tmp/nodemanager_task_%lld.XXXXXX", this->taskId); + sprintf(folder, "/tmp/nodemanager_task_%d_%d.XXXXXX", this->taskId, this->requeueCount); char* p = mkdtemp(folder); diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index 3e17fcf..d59d2b7 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -27,8 +27,10 @@ namespace hpc public: typedef void Callback(int, std::string&&, timeval userTime, timeval kernelTime); - Process( - long long taskId, + Process( + int jobId, + int taskId, + int requeueCount, const std::string& cmdLine, const std::string& standardOut, const std::string& standardErr, @@ -69,7 +71,7 @@ namespace hpc { std::string cmdLine = String::Join(" ", cmd, args...); this->message << "Task " << this->taskId << ": '" << cmdLine << "' failed. exitCode " << ret << "\r\n"; - Logger::Error("Task {0}: '{1}' failed. exitCode {2}, output {3}.", this->taskId, cmdLine, ret, output); + Logger::Error(this->jobId, this->taskId, this->requeueCount, "'{0}' failed. exitCode {1}, output {2}.", cmdLine, ret, output); this->SetExitCode(ret); @@ -79,7 +81,7 @@ namespace hpc return true; } - static void* ForkThread(void*); + static void* ForkThread(void*); std::string GetAffinity(); void Run(const std::string& path); @@ -96,7 +98,10 @@ namespace hpc timeval kernelTime = { 0, 0 }; std::string taskFolder; - const long long taskId; + const int jobId; + const int taskId; + const int requeueCount; + const std::string taskExecutionId; const std::string commandLine; std::string stdOutFile; std::string stdErrFile; diff --git a/nodemanager/core/RemoteCommunicator.cpp b/nodemanager/core/RemoteCommunicator.cpp index b363fca..884854c 100644 --- a/nodemanager/core/RemoteCommunicator.cpp +++ b/nodemanager/core/RemoteCommunicator.cpp @@ -105,8 +105,10 @@ void RemoteCommunicator::HandlePost(http_request request) { request.extract_json().then([processor, callback = std::move(callbackUri)](pplx::task t) { - // todo: throw exception instead of using the return value. - return processor->second(t.get(), callback); + auto j = t.get(); + Logger::Debug("Json: {0}", j.serialize()); + + return processor->second(j, callback); }) .then([request, this](pplx::task t) { @@ -130,7 +132,6 @@ void RemoteCommunicator::HandlePost(http_request request) json::value RemoteCommunicator::StartJobAndTask(const json::value& val, const std::string& callbackUri) { - Logger::Info("Json: {0}", val.serialize()); auto args = StartJobAndTaskArgs::FromJson(val); return this->executor.StartJobAndTask(std::move(args), callbackUri); } diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index cb2fc2d..14847ff 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -37,13 +37,13 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c if (taskInfo->TaskRequeueCount < args.StartInfo.TaskRequeueCount) { - Logger::Info("Task {0}: Change requeue count from {1} to {2}", args.TaskId, taskInfo->TaskRequeueCount, args.StartInfo.TaskRequeueCount); + Logger::Info(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, "Change requeue count from {0} to {1}", taskInfo->TaskRequeueCount, args.StartInfo.TaskRequeueCount); taskInfo->TaskRequeueCount = args.StartInfo.TaskRequeueCount; } if (args.StartInfo.CommandLine.empty()) { - Logger::Info("Job {0}, task {1} MPI non-master task found, skip creating the process.", args.JobId, args.TaskId); + Logger::Info(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, "MPI non-master task found, skip creating the process."); } else { @@ -51,7 +51,9 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c isNewEntry) { auto process = std::shared_ptr(new Process( - taskInfo->GetAttemptId(), + taskInfo->JobId, + taskInfo->TaskId, + taskInfo->TaskRequeueCount, std::move(args.StartInfo.CommandLine), std::move(args.StartInfo.StdOutText), std::move(args.StartInfo.StdErrText), @@ -67,7 +69,8 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c { if (taskInfo->Exited) { - Logger::Debug("Task {0}: Ended already by EndTask.", taskInfo->TaskId); + Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "Ended already by EndTask."); } else { @@ -78,25 +81,29 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c taskInfo->UserProcessorTime = userTime.tv_sec * 1000000 + userTime.tv_usec; auto jsonBody = taskInfo->ToJson(); - Logger::Debug("Task {0}: Callback to {1} with {2}", taskInfo->TaskId, callbackUri, jsonBody); + Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "Callback to {0} with {1}", callbackUri, jsonBody); client::http_client_config config; config.set_validate_certificates(false); client::http_client client(callbackUri, config); client.request(methods::POST, "", jsonBody).then([&callbackUri, this, taskInfo](http_response response) { - Logger::Info("Task {0}: Callback to {1} response code {2}", taskInfo->TaskId, callbackUri, response.status_code()); + Logger::Info(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "Callback to {0} response code {1}", callbackUri, response.status_code()); }).wait(); } } catch (const std::exception& ex) { - Logger::Error("Exception when sending back task result. {0}", ex.what()); + Logger::Error(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "Exception when sending back task result. {0}", ex.what()); } // this won't remove the task entry added later as attempt id doesn't match this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId()); - Logger::Debug("Task {0}: attemptId {1}, erasing process", taskInfo->TaskId, taskInfo->GetAttemptId()); + Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "attemptId {0}, erasing process", taskInfo->GetAttemptId()); // Process will be deleted here. this->processes.erase(taskInfo->GetAttemptId()); @@ -104,17 +111,19 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c this->processes[taskInfo->GetAttemptId()] = process; - process->Start().then([this] (pid_t pid) + process->Start().then([this, taskInfo] (pid_t pid) { if (pid > 0) { - Logger::Debug("Process started {0}", pid); + Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "Process started {0}", pid); } }); } else { - Logger::Warn("The task {0} has started already.", args.TaskId); + Logger::Warn(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, + "The task has started already."); // Found the original process. // TODO: assert the job task table call is the same. } diff --git a/nodemanager/scripts/EndTask.sh b/nodemanager/scripts/EndTask.sh index e3c4f85..ddca8fd 100755 --- a/nodemanager/scripts/EndTask.sh +++ b/nodemanager/scripts/EndTask.sh @@ -22,7 +22,6 @@ if $CGInstalled; then echo FROZEN > $freezerState while [ -f $freezerState ] && ! grep -Fxq FROZEN $freezerState; do - echo FROZEN > $freezerState sleep .1 done @@ -35,7 +34,6 @@ if $CGInstalled; then echo THAWED > $freezerState while [ -f $freezerState ] && ! grep -Fxq THAWED $freezerState; do - echo THAWED > $freezerState sleep .1 done else diff --git a/nodemanager/utils/Logger.h b/nodemanager/utils/Logger.h index 8b4bff0..5edd678 100644 --- a/nodemanager/utils/Logger.h +++ b/nodemanager/utils/Logger.h @@ -5,7 +5,9 @@ #include #include #include -#include +#include + +#include "String.h" namespace hpc { @@ -50,6 +52,34 @@ namespace hpc Log(LogLevel::Debug, fmt, args...); } + template + static void Info(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Info, f.c_str(), args...); + } + + template + static void Error(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Error, f.c_str(), args...); + } + + template + static void Warn(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Warning, f.c_str(), args...); + } + + template + static void Debug(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Debug, f.c_str(), args...); + } + template static void Log(LogLevel level, const char* fmt, Args ...args) { From 670ef14f952e68e69a3e9178183afe31b48d006d Mon Sep 17 00:00:00 2001 From: evanc Date: Wed, 8 Apr 2015 09:30:12 -0700 Subject: [PATCH 05/13] prevent task result being sent back --- nodemanager/core/RemoteExecutor.cpp | 24 ++++++++++++++++++++++++ nodemanager/core/RemoteExecutor.h | 3 ++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 14847ff..0e713b8 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -139,15 +139,35 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args) auto jobInfo = this->jobTaskTable.RemoveJob(args.JobId); json::value jsonBody; + if (jobInfo) { for (auto& taskPair : jobInfo->Tasks) { this->TerminateTask(taskPair.first); + + auto taskInfo = taskPair.second; + + if (taskInfo) + { + taskInfo->Exited = true; + taskInfo->ExitCode = -1; + } + else + { + Logger::Warn(args.JobId, taskPair.first, this->UnknowId, + "EndJob: Task is already finished"); + + assert(false); + } } jsonBody = jobInfo->ToJson(); } + else + { + Logger::Warn(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Job is already finished"); + } return jsonBody; } @@ -171,6 +191,10 @@ json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args) jsonBody = taskInfo->ToJson(); } + else + { + Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "EndTask: Task is already finished"); + } return jsonBody; } diff --git a/nodemanager/core/RemoteExecutor.h b/nodemanager/core/RemoteExecutor.h index e625be4..994e7ff 100644 --- a/nodemanager/core/RemoteExecutor.h +++ b/nodemanager/core/RemoteExecutor.h @@ -30,7 +30,8 @@ namespace hpc std::string LoadReportUri(const std::string& fileName); void SaveReportUri(const std::string& fileName, const std::string& uri); bool TerminateTask(int taskId); - + + const int UnknowId = 999; const int NodeInfoReportInterval = 30; const int MetricReportInterval = 2; const std::string NodeInfoUriFileName = "NodeInfoReportUri"; From b892e1c836d029d9d7b83322f95bd3a9b9b2db5c Mon Sep 17 00:00:00 2001 From: evanc Date: Thu, 9 Apr 2015 03:26:04 -0700 Subject: [PATCH 06/13] Add logs, fix the network usage detection error --- nodemanager/NodeManager.cbp | 1 + nodemanager/NodeManager.depend | 120 +++++++++++++++------------- nodemanager/common/ErrorCodes.h | 20 +++++ nodemanager/core/Monitor.cpp | 18 +++-- nodemanager/core/Process.cpp | 13 ++- nodemanager/core/Process.h | 3 +- nodemanager/core/RemoteExecutor.cpp | 19 +++-- nodemanager/core/RemoteExecutor.h | 2 +- nodemanager/core/Reporter.cpp | 2 - nodemanager/utils/System.cpp | 18 ++++- nodemanager/utils/System.h | 7 +- 11 files changed, 141 insertions(+), 82 deletions(-) create mode 100644 nodemanager/common/ErrorCodes.h diff --git a/nodemanager/NodeManager.cbp b/nodemanager/NodeManager.cbp index 21f61bd..9899143 100644 --- a/nodemanager/NodeManager.cbp +++ b/nodemanager/NodeManager.cbp @@ -58,6 +58,7 @@ + diff --git a/nodemanager/NodeManager.depend b/nodemanager/NodeManager.depend index 1580dd4..e9f69d8 100755 --- a/nodemanager/NodeManager.depend +++ b/nodemanager/NodeManager.depend @@ -24,7 +24,7 @@ -1422965830 /home/evanc/casablanca/Release/include/cpprest/json.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/json.h @@ -34,7 +34,7 @@ "cpprest/details/basic_types.h" "cpprest/asyncrt_utils.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/details/basic_types.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/details/basic_types.h @@ -53,13 +53,15 @@ 1427351029 /home/evanc/casablanca/Release/include/cpprest/details/nosal.h -1422965830 /home/evanc/casablanca/Release/include/cpprest/details/SafeInt3.hpp - +1427351029 /home/evanc/casablanca/Release/include/cpprest/details/SafeInt3.hpp + + + -1422965830 /home/evanc/casablanca/Release/include/cpprest/asyncrt_utils.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/asyncrt_utils.h @@ -155,12 +157,12 @@ 1423402903 /home/evanc/whpc-linux-communicator/NodeManager/Test.h -1422965830 /home/evanc/casablanca/Release/include/cpprest/http_listener.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/http_listener.h "cpprest/http_msg.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/http_msg.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/http_msg.h @@ -178,11 +180,11 @@ "cpprest/details/http_constants.dat" "cpprest/details/http_constants.dat" -1422965830 /home/evanc/casablanca/Release/include/cpprest/uri.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/uri.h "cpprest/base_uri.h" "cpprest/uri_builder.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/base_uri.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/base_uri.h @@ -191,13 +193,13 @@ "cpprest/asyncrt_utils.h" "cpprest/details/basic_types.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/uri_builder.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/uri_builder.h "cpprest/base_uri.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/http_headers.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/http_headers.h @@ -205,11 +207,11 @@ "cpprest/asyncrt_utils.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/streams.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/streams.h "cpprest/astreambuf.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/astreambuf.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/astreambuf.h @@ -218,7 +220,7 @@ "cpprest/details/basic_types.h" "cpprest/asyncrt_utils.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/containerstream.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/containerstream.h @@ -227,7 +229,7 @@ "cpprest/astreambuf.h" "cpprest/streams.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/details/http_constants.dat +1427351029 /home/evanc/casablanca/Release/include/cpprest/details/http_constants.dat 1423734326 source:/home/evanc/whpc-linux-communicator/NodeManager/RemoteCommunicator.cpp @@ -416,7 +418,7 @@ "../common.h" "../details/log_msg.h" -1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/main.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/main.cpp @@ -432,71 +434,71 @@ "String.h" -1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h +1427722564 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h "../utils/Logger.h" "IRemoteExecutor.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h +1427722694 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h "../arguments/StartJobAndTaskArgs.h" "../arguments/StartTaskArgs.h" "../arguments/EndJobArgs.h" "../arguments/EndTaskArgs.h" -1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h +1428574742 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h "IRemoteExecutor.h" "JobTaskTable.h" "Monitor.h" "Process.h" "Reporter.h" -1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.cpp "EndJobArgs.h" "../utils/JsonHelper.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.h -1427272086 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h +1427383873 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h -1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.cpp "EndTaskArgs.h" "../utils/JsonHelper.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h -1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp "ProcessStartInfo.h" "../utils/JsonHelper.h" -1426473053 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h -1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.cpp +1427383873 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.cpp "StartJobAndTaskArgs.h" "../utils/JsonHelper.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h +1427383873 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h "ProcessStartInfo.h" -1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.cpp "StartTaskArgs.h" "../utils/JsonHelper.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartTaskArgs.h "ProcessStartInfo.h" -1427724395 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Process.cpp +1428574597 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Process.cpp @@ -504,8 +506,9 @@ "Process.h" "../utils/Logger.h" "../utils/String.h" + "../common/ErrorCodes.h" -1428041425 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h +1428574604 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h @@ -519,6 +522,7 @@ "../utils/String.h" "../utils/Logger.h" "../utils/System.h" + "../common/ErrorCodes.h" 1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/String.h @@ -526,19 +530,20 @@ -1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.cpp +1428486600 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.cpp "RemoteCommunicator.h" "../utils/String.h" "../utils/System.h" "../arguments/StartJobAndTaskArgs.h" -1427383873 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h +1428574577 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h "String.h" "Logger.h" + "../common/ErrorCodes.h" -1427192138 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.cpp +1428574636 source:/home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.cpp "RemoteExecutor.h" @@ -546,14 +551,15 @@ "../utils/ReaderLock.h" "../utils/Logger.h" "../utils/System.h" + "../common/ErrorCodes.h" -1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.cpp "JsonHelper.h" 1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.cpp "Logger.h" -1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/String.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/String.cpp "String.h" @@ -572,39 +578,39 @@ "String.h" "Logger.h" -1426602483 source:/home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.cpp +1428485409 source:/home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.cpp "JobTaskTable.h" "../utils/WriterLock.h" "../utils/ReaderLock.h" -1426602464 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h +1428485409 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h "../data/TaskInfo.h" "../data/JobInfo.h" "../data/NodeInfo.h" -1426597393 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h +1428485409 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h -1424929911 /home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.h "TaskInfo.h" -1424947474 /home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.h "JobInfo.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.h -1422965830 /home/evanc/casablanca/Release/include/cpprest/http_client.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/http_client.h @@ -619,45 +625,45 @@ "cpprest/oauth1.h" "cpprest/oauth2.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/details/web_utilities.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/details/web_utilities.h "cpprest/asyncrt_utils.h" -1422965830 /home/evanc/casablanca/Release/include/cpprest/oauth1.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/oauth1.h "cpprest/http_msg.h" "cpprest/details/http_constants.dat" "cpprest/details/http_constants.dat" -1422965830 /home/evanc/casablanca/Release/include/cpprest/oauth2.h +1427351029 /home/evanc/casablanca/Release/include/cpprest/oauth2.h "cpprest/http_msg.h" "cpprest/details/http_constants.dat" -1424940673 source:/home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/data/JobInfo.cpp "JobInfo.h" -1424948178 source:/home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/data/NodeInfo.cpp "NodeInfo.h" "../utils/System.h" -1426597885 source:/home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.cpp "TaskInfo.h" "../utils/JsonHelper.h" "../utils/String.h" -1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.cpp +1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.cpp "ReaderLock.h" -1424929892 /home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/utils/ReaderLock.h 1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.cpp "WriterLock.h" -1427272469 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h +1427383873 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h "../utils/System.h" -1427272649 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.cpp +1428571893 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.cpp "Monitor.h" "../utils/ReaderLock.h" @@ -665,12 +671,14 @@ "../utils/Logger.h" "../utils/System.h" -1426473053 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h +1427353166 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h -1426473053 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.cpp +1428573779 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.cpp "Reporter.h" "../utils/Logger.h" +1428574569 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h + diff --git a/nodemanager/common/ErrorCodes.h b/nodemanager/common/ErrorCodes.h new file mode 100644 index 0000000..6a2c0bc --- /dev/null +++ b/nodemanager/common/ErrorCodes.h @@ -0,0 +1,20 @@ +#ifndef ERRORCODES_H +#define ERRORCODES_H + +namespace hpc +{ + namespace common + { + enum class ErrorCodes + { + EndJobExitCode = -101, + EndTaskExitCode = -102, + BuildScriptError = -103, + DefaultExitCode = -999, + GetHostNameError = -104, + PopenError = -105, + }; + } +} + +#endif // ERRORCODES_H diff --git a/nodemanager/core/Monitor.cpp b/nodemanager/core/Monitor.cpp index 5d84df6..62196b5 100644 --- a/nodemanager/core/Monitor.cpp +++ b/nodemanager/core/Monitor.cpp @@ -91,7 +91,7 @@ void Monitor::Run() while (true) { time_t t; - time(&t); + time(&t); long int cpuCurrent = cpuLast + 1, idleCurrent = idleLast; @@ -100,7 +100,7 @@ void Monitor::Run() long int idleDiff = idleCurrent - idleLast; float cpuUsage = (float)(100.0f * (totalDiff - idleDiff) / totalDiff); cpuLast = cpuCurrent; - idleLast = idleCurrent; + idleLast = idleCurrent; unsigned long available, total; System::Memory(available, total); @@ -108,9 +108,15 @@ void Monitor::Run() float totalMemoryMb = (float)total / 1024.0f; long int networkCurrent = 0; - System::NetworkUsage(networkCurrent, "eth0"); + int ret = System::NetworkUsage(networkCurrent, "eth0"); + + if (ret != 0) + { + Logger::Error("Error occurred while collecting network usage {0}", ret); + } + float networkUsage = (float)(networkCurrent - networkLast) / this->intervalSeconds; - networkLast = networkCurrent; + networkLast = networkCurrent; // ip address; std::string ipAddress = System::GetIpAddress(IpAddressVersion::V4, "eth0"); @@ -140,6 +146,7 @@ void Monitor::Run() this->socketCount = sockets; this->distroInfo = distro; this->networkInfo = std::move(netInfo); + } this->isCollected = true; @@ -152,8 +159,9 @@ void* Monitor::MonitoringThread(void* arg) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - + Monitor* m = static_cast(arg); + Logger::Info("Monitoring thread created. Interval {0}", m->intervalSeconds); m->Run(); pthread_exit(nullptr); diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index 07119fd..66ea7e9 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -5,10 +5,12 @@ #include "Process.h" #include "../utils/Logger.h" -#include "../utils/String.h" +#include "../utils/String.h" +#include "../common/ErrorCodes.h" using namespace hpc::core; -using namespace hpc::utils; +using namespace hpc::utils; +using namespace hpc::common; Process::Process( int jobId, @@ -47,6 +49,7 @@ void Process::Kill(int forcedExitCode) { if (forcedExitCode != 0x0FFFFFFF) { + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Setting forced ExitCode {0}", forcedExitCode); this->SetExitCode(forcedExitCode); } @@ -103,7 +106,7 @@ void* Process::ForkThread(void* arg) Logger::Error(p->jobId, p->taskId, p->requeueCount, "Error when build script."); // TODO fetch the errno. - p->SetExitCode(-1); + p->SetExitCode((int)ErrorCodes::BuildScriptError); goto Final; } @@ -191,6 +194,8 @@ void Process::Monitor() if (WIFEXITED(status)) { + Logger::Info(this->jobId, this->taskId, this->requeueCount, + "Process {0}: exite code {1}", this->processId, WEXITSTATUS(status)); this->SetExitCode(WEXITSTATUS(status)); std::string output; @@ -209,7 +214,7 @@ void Process::Monitor() else { Logger::Error(this->jobId, this->taskId, this->requeueCount, "wait4 for process {0} status {1}", this->processId, status); - this->SetExitCode(-1); + this->SetExitCode(status); this->message << "wait4 for process " << this->processId << " status " << status << std::endl; } diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index d59d2b7..81c67d8 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -15,6 +15,7 @@ #include "../utils/String.h" #include "../utils/Logger.h" #include "../utils/System.h" +#include "../common/ErrorCodes.h" using namespace hpc::utils; @@ -92,7 +93,7 @@ namespace hpc std::ostringstream stdOut; std::ostringstream stdErr; std::ostringstream message; - int exitCode = -1; + int exitCode = (int)hpc::common::ErrorCodes::DefaultExitCode; bool exitCodeSet = false; timeval userTime = { 0, 0 }; timeval kernelTime = { 0, 0 }; diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 0e713b8..732d260 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -6,6 +6,7 @@ #include "../utils/ReaderLock.h" #include "../utils/Logger.h" #include "../utils/System.h" +#include "../common/ErrorCodes.h" using namespace web::http; using namespace web; @@ -13,6 +14,7 @@ using namespace hpc::core; using namespace hpc::utils; using namespace hpc::arguments; using namespace hpc::data; +using namespace hpc::common; RemoteExecutor::RemoteExecutor(const std::string& networkName) : monitor(System::GetNodeName(), networkName, MetricReportInterval), lock(PTHREAD_RWLOCK_INITIALIZER) @@ -136,6 +138,7 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args) { ReaderLock readerLock(&this->lock); + Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: starting"); auto jobInfo = this->jobTaskTable.RemoveJob(args.JobId); json::value jsonBody; @@ -144,14 +147,15 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args) { for (auto& taskPair : jobInfo->Tasks) { - this->TerminateTask(taskPair.first); + this->TerminateTask(taskPair.first, (int)ErrorCodes::EndJobExitCode); auto taskInfo = taskPair.second; if (taskInfo) { taskInfo->Exited = true; - taskInfo->ExitCode = -1; + taskInfo->ExitCode = (int)ErrorCodes::EndJobExitCode; + Logger::Debug(args.JobId, taskPair.first, taskInfo->TaskRequeueCount, "EndJob: starting"); } else { @@ -163,6 +167,7 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args) } jsonBody = jobInfo->ToJson(); + Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: ended {0}", jsonBody); } else { @@ -175,10 +180,11 @@ json::value RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args) json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args) { ReaderLock readerLock(&this->lock); + Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: starting"); auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId); - this->TerminateTask(args.TaskId); + this->TerminateTask(args.TaskId, (int)ErrorCodes::EndTaskExitCode); json::value jsonBody; @@ -187,9 +193,10 @@ json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args) this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId()); taskInfo->Exited = true; - taskInfo->ExitCode = -1; + taskInfo->ExitCode = (int)ErrorCodes::EndTaskExitCode; jsonBody = taskInfo->ToJson(); + Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: ended {0}", jsonBody); } else { @@ -215,13 +222,13 @@ json::value RemoteExecutor::Metric(const std::string& callbackUri) return json::value(); } -bool RemoteExecutor::TerminateTask(int taskId) +bool RemoteExecutor::TerminateTask(int taskId, int exitCode) { auto p = this->processes.find(taskId); if (p != this->processes.end()) { - p->second->Kill(-1); + p->second->Kill(exitCode); return true; } diff --git a/nodemanager/core/RemoteExecutor.h b/nodemanager/core/RemoteExecutor.h index 994e7ff..d8a58da 100644 --- a/nodemanager/core/RemoteExecutor.h +++ b/nodemanager/core/RemoteExecutor.h @@ -29,7 +29,7 @@ namespace hpc private: std::string LoadReportUri(const std::string& fileName); void SaveReportUri(const std::string& fileName, const std::string& uri); - bool TerminateTask(int taskId); + bool TerminateTask(int taskId, int exitCode); const int UnknowId = 999; const int NodeInfoReportInterval = 30; diff --git a/nodemanager/core/Reporter.cpp b/nodemanager/core/Reporter.cpp index 4dbd736..7a3638b 100644 --- a/nodemanager/core/Reporter.cpp +++ b/nodemanager/core/Reporter.cpp @@ -100,12 +100,10 @@ void* Reporter::ReportingThread(void * arg) catch (const std::exception& ex) { Logger::Error("Exception occurred when report to {0}, ex {1}", r->reportUri, ex.what()); - //exit(-1); } catch (...) { Logger::Error("Unknown error occurred when report to {0}", r->reportUri); - //exit(-1); } r->inRequest = false; diff --git a/nodemanager/utils/System.cpp b/nodemanager/utils/System.cpp index 472dc09..abe1de4 100644 --- a/nodemanager/utils/System.cpp +++ b/nodemanager/utils/System.cpp @@ -12,8 +12,10 @@ #include "System.h" #include "String.h" #include "Logger.h" +#include "../common/ErrorCodes.h" using namespace hpc::utils; +using namespace hpc::common; std::vector System::GetNetworkInfo() { @@ -183,8 +185,9 @@ void System::CPU(int &cores, int &sockets) fs.close(); } -void System::NetworkUsage(long int &network, const std::string& netName) +int System::NetworkUsage(long int &network, const std::string& netName) { + int ret = 1; std::ifstream fs("/proc/net/dev", std::ios::in); std::string name; int receive, send; @@ -193,14 +196,21 @@ void System::NetworkUsage(long int &network, const std::string& netName) fs.ignore(std::numeric_limits::max(), '\n'); std::string tmp = netName + ":"; - while (name != tmp) + while (name != tmp && fs.good()) { fs >> name >> receive >> send; + fs.ignore(std::numeric_limits::max(), '\n'); } - network = receive + send; + if (name == tmp) + { + network = receive + send; + ret = 0; + } fs.close(); + + return ret; } const std::string& System::GetNodeName() @@ -212,7 +222,7 @@ const std::string& System::GetNodeName() if (-1 == gethostname(buffer, 255)) { Logger::Error("gethostname failed with errno {0}", errno); - exit(-1); + exit((int)ErrorCodes::GetHostNameError); } nodeName = buffer; diff --git a/nodemanager/utils/System.h b/nodemanager/utils/System.h index 1a206ad..671c7a1 100644 --- a/nodemanager/utils/System.h +++ b/nodemanager/utils/System.h @@ -4,7 +4,8 @@ #include #include "String.h" -#include "Logger.h" +#include "Logger.h" +#include "../common/ErrorCodes.h" namespace hpc { @@ -26,7 +27,7 @@ namespace hpc static void CPUUsage(long int &total, long int &idle); static void Memory(unsigned long &available, unsigned long &total); static void CPU(int &cores, int &sockets); - static void NetworkUsage(long int &network, const std::string& netName); + static int NetworkUsage(long int &network, const std::string& netName); static const std::string& GetNodeName(); static bool IsCGroupInstalled(); @@ -41,7 +42,7 @@ namespace hpc FILE* stream = popen(command.c_str(), "r"); std::ostringstream result; - int exitCode = -1; + int exitCode = (int)hpc::common::ErrorCodes::PopenError; if (stream) { From cc78c0baf2a2106265aab895b893fd4d42ca1e6f Mon Sep 17 00:00:00 2001 From: evanc Date: Thu, 9 Apr 2015 04:19:42 -0700 Subject: [PATCH 07/13] Try to fix the node error --- nodemanager/core/Reporter.cpp | 67 +++++++++++++---------------------- nodemanager/core/Reporter.h | 5 ++- 2 files changed, 27 insertions(+), 45 deletions(-) diff --git a/nodemanager/core/Reporter.cpp b/nodemanager/core/Reporter.cpp index 7a3638b..8093a31 100644 --- a/nodemanager/core/Reporter.cpp +++ b/nodemanager/core/Reporter.cpp @@ -10,13 +10,10 @@ using namespace web::http; Reporter::Reporter(const std::string& uri, int interval, std::function fetcher) : reportUri(uri), intervalSeconds(interval), valueFetcher(fetcher), - isRunning(true), inRequest(false) + isRunning(true) { if (!uri.empty()) { - http_client_config config; - config.set_validate_certificates(false); - this->client = std::shared_ptr(new http_client(uri, config)); pthread_create(&this->threadId, nullptr, ReportingThread, this); } } @@ -28,16 +25,15 @@ Reporter::~Reporter() this->isRunning = false; this->cts.cancel(); if (this->threadId != 0) - { + { + while (this->inRequest) usleep(1); pthread_cancel(this->threadId); pthread_join(this->threadId, nullptr); Logger::Debug("Destructed Reporter {0}", this->reportUri); } - - while (this->inRequest) usleep(1); } -pplx::task Reporter::Report() +void Reporter::Report() { const std::string& uri = this->reportUri; if (!uri.empty()) @@ -46,7 +42,7 @@ pplx::task Reporter::Report() if (jsonBody.is_null()) { Logger::Info("Skipped reporting to {0} because json is null", uri); - return pplx::task_from_result(); + return; } if (this->intervalSeconds > 10) @@ -54,25 +50,32 @@ pplx::task Reporter::Report() Logger::Info("---------> Report to {0} with {1}", uri, jsonBody); } + http_client_config config; + config.set_validate_certificates(false); + http_client client(uri, config); + try { - return this->client->request(methods::POST, "", jsonBody, this->cts.get_token()).then([&uri, this](http_response response) + client.request(methods::POST, "", jsonBody, this->cts.get_token()).then([&uri, this](http_response response) { if (this->intervalSeconds > 10) { Logger::Debug("---------> Reported to {0} response code {1}", uri, response.status_code()); } - }); + }).wait(); } - catch (std::exception& ex) - { - Logger::Error("Reporting exception occurred {0}, {1}", ex.what(), jsonBody); - return pplx::task_from_result(); + catch (const http_exception& httpEx) + { + Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->reportUri, httpEx.what()); } - } - else - { - return pplx::task_from_result(); + catch (const std::exception& ex) + { + Logger::Error("Exception occurred when report to {0}, ex {1}", this->reportUri, ex.what()); + } + catch (...) + { + Logger::Error("Unknown error occurred when report to {0}", this->reportUri); + } } } @@ -84,30 +87,10 @@ void* Reporter::ReportingThread(void * arg) Reporter* r = static_cast(arg); while (r->isRunning) - { + { r->inRequest = true; - - r->Report().then([r](auto t) - { - try - { - t.wait(); - } - catch (const http_exception& httpEx) - { - Logger::Warn("HttpException occurred when report to {0}, ex {1}", r->reportUri, httpEx.what()); - } - catch (const std::exception& ex) - { - Logger::Error("Exception occurred when report to {0}, ex {1}", r->reportUri, ex.what()); - } - catch (...) - { - Logger::Error("Unknown error occurred when report to {0}", r->reportUri); - } - - r->inRequest = false; - }); + r->Report(); + r->inRequest = false; sleep(r->intervalSeconds); } diff --git a/nodemanager/core/Reporter.h b/nodemanager/core/Reporter.h index 203f5ca..61be290 100644 --- a/nodemanager/core/Reporter.h +++ b/nodemanager/core/Reporter.h @@ -16,7 +16,7 @@ namespace hpc Reporter(const std::string& uri, int interval, std::function fetcher); ~Reporter(); - pplx::task Report(); + void Report(); protected: private: static void* ReportingThread(void* arg); @@ -28,8 +28,7 @@ namespace hpc pthread_t threadId; pplx::cancellation_token_source cts; bool isRunning; - bool inRequest; - std::shared_ptr client; + bool inRequest = false; }; } } From e585b787c0cbb60c2f85524b21f8954123e16216 Mon Sep 17 00:00:00 2001 From: evanc Date: Mon, 13 Apr 2015 00:00:03 -0700 Subject: [PATCH 08/13] Fix potential node error --- nodemanager/scripts/EndTask.sh | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/nodemanager/scripts/EndTask.sh b/nodemanager/scripts/EndTask.sh index ddca8fd..b2e1f5c 100755 --- a/nodemanager/scripts/EndTask.sh +++ b/nodemanager/scripts/EndTask.sh @@ -15,26 +15,33 @@ if $CGInstalled; then tasks=$CGroupRoot/cpuset/$groupName/tasks freezerState=$CGroupRoot/freezer/$groupName/freezer.state - [ ! -f $tasks ] && echo "task is ended" && exit 1 - [ ! -f $freezerState ] && echo "task is ended" && exit 1 + [ ! -f $tasks ] && echo "$tasks doesn't exist" && exit -200 + [ ! -f $freezerState ] && echo "$freezerState doesn't exist" && exit -201 # freeze the task echo FROZEN > $freezerState - while [ -f $freezerState ] && ! grep -Fxq FROZEN $freezerState; do + maxLoop=20 + while [ -f $freezerState ] && ! grep -Fxq FROZEN $freezerState && [ $maxLoop -gt 0 ] + do sleep .1 + ((maxLoop--)) done # kill all tasks - for pid in $(cat $tasks); do + for pid in $(cat $tasks); + do [ -d /proc/$pid ] && kill -TERM $pid done # resume tasks echo THAWED > $freezerState - while [ -f $freezerState ] && ! grep -Fxq THAWED $freezerState; do + maxLoop=20 + while [ -f $freezerState ] && ! grep -Fxq THAWED $freezerState && [ $maxLoop -gt 0 ] + do sleep .1 + ((maxLoop--)) done else kill -s TERM $(pstree -l -p $taskId | grep "([[:digit:]]*)" -o | tr -d '()') From 1066feaf29157e1bb13c4207b915dcc452c2f4bf Mon Sep 17 00:00:00 2001 From: evanc Date: Mon, 13 Apr 2015 00:15:30 -0700 Subject: [PATCH 09/13] Get Version info --- nodemanager/NodeManager.cbp | 2 ++ nodemanager/Version.cpp | 1 + nodemanager/Version.h | 20 ++++++++++++++++++++ nodemanager/main.cpp | 9 ++++++--- 4 files changed, 29 insertions(+), 3 deletions(-) create mode 100644 nodemanager/Version.cpp create mode 100644 nodemanager/Version.h diff --git a/nodemanager/NodeManager.cbp b/nodemanager/NodeManager.cbp index 9899143..df429da 100644 --- a/nodemanager/NodeManager.cbp +++ b/nodemanager/NodeManager.cbp @@ -48,6 +48,8 @@ + + diff --git a/nodemanager/Version.cpp b/nodemanager/Version.cpp new file mode 100644 index 0000000..50b5049 --- /dev/null +++ b/nodemanager/Version.cpp @@ -0,0 +1 @@ +#include "Version.h" diff --git a/nodemanager/Version.h b/nodemanager/Version.h new file mode 100644 index 0000000..f21c6bb --- /dev/null +++ b/nodemanager/Version.h @@ -0,0 +1,20 @@ +#ifndef VERSION_H_INCLUDED +#define VERSION_H_INCLUDED + +#include + +namespace hpc +{ + class Version + { + public: + static const std::string& GetVersion() + { + static std::string version = "1.1.1.0"; + + return version; + } + }; +} + +#endif // VERSION_H_INCLUDED diff --git a/nodemanager/main.cpp b/nodemanager/main.cpp index 801ad0b..085efd6 100644 --- a/nodemanager/main.cpp +++ b/nodemanager/main.cpp @@ -4,16 +4,19 @@ #include "utils/Logger.h" #include "core/RemoteCommunicator.h" -#include "core/RemoteExecutor.h" +#include "core/RemoteExecutor.h" +#include "Version.h" using namespace std; using namespace hpc::core; -using namespace hpc::utils; +using namespace hpc::utils; +using namespace hpc; int main() { std::cout << "Node manager started." << std::endl; - Logger::Info("Log system works."); + Logger::Info("Log system works."); + Logger::Info("Version: {0}", Version::GetVersion()); const std::string networkName = "eth0"; RemoteExecutor executor(networkName); From 97b75de953dec682d1b5a39b3cbb76833208e170 Mon Sep 17 00:00:00 2001 From: evanc Date: Tue, 14 Apr 2015 01:29:23 -0700 Subject: [PATCH 10/13] Fixed a long running because of callback failure --- nodemanager/NodeManager.depend | 13 +++++++++++-- nodemanager/Version.h | 26 +++++++++++++++++++++++++- nodemanager/core/RemoteExecutor.cpp | 5 +++-- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/nodemanager/NodeManager.depend b/nodemanager/NodeManager.depend index e9f69d8..2681b0c 100755 --- a/nodemanager/NodeManager.depend +++ b/nodemanager/NodeManager.depend @@ -418,13 +418,14 @@ "../common.h" "../details/log_msg.h" -1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/main.cpp +1428909167 source:/home/evanc/whpc-linux-communicator/nodemanager/main.cpp "utils/Logger.h" "core/RemoteCommunicator.h" "core/RemoteExecutor.h" + "Version.h" 1428488455 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h @@ -671,7 +672,7 @@ "../utils/Logger.h" "../utils/System.h" -1427353166 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h +1428578115 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h @@ -682,3 +683,11 @@ 1428574569 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h +1428909113 source:/home/evanc/whpc-linux-communicator/nodemanager/Version.cpp + "Version.h" + +1429000026 /home/evanc/whpc-linux-communicator/nodemanager/Version.h + + + + diff --git a/nodemanager/Version.h b/nodemanager/Version.h index f21c6bb..b87c1f9 100644 --- a/nodemanager/Version.h +++ b/nodemanager/Version.h @@ -2,6 +2,8 @@ #define VERSION_H_INCLUDED #include +#include +#include namespace hpc { @@ -10,7 +12,29 @@ namespace hpc public: static const std::string& GetVersion() { - static std::string version = "1.1.1.0"; + static std::string version; + + static std::map> versionHistory = + { + { "1.1.1.1", + { + "Node manager main functionality support", + "Added version support", + "Fixed network card reversed order issue", + "Added trace", + "Added error codes definition", + "Fixed a potential node error issue", + } + }, + { "1.1.1.2", + { + "Fixed a long running issue because of callback failure", + } + }, + }; + + auto it = --versionHistory.end(); + version = it->first; return version; } diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 732d260..752d069 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -94,6 +94,9 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c "Callback to {0} response code {1}", callbackUri, response.status_code()); }).wait(); } + + // this won't remove the task entry added later as attempt id doesn't match + this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId()); } catch (const std::exception& ex) { @@ -101,8 +104,6 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c "Exception when sending back task result. {0}", ex.what()); } - // this won't remove the task entry added later as attempt id doesn't match - this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId()); Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, "attemptId {0}, erasing process", taskInfo->GetAttemptId()); From 05c60801ef487d3493bf4626c9c17d1a06600f4d Mon Sep 17 00:00:00 2001 From: evanc Date: Tue, 14 Apr 2015 02:14:14 -0700 Subject: [PATCH 11/13] Fixed another long running issue --- nodemanager/Version.h | 6 ++++++ nodemanager/core/RemoteExecutor.cpp | 3 ++- nodemanager/data/TaskInfo.cpp | 9 ++++++++- nodemanager/data/TaskInfo.h | 1 + 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/nodemanager/Version.h b/nodemanager/Version.h index b87c1f9..24f56a4 100644 --- a/nodemanager/Version.h +++ b/nodemanager/Version.h @@ -29,6 +29,12 @@ namespace hpc { "1.1.1.2", { "Fixed a long running issue because of callback failure", + "Added version history support", + } + }, + { "1.1.1.3", + { + "Fixed a long running issue because of callback contract mismatch", } }, }; diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 752d069..ef7d4af 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -82,7 +82,8 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c taskInfo->KernelProcessorTime = kernelTime.tv_sec * 1000000 + kernelTime.tv_usec; taskInfo->UserProcessorTime = userTime.tv_sec * 1000000 + userTime.tv_usec; - auto jsonBody = taskInfo->ToJson(); + auto jsonBody = taskInfo->ToCompletionEventArgJson(); + Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->TaskRequeueCount, "Callback to {0} with {1}", callbackUri, jsonBody); client::http_client_config config; diff --git a/nodemanager/data/TaskInfo.cpp b/nodemanager/data/TaskInfo.cpp index 31ee03f..66d87ba 100644 --- a/nodemanager/data/TaskInfo.cpp +++ b/nodemanager/data/TaskInfo.cpp @@ -20,7 +20,14 @@ json::value TaskInfo::ToJson() const j["NumberOfProcesses"] = this->NumberOfProcesses; j["PrimaryTask"] = this->IsPrimaryTask; j["Message"] = JsonHelper::ToJson(this->Message); - j["ProcessIds"] = JsonHelper::ToJson(String::Join<','>(this->ProcessIds)); + j["ProcessIds"] = JsonHelper::ToJson(String::Join<','>(this->ProcessIds)); + + return j; +} + +json::value TaskInfo::ToCompletionEventArgJson() const +{ + json::value j = this->ToJson(); json::value jobIdArg; jobIdArg["JobId"] = this->JobId; diff --git a/nodemanager/data/TaskInfo.h b/nodemanager/data/TaskInfo.h index 09c3b60..babf213 100644 --- a/nodemanager/data/TaskInfo.h +++ b/nodemanager/data/TaskInfo.h @@ -23,6 +23,7 @@ namespace hpc TaskInfo(TaskInfo&& t) = default; web::json::value ToJson() const; + web::json::value ToCompletionEventArgJson() const; const std::string& NodeName; From 73577df98de04ecda65f26f5bf405ed8cbb8037c Mon Sep 17 00:00:00 2001 From: evanc Date: Tue, 14 Apr 2015 03:19:18 -0700 Subject: [PATCH 12/13] Retry when create cgroup failed. Enhanced logging and exit code catch --- nodemanager/Version.h | 7 ++++ nodemanager/core/Process.h | 6 ++- nodemanager/scripts/EndTask.sh | 2 +- nodemanager/scripts/PrepareTask.sh | 67 +++++++++++++++++++++++++++--- 4 files changed, 75 insertions(+), 7 deletions(-) diff --git a/nodemanager/Version.h b/nodemanager/Version.h index 24f56a4..30484a4 100644 --- a/nodemanager/Version.h +++ b/nodemanager/Version.h @@ -37,6 +37,13 @@ namespace hpc "Fixed a long running issue because of callback contract mismatch", } }, + { "1.1.1.4", + { + "Retry when create cgroup failed", + "Return the exit code and error message when PrepareTask", + "Record the output to message in Process", + } + }, }; auto it = --versionHistory.end(); diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index 81c67d8..724a5c7 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -71,7 +71,11 @@ namespace hpc if (ret != 0) { std::string cmdLine = String::Join(" ", cmd, args...); - this->message << "Task " << this->taskId << ": '" << cmdLine << "' failed. exitCode " << ret << "\r\n"; + this->message + << "Task " << this->taskId << ": '" << cmdLine + << "' failed. exitCode " << ret << ". output " + << output << std::endl; + Logger::Error(this->jobId, this->taskId, this->requeueCount, "'{0}' failed. exitCode {1}, output {2}.", cmdLine, ret, output); this->SetExitCode(ret); diff --git a/nodemanager/scripts/EndTask.sh b/nodemanager/scripts/EndTask.sh index b2e1f5c..38dd14e 100755 --- a/nodemanager/scripts/EndTask.sh +++ b/nodemanager/scripts/EndTask.sh @@ -44,7 +44,7 @@ if $CGInstalled; then ((maxLoop--)) done else - kill -s TERM $(pstree -l -p $taskId | grep "([[:digit:]]*)" -o | tr -d '()') + kill -s TERM $(pstree -l -p $processId | grep "([[:digit:]]*)" -o | tr -d '()') fi exit 0 diff --git a/nodemanager/scripts/PrepareTask.sh b/nodemanager/scripts/PrepareTask.sh index 35df230..00189f1 100755 --- a/nodemanager/scripts/PrepareTask.sh +++ b/nodemanager/scripts/PrepareTask.sh @@ -8,10 +8,67 @@ taskId=$1 if $CGInstalled; then - groupName=$(GetCGroupName $taskId) - group=$CGroupSubSys:$groupName - cgcreate -g $group - echo "$2" > $CGroupRoot/cpuset/$groupName/cpuset.cpus - echo 0 > $CGroupRoot/cpuset/$groupName/cpuset.mems + groupName=$(GetCGroupName $taskId) + group=$CGroupSubSys:$groupName + + maxLoop=3 + while [ $maxLoop -gt 0 ] + do + cgcreate -g $group + ec=$? + if [ $ec -eq 0 ] + then + break + fi + + echo "Failed to create cgroup $group, error code $ec, retry after .5 seconds" + ((maxLoop--)) + sleep .5 + done + + if [ $ec -ne 0 ] + then + exit $ec + fi + + maxLoop=3 + while [ $maxLoop -gt 0 ] + do + echo "$2" > $CGroupRoot/cpuset/$groupName/cpuset.cpus + ec=$? + if [ $ec -eq 0 ] + then + break + fi + + echo "Failed to set cpus for $group, error code $ec, retry after .5 seconds" + ((maxLoop--)) + sleep .5 + done + + if [ $ec -ne 0 ] + then + exit $ec + fi + + maxLoop=3 + while [ $maxLoop -gt 0 ] + do + echo 0 > $CGroupRoot/cpuset/$groupName/cpuset.mems + ec=$? + if [ $ec -eq 0 ] + then + break + fi + + echo "Failed to set mems for $group, error code $ec, retry after .5 seconds" + ((maxLoop--)) + sleep .5 + done + + if [ $ec -ne 0 ] + then + exit $ec + fi fi From 2d4008c8551d9ecf1bd78e1f41d23114ff1fb5c2 Mon Sep 17 00:00:00 2001 From: evanc Date: Tue, 14 Apr 2015 03:37:54 -0700 Subject: [PATCH 13/13] Print out history --- nodemanager/NodeManager.depend | 3 ++- nodemanager/Version.h | 44 ++++++++++++++++++++++++++++------ nodemanager/main.cpp | 14 ++++++++--- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/nodemanager/NodeManager.depend b/nodemanager/NodeManager.depend index 2681b0c..dc386c0 100755 --- a/nodemanager/NodeManager.depend +++ b/nodemanager/NodeManager.depend @@ -686,8 +686,9 @@ 1428909113 source:/home/evanc/whpc-linux-communicator/nodemanager/Version.cpp "Version.h" -1429000026 /home/evanc/whpc-linux-communicator/nodemanager/Version.h +1429007613 /home/evanc/whpc-linux-communicator/nodemanager/Version.h + diff --git a/nodemanager/Version.h b/nodemanager/Version.h index 30484a4..ef5cf86 100644 --- a/nodemanager/Version.h +++ b/nodemanager/Version.h @@ -4,16 +4,15 @@ #include #include #include +#include namespace hpc { class Version { public: - static const std::string& GetVersion() + static const std::map>& GetVersionHistory() { - static std::string version; - static std::map> versionHistory = { { "1.1.1.1", @@ -44,13 +43,44 @@ namespace hpc "Record the output to message in Process", } }, + { "1.1.1.5", + { + "Print out version history", + } + }, }; - auto it = --versionHistory.end(); - version = it->first; - - return version; + return versionHistory; } + + static const std::string& GetVersion() + { + auto& h = GetVersionHistory(); + auto it = --h.end(); + return it->first; + } + + static void PrintVersionHistory() + { + auto& h = GetVersionHistory(); + for (auto& v : h) + { + std::cout << v.first << std::endl; + std::cout << "================================================================" << std::endl; + + int number = 0; + for (auto& m : v.second) + { + number++; + std::cout << number << ". " << m << std::endl; + } + + std::cout << std::endl; + } + } + + private: + }; } diff --git a/nodemanager/main.cpp b/nodemanager/main.cpp index 085efd6..9dade66 100644 --- a/nodemanager/main.cpp +++ b/nodemanager/main.cpp @@ -1,4 +1,5 @@ -#include +#include +#include #include #include @@ -12,8 +13,15 @@ using namespace hpc::core; using namespace hpc::utils; using namespace hpc; -int main() -{ +int main(int argc, char* argv[]) +{ + if (argc > 1) + { + if (string("-v") == argv[1]) + Version::PrintVersionHistory(); + return 0; + } + std::cout << "Node manager started." << std::endl; Logger::Info("Log system works."); Logger::Info("Version: {0}", Version::GetVersion());