Merge branch 'master' of github.com:coolmay/whpc-linux-communicator

This commit is contained in:
evanc 2015-09-24 03:04:27 -07:00
Родитель 0f9354f2f5 f46bec8358
Коммит acfca89c12
17 изменённых файлов: 646 добавлений и 29 удалений

Просмотреть файл

@ -1,17 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Http;
using System.Web.Http;
using Microsoft.Hpc.Activation;
using Microsoft.Hpc.Scheduler.Communicator;
using Microsoft.Hpc.Communicators.LinuxCommunicator.Monitoring;
namespace Microsoft.Hpc.Communicators.LinuxCommunicator
{
public class CallbackController : ApiController
{
/// <summary>
/// The Http header UpdateId
/// </summary>
public const string UpdateIdHeaderName = "UpdateId";
[HttpPost]
[Route("api/{nodename}/computenodereported")]
public int ComputeNodeReported(string nodeName, [FromBody] ComputeClusterNodeInformation nodeInfo)
@ -84,5 +88,27 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
return null;
}
[HttpGet]
[Route("api/hostfile")]
public HttpResponseMessage GetHosts()
{
Guid curUpdateId = LinuxCommunicator.Instance.HostsManager.UpdateId;
IEnumerable<string> updateIds;
bool hasUpdateId = Request.Headers.TryGetValues(UpdateIdHeaderName, out updateIds);
HttpResponseMessage response = null;
Guid guid;
if (hasUpdateId && Guid.TryParse(updateIds.FirstOrDefault(), out guid) && guid == curUpdateId)
{
response = Request.CreateResponse(HttpStatusCode.NoContent);
}
else
{
response = Request.CreateResponse(HttpStatusCode.OK, LinuxCommunicator.Instance.HostsManager.ManagedEntries);
}
response.Headers.Add(UpdateIdHeaderName, curUpdateId.ToString());
return response;
}
}
}

Просмотреть файл

@ -0,0 +1,56 @@
using System;
using System.Net;
namespace Microsoft.Hpc.Communicators.LinuxCommunicator.HostsFile
{
public class HostEntry
{
/// <summary>
/// The host name
/// </summary>
public string Name;
/// <summary>
/// The host IP address
/// </summary>
public string Address;
/// <summary>
/// Creates a new hosts file entry for the specified host.
/// </summary>
/// <param name="name">The host name</param>
/// <param name="ipAddress">The IP address</param>
public HostEntry(string name, string ipAddress)
{
IPAddress testAddress;
if (!IPAddress.TryParse(ipAddress, out testAddress))
{
throw new ArgumentException("ipAddress must be a valid IP address");
}
if (string.IsNullOrEmpty(name))
{
throw new ArgumentException("name cannot be null");
}
this.Address = ipAddress;
this.Name = name;
}
public override bool Equals(object obj)
{
HostEntry entry = obj as HostEntry;
if (entry == null)
{
return false;
}
return string.Equals(this.Name, entry.Name) && string.Equals(this.Address, entry.Address);
}
public override int GetHashCode()
{
return (this.Address ?? string.Empty).GetHashCode() ^ (this.Name ?? string.Empty).GetHashCode();
}
}
}

Просмотреть файл

@ -0,0 +1,220 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.IO;
using System.Diagnostics;
using System.Threading;
namespace Microsoft.Hpc.Communicators.LinuxCommunicator.HostsFile
{
/// <summary>
/// This class abstracts a hosts file so that is can be managed by HPC.
/// </summary>
public class HostsFileManager : IDisposable
{
/// <summary>
/// The string placed in the comment field of an entry managed
/// by the cluster.
/// </summary>
public const string ManagedEntryKey = "HPC";
/// <summary>
/// The parameter name used to indicate whether the hosts file is HPC managed.
/// </summary>
public const string ManageFileParameter = "ManageFile";
/// <summary>
/// The interval to reload hosts file
/// </summary>
private const int ReloadInterval = 1000 * 60;
/// <summary>
/// Regular expression for a comment line in the text file
/// </summary>
private static readonly Regex Comment = new Regex(@"^#(?<1>.*)$");
/// <summary>
/// Regular expression for a parameter value in the comments section.
/// </summary>
private static readonly Regex CommentParameter = new Regex(@"^#\s*(?<parameter>[\w\p{Lm}\p{Nl}\p{Cf}\p{Mn}\p{Mc}\.]+)\s*=\s*(?<value>[\w\p{Lm}\p{Nl}\p{Cf}\p{Mn}\p{Mc}\.]+)");
/// <summary>
/// Regular expression for a ip entry in the text file.
/// </summary>
private static readonly Regex IpEntry = new Regex(@"^(?<ip>[0-9\.]+)\s+(?<dnsName>[^\s#]+)(\s+#(?<comment>.*))?");
/// <summary>
/// The path to the hosts file
/// </summary>
private string filepath;
/// <summary>
/// The timer to reload hosts file
/// </summary>
private Timer reloadTimer;
/// <summary>
/// The last modified date of the current loaded hosts file
/// </summary>
private DateTime lastModified = DateTime.MinValue;
/// <summary>
/// The unique update ID
/// </summary>
public Guid UpdateId
{
get;
private set;
}
/// <summary>
/// This constructor initializes the parser on a specific hosts file
/// </summary>
/// <param name="filepath"></param>
public HostsFileManager(string filepath)
{
this.filepath = filepath;
this.ManagedEntries = new List<HostEntry>();
this.reloadTimer = new Timer(ReloadTimerEvent, null, 0, Timeout.Infinite);
}
/// <summary>
/// Gets the list of HPC managed entries in the host file.
/// </summary>
public List<HostEntry> ManagedEntries
{
get;
private set;
}
/// <summary>
/// Gets or sets a flag indicating whether the file is HPC managed
/// </summary>
public bool ManagedByHPC
{
get;
private set;
}
/// <summary>
/// Loads the contents of an existing host file. Will clear
/// the current configuration of this instance.
/// </summary>
/// <param name="path"></param>
public void ReloadTimerEvent(object param)
{
try
{
FileInfo fileInfo = new FileInfo(this.filepath);
if (!fileInfo.Exists)
{
LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] The hosts file doesn't exists: {0}", this.filepath);
return;
}
if (fileInfo.LastWriteTimeUtc <= this.lastModified)
{
LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] The hosts file isn't changed since last load");
return;
}
bool manageByHPC = false;
List<HostEntry> newEntries = new List<HostEntry>();
foreach (var line in File.ReadAllLines(this.filepath))
{
Match commentMatch = HostsFileManager.Comment.Match(line);
if (commentMatch.Success)
{
Match commentParameterMatch = HostsFileManager.CommentParameter.Match(line);
if (commentParameterMatch.Success && string.Equals(commentParameterMatch.Groups["parameter"].ToString(), ManageFileParameter, StringComparison.OrdinalIgnoreCase))
{
if (string.Equals(commentParameterMatch.Groups["value"].Value, "true", StringComparison.OrdinalIgnoreCase))
{
manageByHPC = true;
}
}
continue;
}
Match ipEntryMatch = HostsFileManager.IpEntry.Match(line);
if (ipEntryMatch.Success && manageByHPC)
{
string ip = ipEntryMatch.Groups["ip"].Value;
string name = ipEntryMatch.Groups["dnsName"].Value;
string comment = ipEntryMatch.Groups["comment"].Value;
if (comment.Equals(HostsFileManager.ManagedEntryKey, StringComparison.OrdinalIgnoreCase))
{
try
{
newEntries.Add(new HostEntry(name, ip));
}
catch (ArgumentException ex)
{
LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] Skip invalid host entry name={0}, ip={1}: {2}", name, ip, ex.Message);
}
}
}
}
if (manageByHPC)
{
if (newEntries.Count != this.ManagedEntries.Count || !(new HashSet<HostEntry>(this.ManagedEntries)).SetEquals(new HashSet<HostEntry>(newEntries)))
{
this.ManagedEntries = newEntries;
this.UpdateId = Guid.NewGuid();
LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] The managed host entries updated, current update Id is {0}", this.UpdateId);
}
else
{
LinuxCommunicator.Instance.Tracer.TraceInfo("[HostsFileManager] No update to HPC managed host entries, current update Id is {0}", this.UpdateId);
}
}
else
{
LinuxCommunicator.Instance.Tracer.TraceWarning("[HostsFileManager] Hosts file was not managed by HPC");
this.ManagedEntries.Clear();
}
this.ManagedByHPC = manageByHPC;
this.lastModified = fileInfo.LastWriteTimeUtc;
}
catch (Exception e)
{
LinuxCommunicator.Instance.Tracer.TraceWarning("[HostsFileManager] Failed to reload host file: {0}", e);
}
finally
{
try
{
this.reloadTimer.Change(ReloadInterval, Timeout.Infinite);
}
catch (Exception te)
{
LinuxCommunicator.Instance.Tracer.TraceWarning("[HostsFileManager] Failed to restart reload timer: {0}", te);
}
}
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool isDisposing)
{
if (isDisposing)
{
if (this.reloadTimer != null)
{
this.reloadTimer.Dispose();
}
}
}
}
}

Просмотреть файл

@ -1,20 +1,21 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Formatting;
using System.Net.Security;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Hpc.Activation;
using Microsoft.Hpc.Scheduler.Communicator;
using System.Net;
using Microsoft.Hpc.Scheduler.Properties;
using System.Xml.Linq;
using System.Security.Principal;
using System.Globalization;
using System.Net.Security;
using Microsoft.Hpc.Activation;
using Microsoft.Hpc.Communicators.LinuxCommunicator.HostsFile;
using Microsoft.Hpc.Communicators.LinuxCommunicator.Monitoring;
using Microsoft.Hpc.Scheduler.Communicator;
using Microsoft.Hpc.Scheduler.Properties;
namespace Microsoft.Hpc.Communicators.LinuxCommunicator
{
@ -59,16 +60,20 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
instance = this;
this.headNodeFqdn = new Lazy<string>(() => Dns.GetHostEntryAsync(this.HeadNode).Result.HostName, LazyThreadSafetyMode.ExecutionAndPublication);
this.MonitoringConfigManager = new MonitoringConfigManager(this.headNodeFqdn.Value);
this.HostsManager = new HostsFileManager(Path.Combine(Environment.SystemDirectory, @"drivers\etc\hosts"));
}
public event EventHandler<RegisterEventArgs> RegisterRequested;
public MonitoringConfigManager MonitoringConfigManager { get; private set; }
public HostsFileManager HostsManager { get; private set; }
public void Dispose()
{
this.server.Dispose();
this.MonitoringConfigManager.Dispose();
this.HostsManager.Dispose();
GC.SuppressFinalize(this);
}

Просмотреть файл

@ -9,7 +9,7 @@
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Microsoft.Hpc.Communicators.LinuxCommunicator</RootNamespace>
<AssemblyName>Microsoft.Hpc.Communicators.LinuxCommunicator</AssemblyName>
<TargetFrameworkVersion>v4.5.1</TargetFrameworkVersion>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
@ -100,6 +100,8 @@
<Compile Include="CredentialNativeMethods.cs" />
<Compile Include="Credentials.cs" />
<Compile Include="GenericEqualityComparer.cs" />
<Compile Include="HostsFile\HostEntry.cs" />
<Compile Include="HostsFile\HostsFileManager.cs" />
<Compile Include="LinuxCommunicator.cs" />
<Compile Include="MessageAuthenticationHandler.cs" />
<Compile Include="Monitoring\MetricCounter.cs" />

Просмотреть файл

@ -32,5 +32,4 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
[assembly: AssemblyVersion("1.0.*")]

Просмотреть файл

@ -0,0 +1,96 @@
#include <iostream>
#include <iterator>
#include <regex>
#include <list>
#include <fstream>
#include <iomanip>
#include "HostsManager.h"
using namespace hpc::utils;
using namespace hpc::data;
using namespace web::http;
using namespace hpc::core;
HostsManager::HostsManager(const std::string& hostsUri)
{
this->hostsFetcher =
std::unique_ptr<HttpFetcher>(
new HttpFetcher(
hostsUri,
0,
FetchInterval,
[this](http_request& request) {
if(!this->updateId.empty())
request.headers().add(UpdateIdHeaderName, this->updateId);
return true;
},
[this](http_response& response) {
return this->HostsResponseHandler(response);
}));
}
HostsManager::~HostsManager()
{
this->hostsFetcher->Stop();
}
void HostsManager::Start()
{
this->hostsFetcher->Start();
}
void HostsManager::Stop()
{
this->hostsFetcher->Stop();
}
bool HostsManager::HostsResponseHandler(const http_response& response)
{
if(response.status_code != 200)
{
return;
}
std::string respUpdateId;
if (HttpHelper::FindHeader(response, UpdateIdHeaderName, respUpdateId))
{
this.updateId = respUpdateId;
std::vector<HostEntry> hostEntries = JsonHelper<std::vector<HostEntry>>::FromJson(response.extract_json().get());
this.UpdateHostsFile(hostEntries);
}
}
void HostsManager::UpdateHostsFile(const std::vector<HostEntry>& hostEntries)
{
std::list<std::string> unmanagedLines;
std::ifstream ifs(HostsFilePath, std::ios::in);
std::string line;
while (getline(ifs, line))
{
std::regex entryRegex(HPCHostEntryPattern);
std::smatch entryMatch;
// Strip the original HPC entries
if(!std::regex_match(line, entryMatch, entryRegex))
{
unmanagedLines.push_back(line);
}
}
ifs.close();
std::ofstream ofs(HostsFilePath);
auto it = unmanagedLines.cbegin();
while(it != unmanagedLines.cend())
{
ofs << *it << std::endl;
}
// Append the HPC entries at the end
for(std::size_t i=0; i<hostEntries.size(); i++)
{
ofs << std::left << std::setw(24) << hostEntries[i].IPAddress << std::setw(30) << hostEntries[i].HostName << "#HPC" << std::endl;
}
ofs.close();
}

Просмотреть файл

@ -0,0 +1,36 @@
#ifndef HOSTSMANAGER_H
#define HOSTSMANAGER_H
#include <string>
#include <vector>
#include "HttpFetcher.h"
#include "../data/HostEntry.h"
namespace hpc
{
namespace core
{
using namespace web::http;
class HostsManager
{
public:
const std::string HostsFilePath = "/etc/hosts";
const int FetchInterval = 300000;
const std::string HPCHostEntryPattern = R"delimiter(^([0-9\.]+)\s+([^\s#]+)\s+#HPC\s*)delimiter";
const std::string UpdateIdHeaderName = "UpdateId";
HostsManager(const std::string& hostsUri);
~HostsManager();
void Start();
void Stop();
protected:
private:
bool HostsResponseHandler(const http_response& response);
void UpdateHostsFile(const std::vector<HostEntry>& hostEntries);
std::string updateId;
std::unique_ptr<HttpFetcher> hostsFetcher;
};
}
}
#endif // HOSTSMANAGER_H

Просмотреть файл

@ -0,0 +1,49 @@
#include "HttpFetcher.h"
#include "HttpHelper.h"
#include "../utils/Logger.h"
using namespace web::http;
using namespace web::http::client;
using namespace hpc::core;
using namespace hpc::utils;
void HttpFetcher::Report()
{
try
{
const std::string& uri = this->reportUri;
http_client client = HttpHelper::GetHttpClient(uri);
http_request request = HttpHelper::GetHttpRequest(methods::Get);
if(this.requestHandler)
{
if(!this.requestHandler(request))
{
Logger::Info("Skipped sending to {0} because of failure in request handler", uri);
return;
}
}
http_response response = client.request(request, this->cts.get_token()).get();
Logger::Debug("---------> Reported to {0} response code {1}", uri, response.status_code());
if(this.responseHandler)
{
if(!this.responseHandler(response))
{
Logger::Info("Error in response handler for the request sent to {0}", uri);
}
}
}
catch (const http_exception& httpEx)
{
Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->targetUri, httpEx.what());
}
catch (const std::exception& ex)
{
Logger::Error("Exception occurred when report to {0}, ex {1}", this->targetUri, ex.what());
}
catch (...)
{
Logger::Error("Unknown error occurred when report to {0}", this->targetUri);
}
}

Просмотреть файл

@ -0,0 +1,44 @@
#ifndef HTTPFETCHER_H
#define HTTPFETCHER_H
#include <cpprest/json.h>
#include <cpprest/http_client.h>
#include <functional>
#include "Reporter.h"
namespace hpc
{
namespace core
{
using namespace web;
class HttpFetcher : public Reporter<int>
{
public:
HttpFetcher(
const std::string& uri,
int hold,
int interval,
std::function<void(http::http_request&)> requestHandler,
std::function<void(http::http_response&)> responseHandler)
: Reporter<json::value>(uri, hold, interval, nullptr),requestHandler(requestHandler),responseHandler(responseHandler)
{
}
virtual ~HttpFetcher()
{
this->cts.cancel();
this->Stop();
}
virtual void Report();
private:
std::function<bool(http::http_request&)> requestHandler;
std::function<bool(http::http_response&)> responseHandler;
pplx::cancellation_token_source cts;
};
}
}
#endif // HTTPFETCHER_H

Просмотреть файл

@ -17,16 +17,24 @@ namespace hpc
{
public:
static http::http_request GetHttpRequest(
const http::method& mtd,
const json::value &body)
const http::method& mtd)
{
http::http_request msg(mtd);
msg.set_request_uri("");
msg.set_body(body);
msg.headers().add(AuthenticationHeaderKey, NodeManagerConfig::GetClusterAuthenticationKey());
return msg;
}
template <typename T>
static http::http_request GetHttpRequest(
const http::method& mtd,
const T &body)
{
http::http_request msg = GetHttpRequest(mtd);
msg.set_body(body);
return msg;
}
static void ConfigListenerSslContext(context& ctx)
{
ctx.set_options(boost::asio::ssl::context::default_workarounds);
@ -79,10 +87,11 @@ namespace hpc
return std::move(http::client::http_client(uri, config));
}
static bool FindHeader(const http::http_request& request, const std::string& headerKey, std::string& header)
template <typename T>
static bool FindHeader(const T& message, const std::string& headerKey, std::string& header)
{
auto h = request.headers().find(headerKey);
if (h != request.headers().end())
auto h = message.headers().find(headerKey);
if (h != message.headers().end())
{
header = h->second;
return true;

Просмотреть файл

@ -29,6 +29,7 @@ namespace hpc
AddConfigurationItem(std::string, RegisterUri);
AddConfigurationItem(std::string, HeartbeatUri);
AddConfigurationItem(std::string, MetricUri);
AddConfigurationItem(std::string, HostsFileUri);
AddConfigurationItem(std::string, ClusterAuthenticationKey);
AddConfigurationItem(std::string, TrustedCAPath);
AddConfigurationItem(std::string, TrustedCAFile);

Просмотреть файл

@ -38,6 +38,7 @@ RemoteExecutor::RemoteExecutor(const std::string& networkName)
this->StartHeartbeat(NodeManagerConfig::GetHeartbeatUri());
this->StartMetric(NodeManagerConfig::GetMetricUri());
this->StartHostsManager(NodeManagerConfig::GetHostsFileUri());
}
json::value RemoteExecutor::StartJobAndTask(StartJobAndTaskArgs&& args, const std::string& callbackUri)
@ -540,6 +541,17 @@ void RemoteExecutor::StartHeartbeat(const std::string& callbackUri)
this->nodeInfoReporter->Start();
}
void RemoteExecutor::StartHostsManager(const std::string& callbackUri)
{
if(!callbackUri.empty())
{
WriterLock writerLock(&this->lock);
this->hostsManager = std::unique_ptr<HostsManager>(new HostsManager(callbackUri));
this->hostsManager->Start();
}
}
json::value RemoteExecutor::Ping(const std::string& callbackUri)
{
auto uri = NodeManagerConfig::GetHeartbeatUri();

Просмотреть файл

@ -9,6 +9,7 @@
#include "Monitor.h"
#include "Process.h"
#include "Reporter.h"
#include "HostsManager.h"
#include "../arguments/MetricCountersConfig.h"
#include "../data/ProcessStatistics.h"
@ -35,6 +36,7 @@ namespace hpc
void StartHeartbeat(const std::string& callbackUri);
void StartMetric(const std::string& callbackUri);
void StartHostsManager(const std::string& callbackUri);
const hpc::data::ProcessStatistics* TerminateTask(
int jobId, int taskId, int requeueCount,
@ -53,6 +55,7 @@ namespace hpc
std::unique_ptr<Reporter<json::value>> nodeInfoReporter;
std::unique_ptr<Reporter<json::value>> registerReporter;
std::unique_ptr<Reporter<std::vector<unsigned char>>> metricReporter;
std::unique_ptr<HostsManager> hostsManager;
std::map<uint64_t, std::shared_ptr<Process>> processes;
std::map<int, std::tuple<std::string, bool, bool, bool, bool, std::string>> jobUsers;

Просмотреть файл

@ -0,0 +1,25 @@
#include "HostEntry.h"
#include "../utils/JsonHelper.h"
using namespace hpc::data;
using namespace hpc::utils;
HostEntry HostEntry::FromJson(const web::json::value& jsonValue)
{
return HostEntry(
JsonHelper<std::string>::Read("Name", jsonValue),
JsonHelper<std::string>::Read("Address", jsonValue));
}
namespace hpc
{
namespace utils
{
template <>
HostEntry JsonHelper<HostEntry>::FromJson(const json::value& j)
{
if (!j.is_null()) { return HostEntry::FromJson(j); }
else { return HostEntry(); }
}
}
}

Просмотреть файл

@ -0,0 +1,33 @@
#ifndef HOSTENTRY_H
#define HOSTENTRY_H
#include <string>
#include <cpprest/json.h>
namespace hpc
{
namespace data
{
class HostEntry
{
public:
HostEntry() = default;
HostEntry(const std::string& hostName, const std::string& ipAddress):
HostName(hostName), IPAddress(ipAddress)
{
}
static HostEntry FromJson(const web::json::value& jsonValue);
bool operator == (const HostEntry &entry) const
{
return ((this->HostName.compare(entry.HostName) == 0) && (this->IPAddress.compare(entry.IPAddress) == 0));
}
std::string HostName;
std::string IPAddress;
};
}
}
#endif // HOSTENTRY_H

Просмотреть файл

@ -183,6 +183,7 @@ void System::CPU(int &cores, int &sockets)
cores = coreIds.size();
sockets = physicalIds.size();
sockets = (sockets > 0)? sockets : 1;
// Logger::Debug("Detected core count {0}, socket count {1}", cores, sockets);