347 строки
9.1 KiB
C++
347 строки
9.1 KiB
C++
#include "JobTaskDb.h"
|
|
#include "RemotingExecutor.h"
|
|
|
|
void* MetricThread(void* arg)
|
|
{
|
|
std::cout << "Start report thread." << std::endl;
|
|
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
|
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
|
|
|
|
while (true)
|
|
{
|
|
std::string metricUri = JobTaskDb::GetInstance().GetMetricReportUri();
|
|
|
|
if (!metricUri.empty())
|
|
{
|
|
HandleJson::Metric(metricUri);
|
|
}
|
|
|
|
sleep(Monitoring::Interval);
|
|
}
|
|
|
|
pthread_exit(0);
|
|
}
|
|
|
|
void* ReportingThread(void* arg)
|
|
{
|
|
std::cout << "Start metric thread." << std::endl;
|
|
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
|
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
|
|
|
|
while (true)
|
|
{
|
|
std::string reportUri = JobTaskDb::GetInstance().GetReportUri();
|
|
|
|
if (!reportUri.empty())
|
|
{
|
|
HandleJson::Ping(reportUri);
|
|
}
|
|
|
|
sleep(30);
|
|
}
|
|
|
|
pthread_exit(0);
|
|
}
|
|
|
|
// Creates the executor, starts the monitoring service, initializes the
// database lock and launches the two background threads (ReportingThread and
// MetricThread).
//
// If a thread cannot be created its id is reset to 0, which is the value the
// destructor tests before cancelling/joining, so a failed start is never
// followed by a cancel/join on an indeterminate pthread_t.
JobTaskDb::JobTaskDb() : executor(new Executor())
{
    // start monitoring service
    monitoring.Start();

    pthread_mutex_init(&jobTaskDbLock, NULL);

    if (pthread_create(&(this->threadId), NULL, ReportingThread, NULL) != 0)
    {
        std::cerr << "Failed to start reporting thread." << std::endl;
        this->threadId = 0;
    }

    if (pthread_create(&(this->metricThreadId), NULL, MetricThread, NULL) != 0)
    {
        std::cerr << "Failed to start metric thread." << std::endl;
        this->metricThreadId = 0;
    }
}
|
|
|
|
// Stops the background threads and then releases everything they may use.
//
// The worker threads reach this object through JobTaskDb::GetInstance(), so
// they are cancelled and joined first; only afterwards are the executor, the
// monitoring service and the mutex torn down. Doing it in the opposite order
// would let a still-running thread touch already-destroyed state.
JobTaskDb::~JobTaskDb()
{
    if (threadId != 0)
    {
        pthread_cancel(threadId);
        pthread_join(threadId, NULL);
    }

    if (metricThreadId != 0)
    {
        pthread_cancel(metricThreadId);
        pthread_join(metricThreadId, NULL);
    }

    // delete on a null pointer is a no-op, so no explicit check is needed.
    delete executor;

    // stop monitoring service
    monitoring.Stop();

    pthread_mutex_destroy(&jobTaskDbLock);
}
|
|
|
|
void JobTaskDb::SetReportUri(const std::string& reportUri)
|
|
{
|
|
pthread_mutex_lock(&jobTaskDbLock);
|
|
this->reportUri = reportUri;
|
|
this->metricReportUri = GetMetricReportUri(this->reportUri);
|
|
|
|
std::ofstream uriFile("ReportUri", std::ios::trunc);
|
|
uriFile << reportUri;
|
|
|
|
std::cout << "ReportUri recorded: " << reportUri << std::endl;
|
|
|
|
pthread_mutex_unlock(&jobTaskDbLock);
|
|
}
|
|
|
|
const std::string JobTaskDb::GetReportUri()
|
|
{
|
|
return this->reportUri;
|
|
}
|
|
|
|
const std::string JobTaskDb::GetMetricReportUri()
|
|
{
|
|
return this->metricReportUri;
|
|
}
|
|
|
|
// Derives the metric report URI from a report URI by replacing everything
// after the last '/' with "metricreported".
//
// reportUri: the report URI to derive from (not modified).
// Returns the derived URI (also logged to std::cout), or an empty string when
// reportUri contains no '/'.
std::string JobTaskDb::GetMetricReportUri(std::string & reportUri)
{
    const size_t lastSlash = reportUri.rfind('/');
    if (lastSlash == std::string::npos)
    {
        return "";
    }

    // Keep the prefix up to and including the slash, then append the endpoint.
    std::string derivedUri = reportUri.substr(0, lastSlash + 1);
    derivedUri += "metricreported";

    std::cout << "GetMetricReportUri : " << derivedUri << std::endl;
    return derivedUri;
}
|
|
|
|
// Definition of the static JobTaskDb::instance pointer; it starts out null.
// NOTE(review): presumably assigned by JobTaskDb::GetInstance(), which is not
// defined in this file — confirm against the header.
JobTaskDb* JobTaskDb::instance = NULL;
|
|
|
|
// Serializes this metric identifier as a JSON object with the members
// "MetricId" and "InstanceId".
json::value UMID::GetJson() const
{
    json::value result;

    result[U("MetricId")] = json::value(MetricId);
    result[U("InstanceId")] = json::value(InstanceId);

    return result;
}
|
|
|
|
// Serializes the node metric report as a JSON object with the members "Name",
// "Time", "Umids", "Values" and "TickCount".
//
// "Umids" and "Values" are parallel arrays: for every entry in Umids the value
// stored under the same key in Values is emitted at the same position.
// Values.at() throws if a key present in Umids is missing from Values.
json::value ComputeNodeMetricInformation::GetJson() const
{
    json::value result;
    result[U("Name")] = json::value::string(Name);
    result[U("Time")] = json::value::string(Time);

    std::vector<json::value> umidItems;
    std::vector<json::value> valueItems;
    for (const auto& entry : Umids)
    {
        umidItems.push_back(entry.second->GetJson());
        valueItems.push_back(json::value::number(Values.at(entry.first)));
    }

    result[U("Umids")] = json::value::array(umidItems);
    result[U("Values")] = json::value::array(valueItems);

    result[U("TickCount")] = json::value::number(TickCount);
    return result;
}
|
|
|
|
// Serializes the task information as a JSON object. Members are written in
// this order: "ExitCode", "Exited", "KernelProcessorTime",
// "NumberOfProcesses", "PrimaryTask", "ProcessIds", "TaskId",
// "TaskRequeueCount", "UserProcessorTime", "WorkingSet", "Message".
// The two processor times are converted to int64_t before being stored.
json::value ComputeClusterTaskInformation::GetJson() const
{
    json::value result;

    result[U("ExitCode")] = json::value::number(ExitCode);
    result[U("Exited")] = json::value::boolean(Exited);
    result[U("KernelProcessorTime")] = json::value(static_cast<int64_t>(KernelProcessorTime));
    result[U("NumberOfProcesses")] = json::value::number(NumberOfProcesses);
    result[U("PrimaryTask")] = json::value::boolean(PrimaryTask);
    result[U("ProcessIds")] = json::value::string(ProcessIds);
    result[U("TaskId")] = json::value::number(TaskId);
    result[U("TaskRequeueCount")] = json::value::number(TaskRequeueCount);
    result[U("UserProcessorTime")] = json::value(static_cast<int64_t>(UserProcessorTime));
    result[U("WorkingSet")] = json::value::number(WorkingSet);
    result[U("Message")] = json::value::string(Message);

    return result;
}
|
|
|
|
// Wraps the task information in an event-argument JSON object with the members
// "JobId" (this task's job id) and "TaskInfo" (the result of GetJson()).
json::value ComputeClusterTaskInformation::GetEventArgJson() const
{
    json::value eventArg;

    eventArg[U("JobId")] = json::value::number(JobId);
    eventArg[U("TaskInfo")] = GetJson();

    return eventArg;
}
|
|
|
|
// Serializes the job as a JSON object with the members "JobId" and "Tasks",
// where "Tasks" is an array holding the JSON form of every task in Tasks, in
// the map's iteration order.
json::value ComputeClusterJobInformation::GetJson() const
{
    json::value result;
    result[U("JobId")] = json::value::number(JobId);

    std::vector<json::value> taskItems;
    for (const auto& taskEntry : Tasks)
    {
        taskItems.push_back(taskEntry.second->GetJson());
    }

    result[U("Tasks")] = json::value::array(taskItems);
    return result;
}
|
|
|
|
// Serializes the node as a JSON object with the members "Availability",
// "JustStarted", "MacAddress", "Name" and "Jobs", where "Jobs" is an array
// holding the JSON form of every job in Jobs, in the map's iteration order.
json::value ComputeClusterNodeInformation::GetJson() const
{
    json::value result;
    result[U("Availability")] = json::value::number(Availability);
    result[U("JustStarted")] = json::value::boolean(JustStarted);
    result[U("MacAddress")] = json::value::string(MacAddress);
    result[U("Name")] = json::value::string(Name);

    std::vector<json::value> jobItems;
    for (const auto& jobEntry : Jobs)
    {
        jobItems.push_back(jobEntry.second->GetJson());
    }

    result[U("Jobs")] = json::value::array(jobItems);
    return result;
}
|
|
|
|
// Returns the JSON form of the node information and then clears the
// JustStarted flag, all under jobTaskDbLock.
//
// The snapshot is taken before the flag is reset, so the returned JSON still
// carries the JustStarted value as it was on entry.
json::value JobTaskDb::GetNodeInfo()
{
    pthread_mutex_lock(&jobTaskDbLock);

    json::value snapshot = nodeInfo.GetJson();
    nodeInfo.JustStarted = false;

    pthread_mutex_unlock(&jobTaskDbLock);

    return snapshot;
}
|
|
|
|
// Builds a metric report for this node and returns it as JSON.
//
// Three metrics are collected from the monitoring service and stored under
// slots 0..2: CPU usage as UMID(1, 1), available memory as UMID(3, 0) and
// network usage as UMID(12, 1). The report stores pointers to the local UMID
// objects, which stay alive until GetJson() has been evaluated.
json::value JobTaskDb::GetMetricInfo()
{
    ComputeNodeMetricInformation report(nodeInfo.Name);

    // slot 0: CPU usage
    UMID cpuId(1, 1);
    report.Umids[0] = &cpuId;
    report.Values[0] = monitoring.GetCpuUsage();

    // slot 1: available memory
    UMID memoryId(3, 0);
    report.Umids[1] = &memoryId;
    report.Values[1] = monitoring.GetAvailableMemory();

    // slot 2: network usage
    UMID networkId(12, 1);
    report.Umids[2] = &networkId;
    report.Values[2] = monitoring.GetNetworkUsage();

    return report.GetJson();
}
|
|
|
|
void JobTaskDb::StartJobAndTask(int jobId, int taskId, ProcessStartInfo* startInfo, const std::string& callbackUri)
|
|
{
|
|
std::cout << "Enter StartJobAndTask" << std::endl;
|
|
|
|
pthread_mutex_lock(&jobTaskDbLock);
|
|
|
|
std::cout << "GetLock" << std::endl;
|
|
ComputeClusterJobInformation* jobInfo = NULL;
|
|
|
|
std::map<int, ComputeClusterJobInformation*>::iterator it = this->nodeInfo.Jobs.find(jobId);
|
|
|
|
std::cout << "JobInfoFound" << std::endl;
|
|
if (it == this->nodeInfo.Jobs.end())
|
|
{
|
|
jobInfo = new ComputeClusterJobInformation(jobId);
|
|
this->nodeInfo.Jobs[jobId] = jobInfo;
|
|
std::cout << "Job added" << std::endl;
|
|
}
|
|
else
|
|
{
|
|
jobInfo = it->second;
|
|
}
|
|
|
|
ComputeClusterTaskInformation* taskInfo;
|
|
std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.find(taskId);
|
|
std::cout << "TaskInfoFound" << std::endl;
|
|
|
|
if (taskIt == jobInfo->Tasks.end())
|
|
{
|
|
std::cout << "Before EXE startTask" << std::endl;
|
|
taskInfo = this->executor->StartTask(jobId, taskId, startInfo, callbackUri);
|
|
std::cout << "After Exe StartTask" << std::endl;
|
|
jobInfo->Tasks[taskId] = taskInfo;
|
|
}
|
|
else
|
|
{
|
|
taskInfo = taskIt->second;
|
|
}
|
|
|
|
pthread_mutex_unlock(&jobTaskDbLock);
|
|
}
|
|
|
|
void JobTaskDb::EndJob(int jobId)
|
|
{
|
|
pthread_mutex_lock(&jobTaskDbLock);
|
|
|
|
ComputeClusterJobInformation* jobInfo = NULL;
|
|
std::map<int, ComputeClusterJobInformation*>::iterator it = this->nodeInfo.Jobs.find(jobId);
|
|
|
|
if (it != this->nodeInfo.Jobs.end())
|
|
{
|
|
jobInfo = it->second;
|
|
|
|
for (std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.begin(); taskIt != jobInfo->Tasks.end(); taskIt++)
|
|
{
|
|
this->executor->EndTask(taskIt->second);
|
|
}
|
|
|
|
delete jobInfo;
|
|
|
|
this->nodeInfo.Jobs.erase(jobId);
|
|
}
|
|
|
|
pthread_mutex_unlock(&jobTaskDbLock);
|
|
}
|
|
|
|
void JobTaskDb::EndTask(int jobId, int taskId)
|
|
{
|
|
pthread_mutex_lock(&jobTaskDbLock);
|
|
|
|
ComputeClusterJobInformation* jobInfo = NULL;
|
|
|
|
std::map<int, ComputeClusterJobInformation*>::iterator it = this->nodeInfo.Jobs.find(jobId);
|
|
if (it != this->nodeInfo.Jobs.end())
|
|
{
|
|
jobInfo = it->second;
|
|
|
|
std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.find(taskId);
|
|
if (taskIt != jobInfo->Tasks.end())
|
|
{
|
|
this->executor->EndTask(taskIt->second);
|
|
}
|
|
}
|
|
|
|
pthread_mutex_unlock(&jobTaskDbLock);
|
|
}
|
|
|
|
void JobTaskDb::RemoveTask(int jobId, int taskId)
|
|
{
|
|
pthread_mutex_lock(&jobTaskDbLock);
|
|
|
|
ComputeClusterJobInformation* jobInfo = NULL;
|
|
|
|
std::map<int, ComputeClusterJobInformation*>::iterator it = this->nodeInfo.Jobs.find(jobId);
|
|
if (it != this->nodeInfo.Jobs.end())
|
|
{
|
|
jobInfo = it->second;
|
|
|
|
std::map<int, ComputeClusterTaskInformation*>::iterator taskIt = jobInfo->Tasks.find(taskId);
|
|
if (taskIt != jobInfo->Tasks.end())
|
|
{
|
|
jobInfo->Tasks.erase(taskId);
|
|
}
|
|
}
|
|
|
|
pthread_mutex_unlock(&jobTaskDbLock);
|
|
}
|