This commit is contained in:
Evan Cui 2015-04-01 21:38:15 +08:00
Родитель 5d826c8caa
Коммит 579a57bfa5
2 изменённых файлов: 75 добавлений и 67 удалений

Просмотреть файл

@ -36,7 +36,7 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
if (taskInfo->TaskRequeueCount < args.StartInfo.TaskRequeueCount)
{
Logger::Info("Change the task {0} requeue count from {1} to {2}", args.TaskId, taskInfo->TaskRequeueCount, args.StartInfo.TaskRequeueCount);
Logger::Info("Task {0}: Change requeue count from {1} to {2}", args.TaskId, taskInfo->TaskRequeueCount, args.StartInfo.TaskRequeueCount);
taskInfo->TaskRequeueCount = args.StartInfo.TaskRequeueCount;
}
@ -63,31 +63,39 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
try
{
taskInfo->Exited = true;
taskInfo->ExitCode = exitCode;
taskInfo->Message = std::move(message);
taskInfo->KernelProcessorTime = kernelTime.tv_sec * 1000000 + kernelTime.tv_usec;
taskInfo->UserProcessorTime = userTime.tv_sec * 1000000 + userTime.tv_usec;
auto jsonBody = taskInfo->ToJson();
Logger::Debug("Callback to {0} with {1}", callbackUri, jsonBody);
client::http_client_config config;
config.set_validate_certificates(false);
client::http_client client(callbackUri, config);
client.request(methods::POST, "", jsonBody).then([&callbackUri, this, taskInfo](http_response response)
if (taskInfo->Exited)
{
Logger::Info("Callback to {0} response code {1}", callbackUri, response.status_code());
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId);
}).wait();
Logger::Debug("Task {0}: Ended already by EndTask.", taskInfo->TaskId);
}
else
{
taskInfo->Exited = true;
taskInfo->ExitCode = exitCode;
taskInfo->Message = std::move(message);
taskInfo->KernelProcessorTime = kernelTime.tv_sec * 1000000 + kernelTime.tv_usec;
taskInfo->UserProcessorTime = userTime.tv_sec * 1000000 + userTime.tv_usec;
auto jsonBody = taskInfo->ToJson();
Logger::Debug("Task {0}: Callback to {1} with {2}", taskInfo->TaskId, callbackUri, jsonBody);
client::http_client_config config;
config.set_validate_certificates(false);
client::http_client client(callbackUri, config);
client.request(methods::POST, "", jsonBody).then([&callbackUri, this, taskInfo](http_response response)
{
Logger::Info("Task {0}: Callback to {1} response code {2}", taskInfo->TaskId, callbackUri, response.status_code());
}).wait();
}
}
catch (const std::exception& ex)
{
Logger::Error("Exception when sending back task result. {0}", ex.what());
}
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId);
// Process will be deleted here.
this->processes.erase(taskInfo->TaskId);
Logger::Debug("erased process");
Logger::Debug("Task {0}: erased process", taskInfo->TaskId);
}));
this->processes[args.TaskId] = process;
@ -138,6 +146,7 @@ json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args)
auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);
this->TerminateTask(args.TaskId);
this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId);
json::value jsonBody;
if (taskInfo)
@ -149,7 +158,6 @@ json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args)
}
return jsonBody;
}
json::value RemoteExecutor::Ping(const std::string& callbackUri)

Просмотреть файл

@ -1,49 +1,49 @@
#ifndef TASKINFO_H
#define TASKINFO_H
#include <cpprest/json.h>
#include <vector>
#include <string>
#include <memory>
namespace hpc
{
namespace data
{
struct TaskInfo
{
public:
TaskInfo(int jobId, int taskId, const std::string& nodeName) :
NodeName(nodeName), JobId(jobId), TaskId(taskId), TaskRequeueCount(0),
ExitCode(0), Exited(false), KernelProcessorTime(0), UserProcessorTime(0),
WorkingSet(0), NumberOfProcesses(0), IsPrimaryTask(true)
{
}
TaskInfo(TaskInfo&& t) = default;
web::json::value ToJson() const;
const std::string& NodeName;
int JobId;
int TaskId;
int TaskRequeueCount;
int ExitCode;
bool Exited;
long long KernelProcessorTime;
long long UserProcessorTime;
int WorkingSet;
int NumberOfProcesses;
bool IsPrimaryTask;
std::string Message;
std::vector<int> ProcessIds;
protected:
private:
};
}
}
#endif // TASKINFO_H
#ifndef TASKINFO_H
#define TASKINFO_H
#include <cpprest/json.h>
#include <vector>
#include <string>
#include <memory>
namespace hpc
{
namespace data
{
struct TaskInfo
{
public:
TaskInfo(int jobId, int taskId, const std::string& nodeName) :
NodeName(nodeName), JobId(jobId), TaskId(taskId), TaskRequeueCount(0),
ExitCode(0), Exited(false), KernelProcessorTime(0), UserProcessorTime(0),
WorkingSet(0), NumberOfProcesses(0), IsPrimaryTask(true)
{
}
TaskInfo(TaskInfo&& t) = default;
web::json::value ToJson() const;
const std::string& NodeName;
int JobId;
int TaskId;
int TaskRequeueCount;
int ExitCode;
bool Exited;
long long KernelProcessorTime;
long long UserProcessorTime;
int WorkingSet;
int NumberOfProcesses;
bool IsPrimaryTask;
std::string Message;
std::vector<int> ProcessIds;
protected:
private:
};
}
}
#endif // TASKINFO_H