diff --git a/LinuxCommunicator/CallbackController.cs b/LinuxCommunicator/CallbackController.cs
index f4ecab9..f96ff2d 100644
--- a/LinuxCommunicator/CallbackController.cs
+++ b/LinuxCommunicator/CallbackController.cs
@@ -1,17 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+using System.Net;
+using System.Net.Http;
using System.Web.Http;
using Microsoft.Hpc.Activation;
using Microsoft.Hpc.Scheduler.Communicator;
-using Microsoft.Hpc.Communicators.LinuxCommunicator.Monitoring;
namespace Microsoft.Hpc.Communicators.LinuxCommunicator
{
public class CallbackController : ApiController
{
+ ///
+ /// The Http header UpdateId
+ ///
+ public const string UpdateIdHeaderName = "UpdateId";
+
[HttpPost]
[Route("api/{nodename}/computenodereported")]
public int ComputeNodeReported(string nodeName, [FromBody] ComputeClusterNodeInformation nodeInfo)
@@ -84,5 +88,27 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
return null;
}
+
+ [HttpGet]
+ [Route("api/hostfile")]
+ public HttpResponseMessage GetHosts()
+ {
+ Guid curUpdateId = LinuxCommunicator.Instance.HostsManager.UpdateId;
+ IEnumerable updateIds;
+ bool hasUpdateId = Request.Headers.TryGetValues(UpdateIdHeaderName, out updateIds);
+ HttpResponseMessage response = null;
+ Guid guid;
+ if (hasUpdateId && Guid.TryParse(updateIds.FirstOrDefault(), out guid) && guid == curUpdateId)
+ {
+ response = Request.CreateResponse(HttpStatusCode.NoContent);
+ }
+ else
+ {
+ response = Request.CreateResponse(HttpStatusCode.OK, LinuxCommunicator.Instance.HostsManager.ManagedEntries);
+ }
+
+ response.Headers.Add(UpdateIdHeaderName, curUpdateId.ToString());
+ return response;
+ }
}
}
diff --git a/LinuxCommunicator/HostsFile/HostEntry.cs b/LinuxCommunicator/HostsFile/HostEntry.cs
new file mode 100644
index 0000000..1e82b5d
--- /dev/null
+++ b/LinuxCommunicator/HostsFile/HostEntry.cs
@@ -0,0 +1,56 @@
+using System;
+using System.Net;
+
+namespace Microsoft.Hpc.Communicators.LinuxCommunicator.HostsFile
+{
+ public class HostEntry
+ {
+ ///
+ /// The host name
+ ///
+ public string Name;
+
+ ///
+ /// The host IP address
+ ///
+ public string Address;
+
+ ///
+ /// Creates a new hosts file entry for the specified host.
+ ///
+ /// The host name
+ /// The IP address
+ public HostEntry(string name, string ipAddress)
+ {
+ IPAddress testAddress;
+ if (!IPAddress.TryParse(ipAddress, out testAddress))
+ {
+ throw new ArgumentException("ipAddress must be a valid IP address");
+ }
+
+ if (string.IsNullOrEmpty(name))
+ {
+ throw new ArgumentException("name cannot be null");
+ }
+
+ this.Address = ipAddress;
+ this.Name = name;
+ }
+
+ public override bool Equals(object obj)
+ {
+ HostEntry entry = obj as HostEntry;
+ if (entry == null)
+ {
+ return false;
+ }
+
+ return string.Equals(this.Name, entry.Name) && string.Equals(this.Address, entry.Address);
+ }
+
+ public override int GetHashCode()
+ {
+ return (this.Address ?? string.Empty).GetHashCode() ^ (this.Name ?? string.Empty).GetHashCode();
+ }
+ }
+}
diff --git a/LinuxCommunicator/HostsFile/HostsFileManager.cs b/LinuxCommunicator/HostsFile/HostsFileManager.cs
new file mode 100644
index 0000000..7bc6592
--- /dev/null
+++ b/LinuxCommunicator/HostsFile/HostsFileManager.cs
@@ -0,0 +1,220 @@
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+using System.Text.RegularExpressions;
+using System.IO;
+using System.Diagnostics;
+using System.Threading;
+
+namespace Microsoft.Hpc.Communicators.LinuxCommunicator.HostsFile
+{
+ ///
+ /// This class abstracts a hosts file so that is can be managed by HPC.
+ ///
+ public class HostsFileManager : IDisposable
+ {
+ ///
+ /// The string placed in the comment field of an entry managed
+ /// by the cluster.
+ ///
+ public const string ManagedEntryKey = "HPC";
+
+ ///
+ /// The parameter name used to indicate whether the hosts file is HPC managed.
+ ///
+ public const string ManageFileParameter = "ManageFile";
+
+ ///
+ /// The interval to reload hosts file
+ ///
+ private const int ReloadInterval = 1000 * 60;
+
+ ///
+ /// Regular expression for a comment line in the text file
+ ///
+ private static readonly Regex Comment = new Regex(@"^#(?<1>.*)$");
+
+ ///
+ /// Regular expression for a parameter value in the comments section.
+ ///
+ private static readonly Regex CommentParameter = new Regex(@"^#\s*(?[\w\p{Lm}\p{Nl}\p{Cf}\p{Mn}\p{Mc}\.]+)\s*=\s*(?[\w\p{Lm}\p{Nl}\p{Cf}\p{Mn}\p{Mc}\.]+)");
+
+ ///
+ /// Regular expression for a ip entry in the text file.
+ ///
+ private static readonly Regex IpEntry = new Regex(@"^(?[0-9\.]+)\s+(?[^\s#]+)(\s+#(?.*))?");
+
+ ///
+ /// The path to the hosts file
+ ///
+ private string filepath;
+
+ ///
+ /// The timer to reload hosts file
+ ///
+ private Timer reloadTimer;
+
+ ///
+ /// The last modified date of the current loaded hosts file
+ ///
+ private DateTime lastModified = DateTime.MinValue;
+
+ ///
+ /// The unique update ID
+ ///
+ public Guid UpdateId
+ {
+ get;
+ private set;
+ }
+
+ ///
+ /// This constructor initializes the parser on a specific hosts file
+ ///
+ ///
+ public HostsFileManager(string filepath)
+ {
+ this.filepath = filepath;
+ this.ManagedEntries = new List();
+ this.reloadTimer = new Timer(ReloadTimerEvent, null, 0, Timeout.Infinite);
+ }
+
+ ///
+ /// Gets the list of HPC managed entries in the host file.
+ ///
+ public List ManagedEntries
+ {
+ get;
+ private set;
+ }
+
+ ///
+ /// Gets or sets a flag indicating whether the file is HPC managed
+ ///
+ public bool ManagedByHPC
+ {
+ get;
+ private set;
+ }
+
+ ///
+ /// Loads the contents of an existing host file. Will clear
+ /// the current configuration of this instance.
+ ///
+ ///
+ public void ReloadTimerEvent(object param)
+ {
+ try
+ {
+ FileInfo fileInfo = new FileInfo(this.filepath);
+ if (!fileInfo.Exists)
+ {
+ LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] The hosts file doesn't exists: {0}", this.filepath);
+ return;
+ }
+
+ if (fileInfo.LastWriteTimeUtc <= this.lastModified)
+ {
+ LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] The hosts file isn't changed since last load");
+ return;
+ }
+
+ bool manageByHPC = false;
+ List newEntries = new List();
+ foreach (var line in File.ReadAllLines(this.filepath))
+ {
+ Match commentMatch = HostsFileManager.Comment.Match(line);
+ if (commentMatch.Success)
+ {
+ Match commentParameterMatch = HostsFileManager.CommentParameter.Match(line);
+ if (commentParameterMatch.Success && string.Equals(commentParameterMatch.Groups["parameter"].ToString(), ManageFileParameter, StringComparison.OrdinalIgnoreCase))
+ {
+ if (string.Equals(commentParameterMatch.Groups["value"].Value, "true", StringComparison.OrdinalIgnoreCase))
+ {
+ manageByHPC = true;
+ }
+ }
+
+ continue;
+ }
+
+ Match ipEntryMatch = HostsFileManager.IpEntry.Match(line);
+ if (ipEntryMatch.Success && manageByHPC)
+ {
+ string ip = ipEntryMatch.Groups["ip"].Value;
+ string name = ipEntryMatch.Groups["dnsName"].Value;
+ string comment = ipEntryMatch.Groups["comment"].Value;
+
+ if (comment.Equals(HostsFileManager.ManagedEntryKey, StringComparison.OrdinalIgnoreCase))
+ {
+ try
+ {
+ newEntries.Add(new HostEntry(name, ip));
+ }
+ catch (ArgumentException ex)
+ {
+ LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] Skip invalid host entry name={0}, ip={1}: {2}", name, ip, ex.Message);
+ }
+ }
+ }
+ }
+
+ if (manageByHPC)
+ {
+ if (newEntries.Count != this.ManagedEntries.Count || !(new HashSet(this.ManagedEntries)).SetEquals(new HashSet(newEntries)))
+ {
+ this.ManagedEntries = newEntries;
+ this.UpdateId = Guid.NewGuid();
+ LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] The managed host entries updated, current update Id is {0}", this.UpdateId);
+ }
+ else
+ {
+ LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] No update to HPC managed host entries, current update Id is {0}", this.UpdateId);
+ }
+ }
+ else
+ {
+ LinuxCommunicator.Instance.Tracer.TraceWarning("[HostsFileManager] Hosts file was not managed by HPC");
+ this.ManagedEntries.Clear();
+ }
+
+ this.ManagedByHPC = manageByHPC;
+ this.lastModified = fileInfo.LastWriteTimeUtc;
+ }
+ catch (Exception e)
+ {
+ LinuxCommunicator.Instance.Tracer.TraceWarning("[HostsFileManager] Failed to reload host file: {0}", e);
+ }
+ finally
+ {
+ try
+ {
+ this.reloadTimer.Change(ReloadInterval, Timeout.Infinite);
+ }
+ catch (Exception te)
+ {
+ LinuxCommunicator.Instance.Tracer.TraceWarning("[HostsFileManager] Failed to restart reload timer: {0}", te);
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool isDisposing)
+ {
+ if (isDisposing)
+ {
+ if (this.reloadTimer != null)
+ {
+ this.reloadTimer.Dispose();
+ }
+ }
+ }
+ }
+}
diff --git a/LinuxCommunicator/LinuxCommunicator.cs b/LinuxCommunicator/LinuxCommunicator.cs
index 58057cb..12e1892 100644
--- a/LinuxCommunicator/LinuxCommunicator.cs
+++ b/LinuxCommunicator/LinuxCommunicator.cs
@@ -1,20 +1,21 @@
using System;
-using System.Collections.Generic;
using System.Collections.Concurrent;
+using System.Globalization;
+using System.IO;
using System.Linq;
+using System.Net;
using System.Net.Http;
using System.Net.Http.Formatting;
+using System.Net.Security;
+using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
-using Microsoft.Hpc.Activation;
-using Microsoft.Hpc.Scheduler.Communicator;
-using System.Net;
-using Microsoft.Hpc.Scheduler.Properties;
using System.Xml.Linq;
-using System.Security.Principal;
-using System.Globalization;
-using System.Net.Security;
+using Microsoft.Hpc.Activation;
+using Microsoft.Hpc.Communicators.LinuxCommunicator.HostsFile;
using Microsoft.Hpc.Communicators.LinuxCommunicator.Monitoring;
+using Microsoft.Hpc.Scheduler.Communicator;
+using Microsoft.Hpc.Scheduler.Properties;
namespace Microsoft.Hpc.Communicators.LinuxCommunicator
{
@@ -59,16 +60,20 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
instance = this;
this.headNodeFqdn = new Lazy(() => Dns.GetHostEntryAsync(this.HeadNode).Result.HostName, LazyThreadSafetyMode.ExecutionAndPublication);
this.MonitoringConfigManager = new MonitoringConfigManager(this.headNodeFqdn.Value);
+ this.HostsManager = new HostsFileManager(Path.Combine(Environment.SystemDirectory, @"drivers\etc\hosts"));
}
public event EventHandler RegisterRequested;
public MonitoringConfigManager MonitoringConfigManager { get; private set; }
+ public HostsFileManager HostsManager { get; private set; }
+
public void Dispose()
{
this.server.Dispose();
this.MonitoringConfigManager.Dispose();
+ this.HostsManager.Dispose();
GC.SuppressFinalize(this);
}
diff --git a/LinuxCommunicator/LinuxCommunicator.csproj b/LinuxCommunicator/LinuxCommunicator.csproj
index 1062456..3a18a41 100644
--- a/LinuxCommunicator/LinuxCommunicator.csproj
+++ b/LinuxCommunicator/LinuxCommunicator.csproj
@@ -9,7 +9,7 @@
Properties
Microsoft.Hpc.Communicators.LinuxCommunicator
Microsoft.Hpc.Communicators.LinuxCommunicator
- v4.5.1
+ v4.5
512
@@ -100,6 +100,8 @@
+
+
diff --git a/LinuxCommunicator/Properties/AssemblyInfo.cs b/LinuxCommunicator/Properties/AssemblyInfo.cs
index 16471fb..0cab48d 100644
--- a/LinuxCommunicator/Properties/AssemblyInfo.cs
+++ b/LinuxCommunicator/Properties/AssemblyInfo.cs
@@ -32,5 +32,4 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("1.0.0.0")]
-[assembly: AssemblyFileVersion("1.0.0.0")]
+[assembly: AssemblyVersion("1.0.*")]
diff --git a/nodemanager/core/HostsManager.cpp b/nodemanager/core/HostsManager.cpp
new file mode 100644
index 0000000..592d689
--- /dev/null
+++ b/nodemanager/core/HostsManager.cpp
@@ -0,0 +1,96 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include "HostsManager.h"
+
+using namespace hpc::utils;
+using namespace hpc::data;
+using namespace web::http;
+using namespace hpc::core;
+
+HostsManager::HostsManager(const std::string& hostsUri)
+{
+ this->hostsFetcher =
+ std::unique_ptr(
+ new HttpFetcher(
+ hostsUri,
+ 0,
+ FetchInterval,
+ [this](http_request& request) {
+ if(!this->updateId.empty())
+ request.headers().add(UpdateIdHeaderName, this->updateId);
+ return true;
+ },
+ [this](http_response& response) {
+ return this->HostsResponseHandler(response);
+ }));
+}
+
+HostsManager::~HostsManager()
+{
+ this->hostsFetcher->Stop();
+}
+
+void HostsManager::Start()
+{
+ this->hostsFetcher->Start();
+}
+
+void HostsManager::Stop()
+{
+ this->hostsFetcher->Stop();
+}
+
+bool HostsManager::HostsResponseHandler(const http_response& response)
+{
+ if(response.status_code != 200)
+ {
+ return;
+ }
+
+ std::string respUpdateId;
+ if (HttpHelper::FindHeader(response, UpdateIdHeaderName, respUpdateId))
+ {
+ this.updateId = respUpdateId;
+ std::vector hostEntries = JsonHelper>::FromJson(response.extract_json().get());
+ this.UpdateHostsFile(hostEntries);
+ }
+}
+
+void HostsManager::UpdateHostsFile(const std::vector& hostEntries)
+{
+ std::list unmanagedLines;
+ std::ifstream ifs(HostsFilePath, std::ios::in);
+ std::string line;
+ while (getline(ifs, line))
+ {
+ std::regex entryRegex(HPCHostEntryPattern);
+ std::smatch entryMatch;
+ // Strip the original HPC entries
+ if(!std::regex_match(line, entryMatch, entryRegex))
+ {
+ unmanagedLines.push_back(line);
+ }
+ }
+
+ ifs.close();
+
+ std::ofstream ofs(HostsFilePath);
+ auto it = unmanagedLines.cbegin();
+ while(it != unmanagedLines.cend())
+ {
+ ofs << *it << std::endl;
+ }
+
+ // Append the HPC entries at the end
+ for(std::size_t i=0; i
+#include
+#include "HttpFetcher.h"
+#include "../data/HostEntry.h"
+
+namespace hpc
+{
+ namespace core
+ {
+ using namespace web::http;
+ class HostsManager
+ {
+ public:
+ const std::string HostsFilePath = "/etc/hosts";
+ const int FetchInterval = 300000;
+ const std::string HPCHostEntryPattern = R"delimiter(^([0-9\.]+)\s+([^\s#]+)\s+#HPC\s*)delimiter";
+ const std::string UpdateIdHeaderName = "UpdateId";
+ HostsManager(const std::string& hostsUri);
+ ~HostsManager();
+ void Start();
+ void Stop();
+
+ protected:
+ private:
+ bool HostsResponseHandler(const http_response& response);
+ void UpdateHostsFile(const std::vector& hostEntries);
+ std::string updateId;
+ std::unique_ptr hostsFetcher;
+ };
+ }
+}
+
+#endif // HOSTSMANAGER_H
diff --git a/nodemanager/core/HttpFetcher.cpp b/nodemanager/core/HttpFetcher.cpp
new file mode 100644
index 0000000..8dc0970
--- /dev/null
+++ b/nodemanager/core/HttpFetcher.cpp
@@ -0,0 +1,49 @@
+#include "HttpFetcher.h"
+#include "HttpHelper.h"
+#include "../utils/Logger.h"
+
+using namespace web::http;
+using namespace web::http::client;
+using namespace hpc::core;
+using namespace hpc::utils;
+
+void HttpFetcher::Report()
+{
+ try
+ {
+ const std::string& uri = this->reportUri;
+ http_client client = HttpHelper::GetHttpClient(uri);
+
+ http_request request = HttpHelper::GetHttpRequest(methods::Get);
+ if(this.requestHandler)
+ {
+ if(!this.requestHandler(request))
+ {
+ Logger::Info("Skipped sending to {0} because of failure in request handler", uri);
+ return;
+ }
+ }
+
+ http_response response = client.request(request, this->cts.get_token()).get();
+ Logger::Debug("---------> Reported to {0} response code {1}", uri, response.status_code());
+ if(this.responseHandler)
+ {
+ if(!this.responseHandler(response))
+ {
+ Logger::Info("Error in response handler for the request sent to {0}", uri);
+ }
+ }
+ }
+ catch (const http_exception& httpEx)
+ {
+ Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->targetUri, httpEx.what());
+ }
+ catch (const std::exception& ex)
+ {
+ Logger::Error("Exception occurred when report to {0}, ex {1}", this->targetUri, ex.what());
+ }
+ catch (...)
+ {
+ Logger::Error("Unknown error occurred when report to {0}", this->targetUri);
+ }
+}
diff --git a/nodemanager/core/HttpFetcher.h b/nodemanager/core/HttpFetcher.h
new file mode 100644
index 0000000..2e851f0
--- /dev/null
+++ b/nodemanager/core/HttpFetcher.h
@@ -0,0 +1,44 @@
+#ifndef HTTPFETCHER_H
+#define HTTPFETCHER_H
+
+#include
+#include
+#include
+#include "Reporter.h"
+
+namespace hpc
+{
+ namespace core
+ {
+ using namespace web;
+
+ class HttpFetcher : public Reporter
+ {
+ public:
+ HttpFetcher(
+ const std::string& uri,
+ int hold,
+ int interval,
+ std::function requestHandler,
+ std::function responseHandler)
+ : Reporter(uri, hold, interval, nullptr),requestHandler(requestHandler),responseHandler(responseHandler)
+ {
+ }
+
+ virtual ~HttpFetcher()
+ {
+ this->cts.cancel();
+ this->Stop();
+ }
+
+ virtual void Report();
+
+ private:
+ std::function requestHandler;
+ std::function responseHandler;
+ pplx::cancellation_token_source cts;
+ };
+ }
+}
+
+#endif // HTTPFETCHER_H
\ No newline at end of file
diff --git a/nodemanager/core/HttpHelper.h b/nodemanager/core/HttpHelper.h
index 9135b40..cbc04bb 100644
--- a/nodemanager/core/HttpHelper.h
+++ b/nodemanager/core/HttpHelper.h
@@ -17,16 +17,24 @@ namespace hpc
{
public:
static http::http_request GetHttpRequest(
- const http::method& mtd,
- const json::value &body)
+ const http::method& mtd)
{
http::http_request msg(mtd);
msg.set_request_uri("");
- msg.set_body(body);
msg.headers().add(AuthenticationHeaderKey, NodeManagerConfig::GetClusterAuthenticationKey());
return msg;
}
+ template
+ static http::http_request GetHttpRequest(
+ const http::method& mtd,
+ const T &body)
+ {
+ http::http_request msg = GetHttpRequest(mtd);
+ msg.set_body(body);
+ return msg;
+ }
+
static void ConfigListenerSslContext(context& ctx)
{
ctx.set_options(boost::asio::ssl::context::default_workarounds);
@@ -71,7 +79,7 @@ namespace hpc
});
utility::seconds timeout(5l);
- config.set_timeout(timeout);
+ config.set_timeout(timeout);
Logger::Debug(
"Create client to {0}, configure: timeout {1} seconds, chuck size {2}",
uri, config.timeout().count(), config.chunksize());
@@ -79,13 +87,14 @@ namespace hpc
return std::move(http::client::http_client(uri, config));
}
- static bool FindHeader(const http::http_request& request, const std::string& headerKey, std::string& header)
- {
- auto h = request.headers().find(headerKey);
- if (h != request.headers().end())
- {
+ template
+ static bool FindHeader(const T& message, const std::string& headerKey, std::string& header)
+ {
+ auto h = message.headers().find(headerKey);
+ if (h != message.headers().end())
+ {
header = h->second;
- return true;
+ return true;
}
return false;
diff --git a/nodemanager/core/NodeManagerConfig.h b/nodemanager/core/NodeManagerConfig.h
index 4e3b87e..9677d8c 100644
--- a/nodemanager/core/NodeManagerConfig.h
+++ b/nodemanager/core/NodeManagerConfig.h
@@ -29,6 +29,7 @@ namespace hpc
AddConfigurationItem(std::string, RegisterUri);
AddConfigurationItem(std::string, HeartbeatUri);
AddConfigurationItem(std::string, MetricUri);
+ AddConfigurationItem(std::string, HostsFileUri);
AddConfigurationItem(std::string, ClusterAuthenticationKey);
AddConfigurationItem(std::string, TrustedCAPath);
AddConfigurationItem(std::string, TrustedCAFile);
diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp
index 717cd80..c22d33c 100644
--- a/nodemanager/core/RemoteExecutor.cpp
+++ b/nodemanager/core/RemoteExecutor.cpp
@@ -38,6 +38,7 @@ RemoteExecutor::RemoteExecutor(const std::string& networkName)
this->StartHeartbeat(NodeManagerConfig::GetHeartbeatUri());
this->StartMetric(NodeManagerConfig::GetMetricUri());
+ this->StartHostsManager(NodeManagerConfig::GetHostsFileUri());
}
json::value RemoteExecutor::StartJobAndTask(StartJobAndTaskArgs&& args, const std::string& callbackUri)
@@ -447,7 +448,7 @@ json::value RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args, const st
void* RemoteExecutor::GracePeriodElapsed(void* data)
{
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
auto* ids = static_cast*>(data);
@@ -461,7 +462,7 @@ void* RemoteExecutor::GracePeriodElapsed(void* data)
delete ids;
sleep(period);
-
+
WriterLock writerLock(&e->lock);
Logger::Info(jobId, taskId, e->UnknowId, "GracePeriodElapsed: starting");
@@ -540,6 +541,17 @@ void RemoteExecutor::StartHeartbeat(const std::string& callbackUri)
this->nodeInfoReporter->Start();
}
+void RemoteExecutor::StartHostsManager(const std::string& callbackUri)
+{
+ if(!callbackUri.empty())
+ {
+ WriterLock writerLock(&this->lock);
+
+ this->hostsManager = std::unique_ptr(new HostsManager(callbackUri));
+ this->hostsManager->Start();
+ }
+}
+
json::value RemoteExecutor::Ping(const std::string& callbackUri)
{
auto uri = NodeManagerConfig::GetHeartbeatUri();
@@ -599,7 +611,7 @@ json::value RemoteExecutor::MetricConfig(
this->monitor.ApplyMetricConfig(std::move(config));
return json::value();
-}
+}
const ProcessStatistics* RemoteExecutor::TerminateTask(
int jobId, int taskId, int requeueCount,
diff --git a/nodemanager/core/RemoteExecutor.h b/nodemanager/core/RemoteExecutor.h
index 88e8c0d..1e7d177 100644
--- a/nodemanager/core/RemoteExecutor.h
+++ b/nodemanager/core/RemoteExecutor.h
@@ -9,8 +9,9 @@
#include "Monitor.h"
#include "Process.h"
#include "Reporter.h"
-#include "../arguments/MetricCountersConfig.h"
-#include "../data/ProcessStatistics.h"
+#include "HostsManager.h"
+#include "../arguments/MetricCountersConfig.h"
+#include "../data/ProcessStatistics.h"
namespace hpc
{
@@ -35,6 +36,7 @@ namespace hpc
void StartHeartbeat(const std::string& callbackUri);
void StartMetric(const std::string& callbackUri);
+ void StartHostsManager(const std::string& callbackUri);
const hpc::data::ProcessStatistics* TerminateTask(
int jobId, int taskId, int requeueCount,
@@ -53,6 +55,7 @@ namespace hpc
std::unique_ptr> nodeInfoReporter;
std::unique_ptr> registerReporter;
std::unique_ptr>> metricReporter;
+ std::unique_ptr hostsManager;
std::map> processes;
std::map> jobUsers;
diff --git a/nodemanager/data/HostEntry.cpp b/nodemanager/data/HostEntry.cpp
new file mode 100644
index 0000000..15338fa
--- /dev/null
+++ b/nodemanager/data/HostEntry.cpp
@@ -0,0 +1,25 @@
+#include "HostEntry.h"
+#include "../utils/JsonHelper.h"
+
+using namespace hpc::data;
+using namespace hpc::utils;
+
+HostEntry HostEntry::FromJson(const web::json::value& jsonValue)
+{
+ return HostEntry(
+ JsonHelper::Read("Name", jsonValue),
+ JsonHelper::Read("Address", jsonValue));
+}
+
+namespace hpc
+{
+ namespace utils
+ {
+ template <>
+ HostEntry JsonHelper::FromJson(const json::value& j)
+ {
+ if (!j.is_null()) { return HostEntry::FromJson(j); }
+ else { return HostEntry(); }
+ }
+ }
+}
diff --git a/nodemanager/data/HostEntry.h b/nodemanager/data/HostEntry.h
new file mode 100644
index 0000000..62e7ae5
--- /dev/null
+++ b/nodemanager/data/HostEntry.h
@@ -0,0 +1,33 @@
+#ifndef HOSTENTRY_H
+#define HOSTENTRY_H
+
+#include
+#include
+
+namespace hpc
+{
+ namespace data
+ {
+ class HostEntry
+ {
+ public:
+ HostEntry() = default;
+
+ HostEntry(const std::string& hostName, const std::string& ipAddress):
+ HostName(hostName), IPAddress(ipAddress)
+ {
+ }
+
+ static HostEntry FromJson(const web::json::value& jsonValue);
+ bool operator == (const HostEntry &entry) const
+ {
+ return ((this->HostName.compare(entry.HostName) == 0) && (this->IPAddress.compare(entry.IPAddress) == 0));
+ }
+
+ std::string HostName;
+ std::string IPAddress;
+ };
+ }
+}
+
+#endif // HOSTENTRY_H
\ No newline at end of file
diff --git a/nodemanager/utils/System.cpp b/nodemanager/utils/System.cpp
index 607da2c..ac5d3ef 100644
--- a/nodemanager/utils/System.cpp
+++ b/nodemanager/utils/System.cpp
@@ -183,6 +183,7 @@ void System::CPU(int &cores, int &sockets)
cores = coreIds.size();
sockets = physicalIds.size();
+ sockets = (sockets > 0)? sockets : 1;
// Logger::Debug("Detected core count {0}, socket count {1}", cores, sockets);