Fix the unit test failure, enabled cancellation token for classes, added version info

This commit is contained in:
evanc 2017-09-26 08:30:19 -07:00
Родитель 1c5753b412
Коммит 3b9a07ce47
20 изменённых файлов: 94 добавлений и 66 удалений

Просмотреть файл

@ -567,6 +567,13 @@ namespace hpc
"Added task completion uri configuration",
}
},
{ "2.3.1.0",
{
"Added docker support",
"Added support to peek the task output",
"Fixed the unit test failure",
}
},
};
return versionHistory;

Просмотреть файл

@ -12,7 +12,7 @@ using namespace hpc::data;
using namespace web::http;
using namespace hpc::core;
HostsManager::HostsManager(std::function<std::string()> getHostsUri, int fetchInterval)
HostsManager::HostsManager(std::function<std::string(pplx::cancellation_token)> getHostsUri, int fetchInterval)
{
this->hostsFetcher =
std::unique_ptr<HttpFetcher>(

Просмотреть файл

@ -18,7 +18,7 @@ namespace hpc
const std::string HPCHostEntryPattern = R"delimiter(^([0-9\.]+)\s+([^\s#]+)\s+#HPC\s*$)delimiter";
const std::string UpdateIdHeaderName = "UpdateId";
HostsManager(std::function<std::string()> getHostsUri, int fetchInterval);
HostsManager(std::function<std::string(pplx::cancellation_token)> getHostsUri, int fetchInterval);
~HostsManager() { this->Stop(); }
void Start() { this->hostsFetcher->Start(); }

Просмотреть файл

@ -12,7 +12,7 @@ int HttpFetcher::Report()
std::string uri;
try
{
uri = this->getReportUri();
uri = this->getReportUri(this->cts.get_token());
auto client = HttpHelper::GetHttpClient(uri);
auto request = HttpHelper::GetHttpRequest(methods::GET);

Просмотреть файл

@ -17,7 +17,7 @@ namespace hpc
public:
HttpFetcher(
const std::string& name,
std::function<std::string()> getReportUri,
std::function<std::string(pplx::cancellation_token)> getReportUri,
int hold,
int interval,
std::function<bool(http::http_request&)> requestHandler,
@ -40,7 +40,6 @@ namespace hpc
private:
std::function<bool(http::http_request&)> requestHandler;
std::function<bool(http::http_response&)> responseHandler;
pplx::cancellation_token_source cts;
};
}
}

Просмотреть файл

@ -13,8 +13,9 @@ int HttpReporter::Report()
try
{
uri = this->getReportUri();
uri = this->getReportUri(this->cts.get_token());
if (this->cts.get_token().is_canceled()) return -1;
auto jsonBody = this->valueFetcher();
if (jsonBody.is_null())
{

Просмотреть файл

@ -18,7 +18,7 @@ namespace hpc
public:
HttpReporter(
const std::string& reporterName,
std::function<std::string()> getUri,
std::function<std::string(pplx::cancellation_token)> getUri,
int hold,
int interval,
std::function<json::value()> fetcher,
@ -37,7 +37,6 @@ namespace hpc
protected:
private:
pplx::cancellation_token_source cts;
};
}
}

Просмотреть файл

@ -2,7 +2,7 @@
using namespace hpc::core;
void MetricCollectorBase::ApplyConfig(const MetricCounter& config)
void MetricCollectorBase::ApplyConfig(const MetricCounter& config, pplx::cancellation_token token)
{
this->metricId = config.MetricId;
@ -15,7 +15,7 @@ void MetricCollectorBase::ApplyConfig(const MetricCounter& config)
if (!instanceNames.empty())
{
auto client = HttpHelper::GetHttpClient(NodeManagerConfig::ResolveMetricInstanceIdsUri());
auto client = HttpHelper::GetHttpClient(NodeManagerConfig::ResolveMetricInstanceIdsUri(token));
json::value jsonBody = JsonHelper<std::vector<std::string>>::ToJson(instanceNames);
auto request = HttpHelper::GetHttpRequest(web::http::methods::POST, jsonBody);

Просмотреть файл

@ -27,7 +27,7 @@ namespace hpc
MetricCollectorBase() = default;
void ApplyConfig(const MetricCounter& config);
void ApplyConfig(const MetricCounter& config, pplx::cancellation_token token);
void Reset()
{

Просмотреть файл

@ -333,6 +333,7 @@ Monitor::~Monitor()
{
if (this->threadId != 0)
{
// todo: graceful exit the thread.
pthread_cancel(this->threadId);
pthread_join(this->threadId, nullptr);
}
@ -345,7 +346,7 @@ void Monitor::SetNodeUuid(const uuid& id)
this->packet.Uuid.AssignFrom(id);
}
void Monitor::ApplyMetricConfig(MetricCountersConfig&& config)
void Monitor::ApplyMetricConfig(MetricCountersConfig&& config, pplx::cancellation_token token)
{
WriterLock writerLock(&this->lock);
@ -353,7 +354,7 @@ void Monitor::ApplyMetricConfig(MetricCountersConfig&& config)
for (auto& counter : config.MetricCounters)
{
if (!this->EnableMetricCounter(counter))
if (!this->EnableMetricCounter(counter, token))
{
Logger::Debug("Disabled counter MetricId {0}, InstanceId {1}, InstanceName {2} Path {3}", counter.MetricId, counter.InstanceId, counter.InstanceName, counter.Path);
}
@ -364,12 +365,12 @@ void Monitor::ApplyMetricConfig(MetricCountersConfig&& config)
}
}
bool Monitor::EnableMetricCounter(const MetricCounter& counterConfig)
bool Monitor::EnableMetricCounter(const MetricCounter& counterConfig, pplx::cancellation_token token)
{
auto collector = this->collectors.find(counterConfig.Path);
if (collector != this->collectors.end())
{
collector->second->ApplyConfig(counterConfig);
collector->second->ApplyConfig(counterConfig, token);
return true;
}

Просмотреть файл

@ -32,11 +32,11 @@ namespace hpc
json::value GetRegisterInfo();
void SetNodeUuid(const uuid& id);
void ApplyMetricConfig(hpc::arguments::MetricCountersConfig&& config);
void ApplyMetricConfig(hpc::arguments::MetricCountersConfig&& config, pplx::cancellation_token token);
protected:
private:
bool EnableMetricCounter(const hpc::arguments::MetricCounter& counterConfig);
bool EnableMetricCounter(const hpc::arguments::MetricCounter& counterConfig, pplx::cancellation_token token);
void Run();
static void* MonitoringThread(void* arg);

Просмотреть файл

@ -23,7 +23,7 @@ void NamingClient::InvalidateCache()
}
}
std::string NamingClient::GetServiceLocation(const std::string& serviceName)
std::string NamingClient::GetServiceLocation(const std::string& serviceName, pplx::cancellation_token token)
{
std::map<std::string, std::string>::iterator location;
@ -34,20 +34,28 @@ std::string NamingClient::GetServiceLocation(const std::string& serviceName)
if (location == this->serviceLocations.end())
{
WriterLock writerLock(&this->lock);
Logger::Debug("ResolveServiceLocation> there is no entry for {0}", serviceName);
for (auto kvp : this->serviceLocations)
{
Logger::Debug("ResolveServiceLocation> entry {0} = {1}", kvp.first, kvp.second);
ReaderLock readerLock(&this->lock);
Logger::Debug("ResolveServiceLocation> there is no entry for {0}", serviceName);
for (auto kvp : this->serviceLocations)
{
Logger::Debug("ResolveServiceLocation> entry {0} = {1}", kvp.first, kvp.second);
}
location = this->serviceLocations.find(serviceName);
}
location = this->serviceLocations.find(serviceName);
if (location == this->serviceLocations.end())
{
std::string temp;
this->RequestForServiceLocation(serviceName, temp);
this->serviceLocations[serviceName] = temp;
location = this->serviceLocations.find(serviceName);
this->RequestForServiceLocation(serviceName, temp, token);
{
WriterLock writerLock(&this->lock);
this->serviceLocations[serviceName] = temp;
location = this->serviceLocations.find(serviceName);
}
}
}
@ -55,13 +63,13 @@ std::string NamingClient::GetServiceLocation(const std::string& serviceName)
return location->second;
}
void NamingClient::RequestForServiceLocation(const std::string& serviceName, std::string& serviceLocation)
void NamingClient::RequestForServiceLocation(const std::string& serviceName, std::string& serviceLocation, pplx::cancellation_token token)
{
int selected = rand() % this->namingServicesUri.size();
std::string uri;
int interval = this->intervalSeconds;
while (true)
while (!token.is_canceled())
{
try
{
@ -71,7 +79,7 @@ void NamingClient::RequestForServiceLocation(const std::string& serviceName, std
auto client = HttpHelper::GetHttpClient(uri);
auto request = HttpHelper::GetHttpRequest(methods::GET);
http_response response = client->request(*request, this->cts.get_token()).get();
http_response response = client->request(*request, token).get();
if (response.status_code() == http::status_codes::OK)
{
serviceLocation = JsonHelper<std::string>::FromJson(response.extract_json().get());
@ -96,6 +104,8 @@ void NamingClient::RequestForServiceLocation(const std::string& serviceName, std
Logger::Error("ResolveServiceLocation> Unknown error occurred when fetching from {0}", uri);
}
if (token.is_canceled()) break;
sleep(interval);
interval *= 2;
if (interval > 300) interval = 300;

Просмотреть файл

@ -35,12 +35,12 @@ namespace hpc
return instance;
}
std::string GetServiceLocation(const std::string& serviceName);
std::string GetServiceLocation(const std::string& serviceName, pplx::cancellation_token token);
static void InvalidateCache();
private:
void RequestForServiceLocation(const std::string& serviceName, std::string& serviceLocation);
void RequestForServiceLocation(const std::string& serviceName, std::string& serviceLocation, pplx::cancellation_token token);
int intervalSeconds;
std::map<std::string, std::string> serviceLocations;

Просмотреть файл

@ -43,38 +43,39 @@ namespace hpc
AddConfigurationItem(std::string, MetricUri);
AddConfigurationItem(std::string, HeartbeatUri);
AddConfigurationItem(std::string, TaskCompletionUri);
AddConfigurationItem(std::string, HostsFileUri);
static std::string ResolveRegisterUri()
static std::string ResolveRegisterUri(pplx::cancellation_token token)
{
std::string uri = NodeManagerConfig::GetRegisterUri();
return ResolveUri(uri, [](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName()); });
return ResolveUri(uri, [token](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName(), token); });
}
static std::string ResolveHeartbeatUri()
static std::string ResolveHeartbeatUri(pplx::cancellation_token token)
{
std::string uri = NodeManagerConfig::GetHeartbeatUri();
return ResolveUri(uri, [](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName()); });
return ResolveUri(uri, [token](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName(), token); });
}
static std::string ResolveMetricUri()
static std::string ResolveMetricUri(pplx::cancellation_token token)
{
std::string uri = NodeManagerConfig::GetMetricUri();
return ResolveUri(uri, [](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetUdpMetricServiceName()); });
return ResolveUri(uri, [token](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetUdpMetricServiceName(), token); });
}
static std::string ResolveHostsFileUri()
static std::string ResolveHostsFileUri(pplx::cancellation_token token)
{
std::string uri = NodeManagerConfig::GetHostsFileUri();
return ResolveUri(uri, [](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName()); });
return ResolveUri(uri, [token](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName(), token); });
}
static std::string ResolveMetricInstanceIdsUri()
static std::string ResolveMetricInstanceIdsUri(pplx::cancellation_token token)
{
std::string uri = NodeManagerConfig::GetMetricInstanceIdsUri();
return ResolveUri(uri, [](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName()); });
return ResolveUri(uri, [token](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName(), token); });
}
static std::string ResolveTaskCompletedUri(const std::string& uri)
static std::string ResolveTaskCompletedUri(const std::string& uri, pplx::cancellation_token token)
{
std::string configUri = NodeManagerConfig::GetTaskCompletionUri();
if (configUri.empty())
@ -82,13 +83,12 @@ namespace hpc
configUri = uri;
}
return ResolveUri(configUri, [](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName()); });
return ResolveUri(configUri, [token](std::shared_ptr<NamingClient> namingClient) { return namingClient->GetServiceLocation(NodeManagerConfig::GetDefaultServiceName(), token); });
}
protected:
private:
AddConfigurationItem(std::string, RegisterUri);
AddConfigurationItem(std::string, HostsFileUri);
AddConfigurationItem(std::string, MetricInstanceIdsUri);
static std::string ResolveUri(const std::string& uri, std::function<std::string(std::shared_ptr<NamingClient>)> resolver)

Просмотреть файл

@ -76,7 +76,10 @@ void RemoteCommunicator::Close()
{
try
{
Logger::Info("Closing the communicator {0}", this->listener.uri().to_string().c_str());
this->listener.close().wait();
Logger::Info("Closed the communicator {0}", this->listener.uri().to_string().c_str());
this->isListening = false;
}
catch (const std::exception& ex)
@ -210,6 +213,7 @@ void RemoteCommunicator::HandlePost(http_request request)
{
try
{
Logger::Info("Replied with content {0}", t.get());
request.reply(status_codes::OK, t.get()).then([](auto t) { IsError(t); });
}
catch (const web::http::http_exception& httpEx)

Просмотреть файл

@ -30,7 +30,7 @@ RemoteExecutor::RemoteExecutor(const std::string& networkName)
std::unique_ptr<Reporter<json::value>>(
new HttpReporter(
"RegisterReporter",
[]() { return NodeManagerConfig::ResolveRegisterUri(); },
[](pplx::cancellation_token token) { return NodeManagerConfig::ResolveRegisterUri(token); },
3,
this->RegisterInterval,
[this]() { return this->monitor.GetRegisterInfo(); },
@ -534,7 +534,7 @@ void RemoteExecutor::ReportTaskCompletion(
{
if (!jsonBody.is_null())
{
std::string uri = NodeManagerConfig::ResolveTaskCompletedUri(callbackUri);
std::string uri = NodeManagerConfig::ResolveTaskCompletedUri(callbackUri, this->cts.get_token());
Logger::Debug(jobId, taskId, taskRequeueCount,
"Callback to {0} with {1}", uri, jsonBody);
@ -579,7 +579,7 @@ void RemoteExecutor::StartHeartbeat()
std::unique_ptr<Reporter<json::value>>(
new HttpReporter(
"HeartbeatReporter",
[]() { return NodeManagerConfig::ResolveHeartbeatUri(); },
[](pplx::cancellation_token token) { return NodeManagerConfig::ResolveHeartbeatUri(token); },
0,
this->NodeInfoReportInterval,
[this]() { return this->jobTaskTable.ToJson(); },
@ -590,7 +590,7 @@ void RemoteExecutor::StartHeartbeat()
void RemoteExecutor::StartHostsManager()
{
std::string hostsUri = NodeManagerConfig::ResolveHostsFileUri();
std::string hostsUri = NodeManagerConfig::GetHostsFileUri();
if (!hostsUri.empty())
{
int interval = this->DefaultHostsFetchInterval;
@ -613,7 +613,7 @@ void RemoteExecutor::StartHostsManager()
WriterLock writerLock(&this->lock);
this->hostsManager = std::unique_ptr<HostsManager>(new HostsManager([]() { return NodeManagerConfig::ResolveHostsFileUri(); }, interval));
this->hostsManager = std::unique_ptr<HostsManager>(new HostsManager([](pplx::cancellation_token token) { return NodeManagerConfig::ResolveHostsFileUri(token); }, interval));
this->hostsManager->Start();
}
else
@ -639,7 +639,7 @@ void RemoteExecutor::StartMetric()
{
WriterLock writerLock(&this->lock);
std::string uri = NodeManagerConfig::ResolveMetricUri();
std::string uri = NodeManagerConfig::GetMetricUri();
if (!uri.empty())
{
auto tokens = String::Split(uri, '/');
@ -651,7 +651,7 @@ void RemoteExecutor::StartMetric()
std::unique_ptr<Reporter<std::vector<unsigned char>>>(
new UdpReporter(
"MetricReporter",
[]() { return NodeManagerConfig::ResolveMetricUri(); },
[](pplx::cancellation_token token) { return NodeManagerConfig::ResolveMetricUri(token); },
0,
this->MetricReportInterval,
[this]() { return this->monitor.GetMonitorPacketData(); },
@ -681,7 +681,7 @@ pplx::task<json::value> RemoteExecutor::MetricConfig(
{
this->Metric(std::move(callbackUri));
this->monitor.ApplyMetricConfig(std::move(config));
this->monitor.ApplyMetricConfig(std::move(config), this->cts.get_token());
return pplx::task_from_result(json::value());
}

Просмотреть файл

@ -21,7 +21,13 @@ namespace hpc
{
public:
RemoteExecutor(const std::string& networkName);
~RemoteExecutor() { pthread_rwlock_destroy(&this->lock); }
~RemoteExecutor()
{
Logger::Info("Closing the Remote Executor.");
this->cts.cancel();
pthread_rwlock_destroy(&this->lock);
Logger::Info("Closed the Remote Executor.");
}
virtual pplx::task<web::json::value> StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, std::string&& callbackUri);
virtual pplx::task<web::json::value> StartTask(hpc::arguments::StartTaskArgs&& args, std::string&& callbackUri);
@ -67,6 +73,8 @@ namespace hpc
std::map<int, std::tuple<std::string, bool, bool, bool, bool, std::string>> jobUsers;
std::map<std::string, std::set<int>> userJobs;
pthread_rwlock_t lock;
pplx::cancellation_token_source cts;
};
}
}

Просмотреть файл

@ -17,7 +17,7 @@ namespace hpc
class Reporter
{
public:
Reporter(std::string reporterName, std::function<std::string()> getUri, int hold, int interval, std::function<ReportType()> fetcher, std::function<void()> onErrorFunc)
Reporter(std::string reporterName, std::function<std::string(pplx::cancellation_token)> getUri, int hold, int interval, std::function<ReportType()> fetcher, std::function<void()> onErrorFunc)
: name(reporterName), getReportUri(getUri), valueFetcher(fetcher), onError(onErrorFunc), intervalSeconds(interval), holdSeconds(hold)
{
}
@ -32,36 +32,35 @@ namespace hpc
void Stop()
{
Logger::Debug("Stopping the thread of Reporter {0}", this->name);
this->isRunning = false;
this->cts.cancel();
if (this->threadId != 0)
{
while (this->inRequest) usleep(1);
pthread_cancel(this->threadId);
pthread_join(this->threadId, nullptr);
Logger::Debug("Destructed Reporter {0}", this->name);
Logger::Debug("Stopped the thread of Reporter {0}", this->name);
}
}
virtual ~Reporter()
{
Logger::Debug("Destruct Reporter {0}", this->name);
Logger::Debug("Destructed Reporter {0}", this->name);
}
virtual int Report() = 0;
protected:
std::string name;
std::function<std::string()> getReportUri;
std::function<std::string(pplx::cancellation_token)> getReportUri;
std::function<ReportType()> valueFetcher;
std::function<void()> onError;
int intervalSeconds;
pplx::cancellation_token_source cts;
private:
static void* ReportingThread(void* arg)
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
Reporter* r = static_cast<Reporter*>(arg);
sleep(r->holdSeconds);
@ -82,10 +81,10 @@ namespace hpc
r->inRequest = false;
}
sleep(needRetry ? r->ErrorRetrySeconds : r->intervalSeconds);
if (r->isRunning) sleep(needRetry ? r->ErrorRetrySeconds : r->intervalSeconds);
}
pthread_exit(nullptr);
return nullptr;
}
const int ErrorRetrySeconds = 2;

Просмотреть файл

@ -12,7 +12,7 @@ using namespace hpc::utils;
UdpReporter::UdpReporter(
const std::string& name,
std::function<std::string()> getReportUri,
std::function<std::string(pplx::cancellation_token)> getReportUri,
int hold,
int interval,
std::function<std::vector<unsigned char>()> fetcher,
@ -32,7 +32,7 @@ void UdpReporter::ReConnect()
try
{
uri = this->getReportUri();
uri = this->getReportUri(this->cts.get_token());
}
catch (const http::http_exception& httpEx)
{

Просмотреть файл

@ -15,7 +15,7 @@ namespace hpc
public:
UdpReporter(
const std::string& name,
std::function<std::string()> getReportUri,
std::function<std::string(pplx::cancellation_token)> getReportUri,
int hold,
int interval,
std::function<std::vector<unsigned char>()> fetcher,