This commit is contained in:
Chenling Zhang 2017-09-14 17:07:03 +08:00
Родитель 899827eb64 09dd23127b
Коммит b56f10d590
9 изменённых файлов: 134 добавлений и 16 удалений

Просмотреть файл

@ -0,0 +1,20 @@
#include "PeekTaskOutputArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
PeekTaskOutputArgs::PeekTaskOutputArgs(int jobId, int taskId)
: JobId(jobId), TaskId(taskId)
{
//ctor
}
PeekTaskOutputArgs PeekTaskOutputArgs::FromJson(const json::value& j)
{
PeekTaskOutputArgs args(
JsonHelper<int>::Read("JobId", j),
JsonHelper<int>::Read("TaskId", j));
return std::move(args);
}

Просмотреть файл

@ -0,0 +1,26 @@
#ifndef PEEKTASKOUTPUTARGS_H
#define PEEKTASKOUTPUTARGS_H
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct PeekTaskOutputArgs
{
public:
PeekTaskOutputArgs(int jobId, int taskId);
int JobId;
int TaskId;
static PeekTaskOutputArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // PEEKTASKOUTPUTARGS_H

Просмотреть файл

@ -6,6 +6,7 @@
#include "../arguments/EndJobArgs.h"
#include "../arguments/EndTaskArgs.h"
#include "../arguments/MetricCountersConfig.h"
#include "../arguments/PeekTaskOutputArgs.h"
namespace hpc
{
@ -21,6 +22,7 @@ namespace hpc
virtual pplx::task<web::json::value> Ping(std::string&& callbackUri) = 0;
virtual pplx::task<web::json::value> Metric(std::string&& callbackUri) = 0;
virtual pplx::task<web::json::value> MetricConfig(hpc::arguments::MetricCountersConfig&& config, std::string&& callbackUri) = 0;
virtual pplx::task<web::json::value> PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args) = 0;
};
}
}

Просмотреть файл

@ -39,8 +39,7 @@ Process::Process(
workDirectory(workDir), userName(user.empty() ? "root" : user), dumpStdout(dumpStdoutToExecutionMessage),
affinity(cpuAffinity), environments(envi), callback(completed), processId(0)
{
this->streamOutput = boost::algorithm::starts_with(stdOutFile, "http://") ||
boost::algorithm::starts_with(stdOutFile, "https://");
this->streamOutput = StartWithHttpOrHttps(stdOutFile);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "{0}, stream ? {1}", stdOutFile, this->streamOutput);
}
@ -362,7 +361,7 @@ void Process::Monitor()
if (!this->streamOutput)
{
std::string output;
int ret = 0;
if (this->dumpStdout)
{
@ -527,24 +526,16 @@ std::string Process::BuildScript()
std::ofstream fs(runDirInOut, std::ios::trunc);
fs << "#!/bin/bash" << std::endl << std::endl;
fs << "cd ";
Logger::Debug("{0}, {1}", this->taskFolder, this->workDirectory);
if (this->workDirectory.empty())
{
fs << this->taskFolder;
}
else
{
fs << this->workDirectory;
}
fs << " || exit $?";
fs << std::endl << std::endl;
std::string workDirectory = this->workDirectory.empty() ? this->taskFolder : this->workDirectory;
fs << "cd " << workDirectory << " || exit $?" << std::endl << std::endl;
if (this->stdOutFile.empty()) this->stdOutFile = this->taskFolder + "/stdout.txt";
else if (!boost::algorithm::starts_with(this->stdOutFile, "/") && !StartWithHttpOrHttps(this->stdOutFile)) this->stdOutFile = workDirectory + "/" + this->stdOutFile;
if (this->stdErrFile.empty()) this->stdErrFile = this->taskFolder + "/stderr.txt";
else if (!boost::algorithm::starts_with(this->stdErrFile, "/") && !StartWithHttpOrHttps(this->stdErrFile)) this->stdErrFile = workDirectory + "/" + this->stdErrFile;
// before
fs << "echo before >" << this->taskFolder << "/before1.txt 2>" << this->taskFolder << "/before2.txt";
@ -630,3 +621,36 @@ std::unique_ptr<const char* []> Process::PrepareEnvironment()
return std::move(envi);
}
std::string Process::PeekOutput()
{
std::string output;
int ret = 0;
std::string stdout;
ret = System::ExecuteCommandOut(stdout, "tail -c 5000 2>&1", this->stdOutFile);
if (ret != 0)
{
std::ostringstream stream;
stream << "Reading " << this->stdOutFile << " failed with exitcode " << ret << ": " << stdout;
stdout = stream.str();
}
output = stdout;
if (this->stdOutFile != this->stdErrFile)
{
std::string stderr;
ret = System::ExecuteCommandOut(stderr, "tail -c 5000 2>&1", this->stdErrFile);
if (ret != 0)
{
std::ostringstream stream;
stream << "Reading " << this->stdErrFile << " failed with exitcode " << ret << ": " << stderr;
stderr = stream.str();
}
output = String::Join("\n", "STDOUT:", stdout, "STDERR:", stderr);
}
return output;
}

Просмотреть файл

@ -66,8 +66,15 @@ namespace hpc
void SetSelfPtr(std::shared_ptr<Process> self) { this->selfPtr.swap(self); }
void ResetSelfPtr() { this->selfPtr.reset(); }
std::string PeekOutput();
protected:
private:
static bool StartWithHttpOrHttps(const std::string& path)
{
return boost::algorithm::starts_with(path, "http://") || boost::algorithm::starts_with(path, "https://");
}
void SetExitCode(int exitCode)
{
this->exitCode = exitCode;

Просмотреть файл

@ -42,6 +42,7 @@ RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listene
this->processors["ping"] = [this] (auto&& j, auto&& c) { return this->Ping(std::move(j), std::move(c)); };
this->processors["metric"] = [this] (auto&& j, auto&& c) { return this->Metric(std::move(j), std::move(c)); };
this->processors["metricconfig"] = [this] (auto&& j, auto&& c) { return this->MetricConfig(std::move(j), std::move(c)); };
this->processors["peektaskoutput"] = [this] (auto&& j, auto&& c) { return this->PeekTaskOutput(std::move(j), std::move(c)); };
}
RemoteCommunicator::~RemoteCommunicator()
@ -293,5 +294,11 @@ pplx::task<json::value> RemoteCommunicator::MetricConfig(json::value&& val, std:
return this->executor.MetricConfig(std::move(args), std::move(callbackUri));
}
pplx::task<json::value> RemoteCommunicator::PeekTaskOutput(json::value&& val, std::string&& callbackUri)
{
auto args = PeekTaskOutputArgs::FromJson(val);
return this->executor.PeekTaskOutput(std::move(args));
}
const std::string RemoteCommunicator::ApiSpace = "api";

Просмотреть файл

@ -62,6 +62,7 @@ namespace hpc
pplx::task<json::value> Ping(json::value&& val, std::string&&);
pplx::task<json::value> Metric(json::value&& val, std::string&&);
pplx::task<json::value> MetricConfig(json::value&& val, std::string&&);
pplx::task<json::value> PeekTaskOutput(json::value&& val, std::string&&);
static const std::string ApiSpace;
const std::string listeningUri;

Просмотреть файл

@ -722,3 +722,32 @@ void RemoteExecutor::ResyncAndInvalidateCache()
NamingClient::InvalidateCache();
}
pplx::task<json::value> RemoteExecutor::PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args)
{
Logger::Info(args.JobId, args.TaskId, this->UnknowId, "Peeking task output.");
std::string output;
try
{
auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);
if (taskInfo)
{
Logger::Debug(args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
"PeekTaskOutput for ProcessKey {0}, processes count {1}",
taskInfo->ProcessKey, this->processes.size());
auto p = this->processes.find(taskInfo->ProcessKey);
if (p != this->processes.end())
{
output = p->second->PeekOutput();
}
}
}
catch (const std::exception& ex)
{
Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "Exception when peeking task output: {0}", ex.what());
output = "NodeManager: Failed to get the output.";
}
return pplx::task_from_result(json::value::string(output));
}

Просмотреть файл

@ -30,6 +30,8 @@ namespace hpc
virtual pplx::task<web::json::value> Ping(std::string&& callbackUri);
virtual pplx::task<web::json::value> Metric(std::string&& callbackUri);
virtual pplx::task<web::json::value> MetricConfig(hpc::arguments::MetricCountersConfig&& config, std::string&& callbackUri);
virtual pplx::task<web::json::value> PeekTaskOutput(hpc::arguments::PeekTaskOutputArgs&& args);
protected:
private:
static void* GracePeriodElapsed(void* data);