Fix several bugs in linux communicator

This commit is contained in:
EvanCui 2015-05-25 18:46:40 +08:00
Родитель 729dcab0f4
Коммит 76e208dd00
2 изменённых файлов: 113 добавлений и 42 удалений

Просмотреть файл

@ -20,14 +20,15 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
private const string CallbackUriHeaderName = "CallbackUri";
private const string HpcFullKeyName = @"HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\HPC";
private const string ClusterNameKeyName = "ClusterName";
private const int AutoRetryLimit = 5;
private const int AutoRetrySendLimit = 3;
private const int AutoRetryStartLimit = 3;
private readonly string HeadNode = (string)Microsoft.Win32.Registry.GetValue(HpcFullKeyName, ClusterNameKeyName, null);
private readonly int MonitoringPort = 9894;
private readonly TimeSpan RequestTimeout = TimeSpan.FromSeconds(20);
private readonly TimeSpan RequestTimeout = TimeSpan.FromSeconds(40);
private readonly TimeSpan DelayBetweenRetry = TimeSpan.FromSeconds(3);
private readonly TimeSpan RetryStartInterval = TimeSpan.FromSeconds(10);
private HttpClient client;
private WebServer server;
private CancellationTokenSource cancellationTokenSource;
@ -51,7 +52,6 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
public void Dispose()
{
this.server.Dispose();
this.client.Dispose();
GC.SuppressFinalize(this);
}
@ -118,9 +118,6 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
return sslPolicyErrors == System.Net.Security.SslPolicyErrors.None;
};
this.client = new HttpClient();
this.client.Timeout = this.RequestTimeout;
this.server = new WebServer();
if (this.HeadNode == null)
@ -136,10 +133,43 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
public bool Start()
{
this.Tracer.TraceInfo("Starting LinuxCommunicator.");
this.cancellationTokenSource = new CancellationTokenSource();
this.server.Start().Wait();
return this.Start(0);
}
private bool Start(int retryCount)
{
this.Tracer.TraceInfo("Starting LinuxCommunicator. RetryCount {0}", retryCount);
if (retryCount >= AutoRetryStartLimit)
{
this.Tracer.TraceInfo("Exceeding the auto retry start limit {0}", AutoRetryStartLimit);
return false;
}
if (this.cancellationTokenSource != null) { this.cancellationTokenSource.Dispose(); }
this.cancellationTokenSource = new CancellationTokenSource();
try
{
this.server.Start().Wait();
}
catch(AggregateException aggrEx)
{
if (aggrEx.InnerExceptions.Any(e => e is HttpListenerException))
{
this.Tracer.TraceWarning("Failed to start http listener {0}", aggrEx);
return this.Start(retryCount + 1);
}
throw;
}
catch (HttpListenerException ex)
{
this.Tracer.TraceWarning("Failed to start http listener {0}", ex);
return this.Start(retryCount + 1);
}
return true;
}
@ -165,7 +195,10 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
try
{
arg.JobInfo = await content.ReadAsAsync<ComputeClusterJobInformation>();
if (content != null)
{
arg.JobInfo = await content.ReadAsAsync<ComputeClusterJobInformation>();
}
}
catch (Exception e)
{
@ -195,7 +228,10 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
try
{
arg.TaskInfo = await content.ReadAsAsync<ComputeClusterTaskInformation>();
if (content != null)
{
arg.TaskInfo = await content.ReadAsAsync<ComputeClusterTaskInformation>();
}
}
catch (Exception e)
{
@ -242,31 +278,34 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
{
string privateKey = null, publicKey = null;
try
if (extendedData != null)
{
XDocument xDoc = XDocument.Parse(extendedData);
var privateKeyNode = xDoc.Descendants("PrivateKey").FirstOrDefault();
var publicKeyNode = xDoc.Descendants("PublicKey").FirstOrDefault();
if (privateKeyNode != null)
try
{
privateKey = privateKeyNode.Value;
}
XDocument xDoc = XDocument.Parse(extendedData);
if (publicKeyNode != null)
{
publicKey = publicKeyNode.Value;
var privateKeyNode = xDoc.Descendants("PrivateKey").FirstOrDefault();
var publicKeyNode = xDoc.Descendants("PublicKey").FirstOrDefault();
if (privateKeyNode != null)
{
privateKey = privateKeyNode.Value;
}
if (publicKeyNode != null)
{
publicKey = publicKeyNode.Value;
}
}
catch (Exception ex)
{
this.Tracer.TraceWarning("Error parsing extended data {0}, ex {1}", extendedData, ex);
}
}
catch (Exception ex)
{
this.Tracer.TraceWarning("Error parsing extended data {0}, ex {1}", extendedData, ex);
}
if (IsAdmin(userName, password))
{
startInfo.EnvironmentVariables["CCP_ISADMIN"] = "1";
}
}
this.SendRequest("startjobandtask", this.GetCallbackUri(nodeName, "taskcompleted"), nodeName, (content, ex) =>
{
@ -299,7 +338,7 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
}, null);
}
private void SendRequest<T>(string action, string callbackUri, string nodeName, Action<HttpContent, Exception> callback, T arg, int retryCount = 0)
private async Task SendRequestInternal<T>(string action, string callbackUri, string nodeName, Action<HttpContent, Exception> callback, T arg, int retryCount = 0)
{
this.Tracer.TraceDetail("Sending out request, action {0}, callback {1}, nodeName {2}", action, callbackUri, nodeName);
var request = new HttpRequestMessage(HttpMethod.Post, this.GetResoureUri(nodeName, action));
@ -307,19 +346,32 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
var formatter = new JsonMediaTypeFormatter();
request.Content = new ObjectContent<T>(arg, formatter);
this.client.SendAsync(request, this.cancellationTokenSource.Token).ContinueWith(async t =>
{
Exception ex = t.Exception;
this.Tracer.TraceDetail("Sending out request task completed, action {0}, callback {1}, nodeName {2} ex {3}", action, callbackUri, nodeName, ex);
Exception ex = null;
HttpContent content = null;
bool retry = false;
HttpContent content = null;
using (HttpClient client = new HttpClient())
{
client.Timeout = this.RequestTimeout;
HttpResponseMessage response = null;
try
{
response = await client.SendAsync(request, this.cancellationTokenSource.Token);
}
catch (Exception e)
{
ex = e;
}
this.Tracer.TraceDetail("Sending out request task completed, action {0}, callback {1}, nodeName {2} ex {3}", action, callbackUri, nodeName, ex);
if (ex == null)
{
try
{
t.Result.EnsureSuccessStatusCode();
content = t.Result.Content;
response.EnsureSuccessStatusCode();
content = response.Content;
}
catch (Exception e)
{
@ -327,23 +379,41 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
}
}
if (this.CanRetry(ex) && retryCount < AutoRetryLimit)
if (this.CanRetry(ex) && retryCount < AutoRetrySendLimit)
{
await Task.Delay(DelayBetweenRetry);
this.SendRequest(action, callbackUri, nodeName, callback, arg, retryCount + 1);
return;
retry = true;
}
else
{
callback(content, ex);
}
}, this.cancellationTokenSource.Token);
}
if (retry)
{
await Task.Delay(DelayBetweenRetry);
this.SendRequest(action, callbackUri, nodeName, callback, arg, retryCount + 1);
}
}
private void SendRequest<T>(string action, string callbackUri, string nodeName, Action<HttpContent, Exception> callback, T arg, int retryCount = 0)
{
this.SendRequestInternal(action, callbackUri, nodeName, callback, arg, retryCount).ContinueWith(t =>
{
this.Tracer.TraceDetail("Finished sending, action {0}, callback {1}, nodeName {2} retry count {3}", action, callbackUri, retryCount);
});
}
private bool CanRetry(Exception exception)
{
if (this.cancellationTokenSource.IsCancellationRequested)
{
return false;
}
if (exception is HttpRequestException ||
exception is WebException)
exception is WebException ||
exception is TaskCanceledException)
{
return true;
}

Просмотреть файл

@ -55,6 +55,7 @@ namespace Microsoft.Hpc.Communicators.LinuxCommunicator
{
await Task.Yield();
if (this.source != null) { this.source.Dispose(); }
this.source = new CancellationTokenSource();
LinuxCommunicator.Instance.Tracer.TraceInfo("Start listening on {0}", this.ListeningUri);