From b3fcc87d75c4972eec3aa6f5a1b25a94d8dce260 Mon Sep 17 00:00:00 2001 From: Chenling Zhang Date: Mon, 4 Sep 2017 21:06:34 +0800 Subject: [PATCH 1/2] peek task output for linux compute nodes --- nodemanager/arguments/PeekTaskOutputArgs.cpp | 20 ++++++++++++ nodemanager/arguments/PeekTaskOutputArgs.h | 26 +++++++++++++++ nodemanager/core/IRemoteExecutor.h | 2 ++ nodemanager/core/Process.cpp | 34 ++++++++++++++++++-- nodemanager/core/Process.h | 2 ++ nodemanager/core/RemoteCommunicator.cpp | 7 ++++ nodemanager/core/RemoteCommunicator.h | 1 + nodemanager/core/RemoteExecutor.cpp | 29 +++++++++++++++++ nodemanager/core/RemoteExecutor.h | 2 ++ 9 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 nodemanager/arguments/PeekTaskOutputArgs.cpp create mode 100644 nodemanager/arguments/PeekTaskOutputArgs.h diff --git a/nodemanager/arguments/PeekTaskOutputArgs.cpp b/nodemanager/arguments/PeekTaskOutputArgs.cpp new file mode 100644 index 0000000..8d42127 --- /dev/null +++ b/nodemanager/arguments/PeekTaskOutputArgs.cpp @@ -0,0 +1,20 @@ +#include "PeekTaskOutputArgs.h" +#include "../utils/JsonHelper.h" + +using namespace hpc::arguments; +using namespace hpc::utils; + +PeekTaskOutputArgs::PeekTaskOutputArgs(int jobId, int taskId) + : JobId(jobId), TaskId(taskId) +{ + //ctor +} + +PeekTaskOutputArgs PeekTaskOutputArgs::FromJson(const json::value& j) +{ + PeekTaskOutputArgs args( + JsonHelper::Read("JobId", j), + JsonHelper::Read("TaskId", j)); + + return std::move(args); +} diff --git a/nodemanager/arguments/PeekTaskOutputArgs.h b/nodemanager/arguments/PeekTaskOutputArgs.h new file mode 100644 index 0000000..806d131 --- /dev/null +++ b/nodemanager/arguments/PeekTaskOutputArgs.h @@ -0,0 +1,26 @@ +#ifndef PEEKTASKOUTPUTARGS_H +#define PEEKTASKOUTPUTARGS_H + +#include + +namespace hpc +{ + namespace arguments + { + struct PeekTaskOutputArgs + { + public: + PeekTaskOutputArgs(int jobId, int taskId); + + int JobId; + int TaskId; + + static PeekTaskOutputArgs FromJson(const web::json::value& jsonValue); + + protected: + private: + }; + } +} + +#endif // PEEKTASKOUTPUTARGS_H diff --git a/nodemanager/core/IRemoteExecutor.h b/nodemanager/core/IRemoteExecutor.h index 63cfd33..f112b2f 100644 --- a/nodemanager/core/IRemoteExecutor.h +++ b/nodemanager/core/IRemoteExecutor.h @@ -6,6 +6,7 @@ #include "../arguments/EndJobArgs.h" #include "../arguments/EndTaskArgs.h" #include "../arguments/MetricCountersConfig.h" +#include "../arguments/PeekTaskOutputArgs.h" namespace hpc { @@ -21,6 +22,7 @@ namespace hpc virtual pplx::task Ping(std::string&& callbackUri) = 0; virtual pplx::task Metric(std::string&& callbackUri) = 0; virtual pplx::task MetricConfig(hpc::arguments::MetricCountersConfig&& config, std::string&& callbackUri) = 0; + virtual pplx::task PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args) = 0; }; } } diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index 063107f..1208fbd 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -543,8 +543,13 @@ std::string Process::BuildScript() fs << " || exit $?"; fs << std::endl << std::endl; - if (this->stdOutFile.empty()) this->stdOutFile = this->taskFolder + "/stdout.txt"; - if (this->stdErrFile.empty()) this->stdErrFile = this->taskFolder + "/stderr.txt"; + if (!this->streamOutput) + { + if (this->stdOutFile.empty()) this->stdOutFile = this->taskFolder + "/stdout.txt"; + if (this->stdErrFile.empty()) this->stdErrFile = this->taskFolder + "/stderr.txt"; + if (!boost::algorithm::starts_with(this->stdOutFile, "/")) this->stdOutFile = this->taskFolder + "/" + this->stdOutFile; + if (!boost::algorithm::starts_with(this->stdErrFile, "/")) this->stdErrFile = this->taskFolder + "/" + this->stdErrFile; + } // before fs << "echo before >" << this->taskFolder << "/before1.txt 2>" << this->taskFolder << "/before2.txt"; @@ -630,3 +635,28 @@ std::unique_ptr Process::PrepareEnvironment() return std::move(envi); } + +std::string Process::PeekOutput() +{ + int ret = 0; + std::string stdout; + ret = System::ExecuteCommandOut(stdout, "2>&1 tail -c 5000", this->stdOutFile); + if (ret != 0) + { + stdout = "Reading " + this->stdOutFile + " failed: " + stdout; + } + + if (this->stdOutFile == this->stdErrFile) + { + return stdout; + } + + std::string stderr; + ret = System::ExecuteCommandOut(stderr, "2>&1 tail -c 5000", this->stdErrFile); + if (ret != 0) + { + stderr = "Reading " + this->stdErrFile + " failed: " + stderr; + } + + return "STDOUT:\n" + stdout + "\nSTDERR:\n" + stderr; +} \ No newline at end of file diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index 1ab19f8..a9fff8c 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -66,6 +66,8 @@ namespace hpc void SetSelfPtr(std::shared_ptr self) { this->selfPtr.swap(self); } void ResetSelfPtr() { this->selfPtr.reset(); } + std::string PeekOutput(); + protected: private: void SetExitCode(int exitCode) diff --git a/nodemanager/core/RemoteCommunicator.cpp b/nodemanager/core/RemoteCommunicator.cpp index d1a8960..c9c46e9 100644 --- a/nodemanager/core/RemoteCommunicator.cpp +++ b/nodemanager/core/RemoteCommunicator.cpp @@ -42,6 +42,7 @@ RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listene this->processors["ping"] = [this] (auto&& j, auto&& c) { return this->Ping(std::move(j), std::move(c)); }; this->processors["metric"] = [this] (auto&& j, auto&& c) { return this->Metric(std::move(j), std::move(c)); }; this->processors["metricconfig"] = [this] (auto&& j, auto&& c) { return this->MetricConfig(std::move(j), std::move(c)); }; + this->processors["peektaskoutput"] = [this] (auto&& j, auto&& c) { return this->PeekTaskOutput(std::move(j), std::move(c)); }; } RemoteCommunicator::~RemoteCommunicator() @@ -293,5 +294,11 @@ pplx::task RemoteCommunicator::MetricConfig(json::value&& val, std: return this->executor.MetricConfig(std::move(args), std::move(callbackUri)); } +pplx::task RemoteCommunicator::PeekTaskOutput(json::value&& val, std::string&& callbackUri) +{ + auto args = PeekTaskOutputArgs::FromJson(val); + return this->executor.PeekTaskOutput(std::move(args)); +} + const std::string RemoteCommunicator::ApiSpace = "api"; diff --git a/nodemanager/core/RemoteCommunicator.h b/nodemanager/core/RemoteCommunicator.h index 3b8a9ef..a903023 100644 --- a/nodemanager/core/RemoteCommunicator.h +++ b/nodemanager/core/RemoteCommunicator.h @@ -62,6 +62,7 @@ namespace hpc pplx::task Ping(json::value&& val, std::string&&); pplx::task Metric(json::value&& val, std::string&&); pplx::task MetricConfig(json::value&& val, std::string&&); + pplx::task PeekTaskOutput(json::value&& val, std::string&&); static const std::string ApiSpace; const std::string listeningUri; diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 937b56b..6b1459f 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -722,3 +722,32 @@ void RemoteExecutor::ResyncAndInvalidateCache() NamingClient::InvalidateCache(); } +pplx::task RemoteExecutor::PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args) +{ + Logger::Info(args.JobId, args.TaskId, this->UnknowId, "Peeking task output."); + + std::string output; + try + { + auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId); + if (taskInfo) + { + Logger::Debug(args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(), + "PeekTaskOutput for ProcessKey {0}, processes count {1}", + taskInfo->ProcessKey, this->processes.size()); + + auto p = this->processes.find(taskInfo->ProcessKey); + if (p != this->processes.end()) + { + output = p->second->PeekOutput(); + } + } + } + catch (const std::exception& ex) + { + Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "Exception when peeking task output: {0}", ex.what()); + output = "Failed to get the output."; + } + + return pplx::task_from_result(json::value::string(output)); +} diff --git a/nodemanager/core/RemoteExecutor.h b/nodemanager/core/RemoteExecutor.h index 35e0932..8f1700c 100644 --- a/nodemanager/core/RemoteExecutor.h +++ b/nodemanager/core/RemoteExecutor.h @@ -30,6 +30,8 @@ namespace hpc virtual pplx::task Ping(std::string&& callbackUri); virtual pplx::task Metric(std::string&& callbackUri); virtual pplx::task MetricConfig(hpc::arguments::MetricCountersConfig&& config, std::string&& callbackUri); + virtual pplx::task PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args); + protected: private: static void* GracePeriodElapsed(void* data); From 09dd23127b9024467dd7cd7a480777b1e6baa612 Mon Sep 17 00:00:00 2001 From: Chenling Zhang Date: Mon, 11 Sep 2017 17:39:42 +0800 Subject: [PATCH 2/2] some revise --- nodemanager/core/Process.cpp | 64 +++++++++++++---------------- nodemanager/core/Process.h | 5 +++ nodemanager/core/RemoteExecutor.cpp | 6 +-- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index 1208fbd..0f9394f 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -39,8 +39,7 @@ Process::Process( workDirectory(workDir), userName(user.empty() ? "root" : user), dumpStdout(dumpStdoutToExecutionMessage), affinity(cpuAffinity), environments(envi), callback(completed), processId(0) { - this->streamOutput = boost::algorithm::starts_with(stdOutFile, "http://") || - boost::algorithm::starts_with(stdOutFile, "https://"); + this->streamOutput = StartWithHttpOrHttps(stdOutFile); Logger::Debug(this->jobId, this->taskId, this->requeueCount, "{0}, stream ? {1}", stdOutFile, this->streamOutput); } @@ -362,7 +361,7 @@ void Process::Monitor() if (!this->streamOutput) { std::string output; - + int ret = 0; if (this->dumpStdout) { @@ -527,29 +526,16 @@ std::string Process::BuildScript() std::ofstream fs(runDirInOut, std::ios::trunc); fs << "#!/bin/bash" << std::endl << std::endl; - - fs << "cd "; - + Logger::Debug("{0}, {1}", this->taskFolder, this->workDirectory); - if (this->workDirectory.empty()) - { - fs << this->taskFolder; - } - else - { - fs << this->workDirectory; - } - fs << " || exit $?"; - fs << std::endl << std::endl; + std::string workDirectory = this->workDirectory.empty() ? this->taskFolder : this->workDirectory; + fs << "cd " << workDirectory << " || exit $?" << std::endl << std::endl; - if (!this->streamOutput) - { - if (this->stdOutFile.empty()) this->stdOutFile = this->taskFolder + "/stdout.txt"; - if (this->stdErrFile.empty()) this->stdErrFile = this->taskFolder + "/stderr.txt"; - if (!boost::algorithm::starts_with(this->stdOutFile, "/")) this->stdOutFile = this->taskFolder + "/" + this->stdOutFile; - if (!boost::algorithm::starts_with(this->stdErrFile, "/")) this->stdErrFile = this->taskFolder + "/" + this->stdErrFile; - } + if (this->stdOutFile.empty()) this->stdOutFile = this->taskFolder + "/stdout.txt"; + else if (!boost::algorithm::starts_with(this->stdOutFile, "/") && !StartWithHttpOrHttps(this->stdOutFile)) this->stdOutFile = workDirectory + "/" + this->stdOutFile; + if (this->stdErrFile.empty()) this->stdErrFile = this->taskFolder + "/stderr.txt"; + else if (!boost::algorithm::starts_with(this->stdErrFile, "/") && !StartWithHttpOrHttps(this->stdErrFile)) this->stdErrFile = workDirectory + "/" + this->stdErrFile; // before fs << "echo before >" << this->taskFolder << "/before1.txt 2>" << this->taskFolder << "/before2.txt"; @@ -638,25 +624,33 @@ std::unique_ptr Process::PrepareEnvironment() std::string Process::PeekOutput() { + std::string output; + int ret = 0; std::string stdout; - ret = System::ExecuteCommandOut(stdout, "2>&1 tail -c 5000", this->stdOutFile); + ret = System::ExecuteCommandOut(stdout, "tail -c 5000 2>&1", this->stdOutFile); if (ret != 0) { - stdout = "Reading " + this->stdOutFile + " failed: " + stdout; + std::ostringstream stream; + stream << "Reading " << this->stdOutFile << " failed with exitcode " << ret << ": " << stdout; + stdout = stream.str(); } - if (this->stdOutFile == this->stdErrFile) + output = stdout; + + if (this->stdOutFile != this->stdErrFile) { - return stdout; + std::string stderr; + ret = System::ExecuteCommandOut(stderr, "tail -c 5000 2>&1", this->stdErrFile); + if (ret != 0) + { + std::ostringstream stream; + stream << "Reading " << this->stdErrFile << " failed with exitcode " << ret << ": " << stderr; + stderr = stream.str(); + } + + output = String::Join("\n", "STDOUT:", stdout, "STDERR:", stderr); } - std::string stderr; - ret = System::ExecuteCommandOut(stderr, "2>&1 tail -c 5000", this->stdErrFile); - if (ret != 0) - { - stderr = "Reading " + this->stdErrFile + " failed: " + stderr; - } - - return "STDOUT:\n" + stdout + "\nSTDERR:\n" + stderr; + return output; } \ No newline at end of file diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index a9fff8c..e525fe0 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -70,6 +70,11 @@ namespace hpc protected: private: + static bool StartWithHttpOrHttps(const std::string& path) + { + return boost::algorithm::starts_with(path, "http://") || boost::algorithm::starts_with(path, "https://"); + } + void SetExitCode(int exitCode) { this->exitCode = exitCode; diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 6b1459f..e3fe980 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -733,8 +733,8 @@ pplx::task RemoteExecutor::PeekTaskOutput(hpc::arguments::PeekTaskO if (taskInfo) { Logger::Debug(args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(), - "PeekTaskOutput for ProcessKey {0}, processes count {1}", - taskInfo->ProcessKey, this->processes.size()); + "PeekTaskOutput for ProcessKey {0}, processes count {1}", + taskInfo->ProcessKey, this->processes.size()); auto p = this->processes.find(taskInfo->ProcessKey); if (p != this->processes.end()) @@ -746,7 +746,7 @@ pplx::task RemoteExecutor::PeekTaskOutput(hpc::arguments::PeekTaskO catch (const std::exception& ex) { Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "Exception when peeking task output: {0}", ex.what()); - output = "Failed to get the output."; + output = "NodeManager: Failed to get the output."; } return pplx::task_from_result(json::value::string(output));