// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Fabric;
using System.Fabric.Health;
using System.Fabric.Query;
using System.Fabric.Repair;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ClusterObserver.Utilities;
using FabricObserver.Observers;
using FabricObserver.Observers.Utilities;
using FabricObserver.Observers.Utilities.Telemetry;
using FabricObserver.TelemetryLib;
using System.Fabric.Description;
using System.IO;
namespace ClusterObserver
{
public sealed class ClusterObserver : ObserverBase
{
/// <summary>
/// URI of RepairManagerService under the System application (ObserverConstants.SystemAppName).
/// NOTE(review): not referenced in the code visible here — confirm whether it is still needed.
/// </summary>
private readonly Uri repairManagerServiceUri = new($"{ObserverConstants.SystemAppName}/RepairManagerService");
/// <summary>
/// URI of the System application. Application upgrade-status checks skip this application.
/// </summary>
private readonly Uri fabricSystemAppUri = new(ObserverConstants.SystemAppName);
/// <summary>
/// When true, several Fabric queries (node list, node health, application health) use a 1-second timeout
/// instead of ConfigurationSettings.AsyncTimeout. Presumably intended for tests — confirm with callers.
/// </summary>
private readonly bool ignoreDefaultQueryTimeout;
/// <summary>
/// Aggregated cluster health state recorded at the end of the previous run. It is used to detect
/// a return to Ok so that a single "recovered" event can be emitted.
/// </summary>
private HealthState LastKnownClusterAggregatedHealthState
{
get; set;
} = HealthState.Unknown;
/// <summary>
/// Dictionary that holds node name (key) and tuple of node status, first detected time, last detected time.
/// Only nodes seen as Disabled, Disabling or Down are tracked; an entry is removed once the node is Up again.
/// </summary>
private Dictionary NodeStatusDictionary
{
get;
}
/// <summary>
/// Application name (Uri.OriginalString) mapped to whether a completed upgrade
/// (RollingForwardCompleted or RollingBackCompleted) has already been reported for that application.
/// </summary>
private Dictionary ApplicationUpgradesCompletedStatus
{
get;
}
/// <summary>
/// How long a node may stay Disabled, Disabling or Down before a Warning is emitted.
/// Defaults to 2 hours and can be overridden from the observer's configuration section.
/// </summary>
public TimeSpan MaxTimeNodeStatusNotOk
{
get; set;
} = TimeSpan.FromHours(2.0);
/// <summary>
/// Whether repair tasks that are currently processing are reported on each run.
/// </summary>
public bool MonitorRepairJobStatus
{
get; set;
}
/// <summary>
/// Whether cluster and application upgrade status is reported.
/// </summary>
public bool MonitorUpgradeStatus
{
get; set;
}
/// <summary>
/// True once a completed cluster upgrade has been reported. It prevents the same completion
/// from being reported again until a new upgrade state is observed.
/// </summary>
public bool HasClusterUpgradeCompleted
{
get; set;
}
/// <summary>
/// When true, Warning-level health is reported in addition to Error-level health.
/// </summary>
public bool EmitWarningDetails
{
get; set;
}
/// <summary>
/// Creates the ClusterObserver and installs its own Logger. The logger replaces the default logger
/// set up by FabricObserver.Extensibility.dll. Logs go to the configured ObserverLogPath or, when
/// that setting is blank, to "cluster_observer_logs" under the current directory.
/// </summary>
/// <param name="serviceContext">Stateless service context the observer runs in.</param>
/// <param name="ignoreDefaultQueryTimeout">When true, some Fabric queries use a 1-second timeout instead of the configured AsyncTimeout.</param>
public ClusterObserver(StatelessServiceContext serviceContext, bool ignoreDefaultQueryTimeout = false)
    : base (null, serviceContext)
{
    this.ignoreDefaultQueryTimeout = ignoreDefaultQueryTimeout;
    NodeStatusDictionary = [];
    ApplicationUpgradesCompletedStatus = [];

    string configuredLogPath =
        GetSettingParameterValue(
            ClusterObserverConstants.ObserverManagerConfigurationSectionName,
            ClusterObserverConstants.ObserverLogPathParameter);

    // Fall back to a folder beside the process working directory when no log path is configured.
    string logFolderBasePath =
        string.IsNullOrWhiteSpace(configuredLogPath)
            ? Path.Combine(Environment.CurrentDirectory, "cluster_observer_logs")
            : configuredLogPath;

    // NOTE(review): the meaning of the third Logger argument (7) is defined by Logger — confirm.
    ObserverLogger = new Logger(GetType().Name, logFolderBasePath, 7)
    {
        EnableETWLogging = IsEtwProviderEnabled
    };
}
/// <summary>
/// Runs one observer iteration. Nothing happens when the observer is disabled, when neither telemetry
/// nor ETW output is enabled, when RunInterval has not elapsed since LastRunDateTime, or when
/// cancellation was already requested. Otherwise settings are refreshed, the report is produced, and
/// LastRunDateTime is updated.
/// </summary>
/// <param name="token">The RunAsync cancellation token supplied by the Service Fabric runtime.</param>
public override async Task ObserveAsync(CancellationToken token)
{
    if (!IsEnabled)
    {
        return;
    }

    if (!IsTelemetryEnabled && !IsEtwEnabled)
    {
        // No output sink configured: nothing would be emitted.
        return;
    }

    if (RunInterval > TimeSpan.Zero && DateTime.Now.Subtract(LastRunDateTime) < RunInterval)
    {
        // Too soon since the previous run.
        return;
    }

    if (token.IsCancellationRequested)
    {
        return;
    }

    // Keep the runtime token so the reporting helpers can read it from the Token property.
    Token = token;

    SetPropertiesFromApplicationSettings();
    await ReportAsync(Token);

    LastRunDateTime = DateTime.Now;
}
/// <summary>
/// Produces the cluster health report. The <paramref name="token"/> argument is not read here.
/// The reporting helpers use the <c>Token</c> property, which ObserveAsync assigns before calling this method.
/// </summary>
/// <param name="token">Cancellation token (unused; see summary).</param>
public override async Task ReportAsync(CancellationToken token) => await ReportClusterHealthAsync();
/// <summary>
/// Re-reads the observer's tunable settings from <c>ConfigurationSectionName</c>. This is called at the
/// start of every run. A value is applied only when it parses; otherwise the current property value is kept.
/// </summary>
private void SetPropertiesFromApplicationSettings()
{
    // NOTE(review): TimeSpan.TryParse(string) uses the current culture.
    string maxTimeNotOkValue =
        GetSettingParameterValue(ConfigurationSectionName, ClusterObserverConstants.MaxTimeNodeStatusNotOkSettingParameter);
    if (TimeSpan.TryParse(maxTimeNotOkValue, out TimeSpan parsedMaxTime))
    {
        MaxTimeNodeStatusNotOk = parsedMaxTime;
    }

    string emitWarningsValue =
        GetSettingParameterValue(ConfigurationSectionName, ClusterObserverConstants.EmitHealthWarningEvaluationConfigurationSetting);
    if (bool.TryParse(emitWarningsValue, out bool parsedEmitWarnings))
    {
        EmitWarningDetails = parsedEmitWarnings;
    }

    string monitorRepairsValue =
        GetSettingParameterValue(ConfigurationSectionName, ClusterObserverConstants.MonitorRepairJobsConfigurationSetting);
    if (bool.TryParse(monitorRepairsValue, out bool parsedMonitorRepairs))
    {
        MonitorRepairJobStatus = parsedMonitorRepairs;
    }

    string monitorUpgradesValue =
        GetSettingParameterValue(ConfigurationSectionName, ClusterObserverConstants.MonitorUpgradesConfigurationSetting);
    if (bool.TryParse(monitorUpgradesValue, out bool parsedMonitorUpgrades))
    {
        MonitorUpgradeStatus = parsedMonitorUpgrades;
    }
}
/// <summary>
/// Performs one cluster health pass:
/// 1. checks node status (MonitorNodeStatusAsync);
/// 2. optionally reports repair jobs currently processing and the cluster upgrade status;
/// 3. queries cluster health, filtered to Error/Warning events, applications and nodes, and emits
///    telemetry/ETW for unhealthy nodes, applications and services.
/// When the aggregated state returns to Ok after a recorded Error state (or Warning, when EmitWarningDetails
/// is set), a single recovery event is emitted instead.
/// </summary>
/// <remarks>
/// FabricException and TimeoutException are logged locally and swallowed. Any other exception except
/// cancellation is logged, reported as a Warning, and rethrown.
/// </remarks>
private async Task ReportClusterHealthAsync()
{
    try
    {
        // Monitor node status.
        await MonitorNodeStatusAsync(Token, ignoreDefaultQueryTimeout);

        // Check for active repairs in the cluster.
        if (MonitorRepairJobStatus)
        {
            var repairsInProgress = await GetRepairTasksCurrentlyProcessingAsync(Token);
            string repairState = string.Empty;

            if (repairsInProgress?.Count > 0)
            {
                string ids = string.Empty;

                foreach (var repair in repairsInProgress)
                {
                    Token.ThrowIfCancellationRequested();
                    ids += $"TaskId: {repair.TaskId}{Environment.NewLine}State: {repair.State}{Environment.NewLine}";
                }

                repairState += $"There are currently one or more Repair Jobs processing in the cluster.{Environment.NewLine}{ids}";

                var telemetry = new ClusterTelemetryData()
                {
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Cluster,
                    HealthState = HealthState.Ok,
                    Description = repairState,
                    Metric = "RepairJobs",
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetry, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetry);
                }
            }
        }

        // Check cluster upgrade status.
        if (MonitorUpgradeStatus)
        {
            await ReportClusterUpgradeStatusAsync(Token);
        }

        // Only Error/Warning events, applications and nodes are returned by this query.
        var clusterQueryDesc = new ClusterHealthQueryDescription
        {
            EventsFilter = new HealthEventsFilter
            {
                HealthStateFilterValue = HealthStateFilter.Error | HealthStateFilter.Warning
            },
            ApplicationsFilter = new ApplicationHealthStatesFilter
            {
                HealthStateFilterValue = HealthStateFilter.Error | HealthStateFilter.Warning
            },
            NodesFilter = new NodeHealthStatesFilter
            {
                HealthStateFilterValue = HealthStateFilter.Error | HealthStateFilter.Warning
            },
            HealthPolicy = new ClusterHealthPolicy(),
            HealthStatisticsFilter = new ClusterHealthStatisticsFilter
            {
                ExcludeHealthStatistics = false,
                IncludeSystemApplicationHealthStatistics = true
            }
        };

        ClusterHealth clusterHealth =
            await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
                () =>
                    FabricClientInstance.HealthManager.GetClusterHealthAsync(
                        clusterQueryDesc,
                        ConfigurationSettings.AsyncTimeout,
                        Token),
                Token);

        // Previous aggregated cluster health state was Error or Warning. It's now Ok.
        if (clusterHealth.AggregatedHealthState == HealthState.Ok && (LastKnownClusterAggregatedHealthState == HealthState.Error
            || (EmitWarningDetails && LastKnownClusterAggregatedHealthState == HealthState.Warning)))
        {
            var telemetry = new ClusterTelemetryData()
            {
                ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                EntityType = EntityType.Cluster,
                HealthState = HealthState.Ok,
                Description = $"Cluster has recovered from previous {LastKnownClusterAggregatedHealthState} state.",
                Metric = "AggregatedClusterHealth",
                ObserverName = ObserverName
            };

            // Telemetry.
            if (IsTelemetryEnabled)
            {
                if (TelemetryClient != null)
                {
                    await TelemetryClient.ReportHealthAsync(telemetry, Token);
                }
            }

            // ETW.
            if (IsEtwEnabled)
            {
                ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetry);
            }

            // Reset last known Cluster aggregated health state.
            LastKnownClusterAggregatedHealthState = HealthState.Ok;
            return;
        }

        // Cluster is healthy. Nothing to do here.
        if (clusterHealth.AggregatedHealthState == HealthState.Ok)
        {
            return;
        }

        // If in Warning and you are not sending Warning state reports, then end here.
        if (!EmitWarningDetails && clusterHealth.AggregatedHealthState == HealthState.Warning)
        {
            return;
        }

        // Process node health.
        if (clusterHealth.NodeHealthStates != null && clusterHealth.NodeHealthStates.Count > 0)
        {
            try
            {
                await ProcessNodeHealthAsync(clusterHealth.NodeHealthStates, Token);
            }
            catch (Exception e) when (e is FabricException or TimeoutException)
            {
#if DEBUG
                ObserverLogger.LogInfo($"Handled Exception in ReportClusterHealthAsync::Node:{Environment.NewLine}{e.Message}");
#endif
            }
        }

        // Process Application/Service health.
        if (clusterHealth.ApplicationHealthStates != null && clusterHealth.ApplicationHealthStates.Count > 0)
        {
            foreach (var app in clusterHealth.ApplicationHealthStates)
            {
                Token.ThrowIfCancellationRequested();

                try
                {
                    bool isSystemApp = app.ApplicationName.OriginalString == ObserverConstants.SystemAppName;

                    // The System application's own health events are always processed, in addition to its services.
                    if (isSystemApp)
                    {
                        await ProcessApplicationHealthAsync(app, Token);
                    }

                    var appHealth =
                        await FabricClientInstance.HealthManager.GetApplicationHealthAsync(
                            app.ApplicationName,
                            ConfigurationSettings.AsyncTimeout,
                            Token);

                    if (appHealth.ServiceHealthStates != null && appHealth.ServiceHealthStates.Count > 0)
                    {
                        foreach (var service in appHealth.ServiceHealthStates)
                        {
                            Token.ThrowIfCancellationRequested();

                            if (service.AggregatedHealthState == HealthState.Ok)
                            {
                                continue;
                            }

                            await ProcessServiceHealthAsync(service, Token);
                        }
                    }
                    else if (!isSystemApp)
                    {
                        // No services to inspect: fall back to application-level events. The System app was
                        // already processed above, so processing it here again would emit duplicate telemetry.
                        await ProcessApplicationHealthAsync(app, Token);
                    }
                }
                catch (Exception e) when (e is FabricException or TimeoutException)
                {
#if DEBUG
                    ObserverLogger.LogInfo($"Handled Exception in ReportClusterHealthAsync::Application:{Environment.NewLine}{e.Message}");
#endif
                }
            }
        }

        // Track current aggregated health state for use in next run.
        LastKnownClusterAggregatedHealthState = clusterHealth.AggregatedHealthState;
    }
    catch (Exception e) when (e is FabricException or TimeoutException)
    {
        string msg = $"Handled transient exception in ReportClusterHealthAsync:{Environment.NewLine}{e}";

        // Log it locally.
        ObserverLogger.LogWarning(msg);
    }
    catch (Exception e) when (e is not (OperationCanceledException or TaskCanceledException))
    {
        string msg = $"Unhandled exception in ReportClusterHealthAsync:{Environment.NewLine}{e}";

        // Log it locally.
        ObserverLogger.LogWarning(msg);

        var telemetryData = new ClusterTelemetryData()
        {
            ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
            EntityType = EntityType.Cluster,
            HealthState = HealthState.Warning,
            Description = msg,
            ObserverName = ObserverName
        };

        // Send Telemetry.
        if (IsTelemetryEnabled)
        {
            if (TelemetryClient != null)
            {
                await TelemetryClient.ReportHealthAsync(telemetryData, Token);
            }
        }

        // Emit ETW.
        if (IsEtwEnabled)
        {
            ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
        }

        // Rethrow so the unexpected failure surfaces (and gets fixed) instead of being hidden.
        throw;
    }
}
/// <summary>
/// Emits telemetry/ETW with the current upgrade details for <paramref name="appName"/>.
/// Nothing is emitted when:
/// - there is no upgrade progress or cancellation was requested;
/// - the upgrade state is Invalid or the current UD name is "-1";
/// - the upgrade is completed (RollingForwardCompleted or RollingBackCompleted) and that completion was
///   already reported for this application.
/// </summary>
/// <param name="appName">Application whose upgrade status is checked.</param>
/// <param name="Token">Cancellation token.</param>
private async Task ReportApplicationUpgradeStatus(Uri appName, CancellationToken Token)
{
    ServiceFabricUpgradeEventData appUpgradeInfo =
        await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
            () => UpgradeChecker.GetApplicationUpgradeDetailsAsync(FabricClientInstance, appName, Token),
            Token);

    if (appUpgradeInfo?.ApplicationUpgradeProgress == null || Token.IsCancellationRequested)
    {
        return;
    }

    var upgradeProgress = appUpgradeInfo.ApplicationUpgradeProgress;

    // Use ?. so a null CurrentUpgradeDomainProgress does not throw NullReferenceException.
    if (upgradeProgress.UpgradeState == ApplicationUpgradeState.Invalid
        || upgradeProgress.CurrentUpgradeDomainProgress?.UpgradeDomainName == "-1")
    {
        return;
    }

    string appKey = appName.OriginalString;
    bool upgradeCompleted = upgradeProgress.UpgradeState is ApplicationUpgradeState.RollingForwardCompleted
                                                         or ApplicationUpgradeState.RollingBackCompleted;

    // A completed upgrade that has already been reported is not reported again.
    if (upgradeCompleted
        && ApplicationUpgradesCompletedStatus.TryGetValue(appKey, out bool alreadyReported)
        && alreadyReported)
    {
        return;
    }

    // The indexer adds or overwrites in a single lookup.
    ApplicationUpgradesCompletedStatus[appKey] = upgradeCompleted;

    // Telemetry.
    if (IsTelemetryEnabled)
    {
        if (TelemetryClient != null)
        {
            await TelemetryClient.ReportApplicationUpgradeStatusAsync(appUpgradeInfo, Token);
        }
    }

    // ETW.
    if (IsEtwEnabled)
    {
        ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, appUpgradeInfo);
    }
}
/// <summary>
/// Emits telemetry/ETW with the current cluster (Fabric) upgrade details.
/// Nothing is emitted when:
/// - there is no upgrade progress or cancellation was requested;
/// - the state is Invalid or the current UD name is "-1";
/// - a completed upgrade has already been reported (tracked by HasClusterUpgradeCompleted).
/// </summary>
/// <param name="Token">Cancellation token.</param>
private async Task ReportClusterUpgradeStatusAsync(CancellationToken Token)
{
    var upgradeEvent =
        await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
            () => UpgradeChecker.GetClusterUpgradeDetailsAsync(FabricClientInstance, Token), Token);

    if (upgradeEvent?.FabricUpgradeProgress == null || Token.IsCancellationRequested)
    {
        return;
    }

    var progress = upgradeEvent.FabricUpgradeProgress;

    bool notReportable = progress.UpgradeState == FabricUpgradeState.Invalid
                         || progress.CurrentUpgradeDomainProgress?.UpgradeDomainName == "-1";
    if (notReportable)
    {
        return;
    }

    bool upgradeCompleted = progress.UpgradeState is FabricUpgradeState.RollingForwardCompleted
                                                  or FabricUpgradeState.RollingBackCompleted;

    // This completion was already reported on an earlier run.
    if (upgradeCompleted && HasClusterUpgradeCompleted)
    {
        return;
    }

    HasClusterUpgradeCompleted = upgradeCompleted;

    if (IsTelemetryEnabled && TelemetryClient != null)
    {
        await TelemetryClient.ReportClusterUpgradeStatusAsync(upgradeEvent, Token);
    }

    if (IsEtwEnabled)
    {
        ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, upgradeEvent);
    }
}
/// <summary>
/// Reports the Error/Warning health events of one application, newest first.
/// - An event whose description deserializes as FabricObserver telemetry is forwarded as-is.
/// - If any other Error/Warning event exists, the application entity is processed once through
///   ProcessEntityHealthAsync. That method reports all of the entity's Error/Warning events, so calling it
///   per event would emit duplicates.
/// When MonitorUpgradeStatus is set, the application's upgrade status is also reported (except for the
/// System application).
/// </summary>
/// <param name="appHealthState">Health state of the application to process.</param>
/// <param name="Token">Cancellation token.</param>
private async Task ProcessApplicationHealthAsync(ApplicationHealthState appHealthState, CancellationToken Token)
{
    var appHealth = await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
        () => FabricClientInstance.HealthManager.GetApplicationHealthAsync(
            appHealthState.ApplicationName,
            ignoreDefaultQueryTimeout ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout,
            Token),
        Token);

    if (appHealth == null)
    {
        return;
    }

    Uri appName = appHealthState.ApplicationName;

    // Check upgrade status of unhealthy application. Note, this doesn't apply to System applications as they update as part of a platform update.
    if (MonitorUpgradeStatus && !appName.Equals(fabricSystemAppUri))
    {
        await ReportApplicationUpgradeStatus(appName, Token);
    }

    if (appHealth.HealthEvents == null)
    {
        return;
    }

    var unhealthyEvents =
        appHealth.HealthEvents
            .Where(e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning)
            .OrderByDescending(e => e.SourceUtcTimestamp)
            .ToList();

    bool hasNonFoEvents = false;

    foreach (HealthEvent healthEvent in unhealthyEvents)
    {
        Token.ThrowIfCancellationRequested();

        if (!TryGetTelemetryData(healthEvent, out TelemetryDataBase foTelemetryData))
        {
            // Not from FO/FHProxy: handled once after the loop.
            hasNonFoEvents = true;
            continue;
        }

        // Telemetry.
        if (IsTelemetryEnabled)
        {
            if (TelemetryClient != null)
            {
                await TelemetryClient.ReportHealthAsync(foTelemetryData, Token);
            }
        }

        // ETW.
        if (IsEtwEnabled)
        {
            ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, foTelemetryData);
        }
    }

    if (hasNonFoEvents)
    {
        // Reuse the health already fetched above instead of re-querying it.
        await ProcessEntityHealthAsync(appHealth, Token);
    }
}
/// <summary>
/// Reports unhealthy parts of one service, then the service's own Error/Warning events.
/// - Each partition in Error/Warning is reported through ProcessEntityHealthAsync. Each of its replicas in
///   Error/Warning is reported the same way, but only when none of that replica's Error/Warning events
///   carries a JSON-serialized telemetry payload.
/// - The service's own events are processed newest first. FabricObserver telemetry payloads are forwarded
///   as-is. If any other Error/Warning event exists, the service entity is processed once through
///   ProcessEntityHealthAsync, which reports all of its events, so calling it per event would duplicate them.
/// </summary>
/// <param name="serviceHealthState">Health state of the service to process.</param>
/// <param name="Token">Cancellation token.</param>
private async Task ProcessServiceHealthAsync(ServiceHealthState serviceHealthState, CancellationToken Token)
{
    Uri serviceName = serviceHealthState.ServiceName;
    ServiceHealth serviceHealth =
        await FabricClientInstance.HealthManager.GetServiceHealthAsync(serviceName, ConfigurationSettings.AsyncTimeout, Token);

    if (serviceHealth == null)
    {
        return;
    }

    if (serviceHealth.PartitionHealthStates != null)
    {
        var unhealthyPartitions =
            serviceHealth.PartitionHealthStates
                .Where(p => p.AggregatedHealthState is HealthState.Warning or HealthState.Error)
                .ToList();

        foreach (var partitionHealthState in unhealthyPartitions)
        {
            Token.ThrowIfCancellationRequested();

            var partitionHealth =
                await FabricClientInstance.HealthManager.GetPartitionHealthAsync(
                    partitionHealthState.PartitionId,
                    ConfigurationSettings.AsyncTimeout,
                    Token);

            if (partitionHealth == null)
            {
                continue;
            }

            await ProcessEntityHealthAsync(partitionHealth, Token);

            if (partitionHealth.ReplicaHealthStates == null)
            {
                continue;
            }

            var unhealthyReplicas =
                partitionHealth.ReplicaHealthStates
                    .Where(r => r.AggregatedHealthState is HealthState.Warning or HealthState.Error)
                    .ToList();

            foreach (var replica in unhealthyReplicas)
            {
                Token.ThrowIfCancellationRequested();

                var replicaHealth =
                    await FabricClientInstance.HealthManager.GetReplicaHealthAsync(
                        partitionHealthState.PartitionId,
                        replica.Id,
                        ConfigurationSettings.AsyncTimeout,
                        Token);

                if (replicaHealth?.HealthEvents == null)
                {
                    continue;
                }

                bool hasSerializedTelemetry =
                    replicaHealth.HealthEvents
                        .Where(h => h.HealthInformation.HealthState is HealthState.Warning or HealthState.Error)
                        .Any(h => JsonHelper.TryDeserializeObject(h.HealthInformation.Description, out _));

                if (!hasSerializedTelemetry)
                {
                    await ProcessEntityHealthAsync(replicaHealth, Token);
                }
            }
        }
    }

    var serviceEvents = serviceHealth.HealthEvents;

    if (serviceEvents == null)
    {
        return;
    }

    bool hasNonFoEvents = false;

    // From FO/FHProxy or some other service/component created an SF health event.
    foreach (HealthEvent healthEvent in serviceEvents
                 .Where(e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning)
                 .OrderByDescending(e => e.SourceUtcTimestamp))
    {
        Token.ThrowIfCancellationRequested();

        // HealthInformation.Description == serialized instance of TelemetryDataBase type?
        if (!TryGetTelemetryData(healthEvent, out TelemetryDataBase foTelemetryData))
        {
            // Not from FO/FHProxy: handled once after the loop.
            hasNonFoEvents = true;
            continue;
        }

        // Telemetry.
        if (IsTelemetryEnabled)
        {
            if (TelemetryClient != null)
            {
                await TelemetryClient.ReportHealthAsync(foTelemetryData, Token);
            }
        }

        // ETW.
        if (IsEtwEnabled)
        {
            ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, foTelemetryData);
        }
    }

    if (hasNonFoEvents)
    {
        // Reuse the health already fetched above instead of re-querying it.
        await ProcessEntityHealthAsync(serviceHealth, Token);
    }
}
/// <summary>
/// Reports the non-Ok health events of nodes in Error, and of nodes in Warning when EmitWarningDetails is set.
/// - Events carrying FabricObserver telemetry are forwarded with the node status appended.
/// - Other events with a non-blank description are wrapped in NodeTelemetryData.
/// If a node in Error belongs to the upgrade domain the Fabric upgrade is currently processing, each generic
/// event's description is prefixed with a note that the error may be caused by that upgrade.
/// </summary>
/// <param name="nodeHealthStates">Node health states from the cluster health query.</param>
/// <param name="Token">Cancellation token.</param>
private async Task ProcessNodeHealthAsync(IEnumerable nodeHealthStates, CancellationToken Token)
{
    // Check cluster upgrade status. This will be used to help determine if a node in Error is in that state because of a Fabric runtime upgrade.
    var clusterUpgradeInfo =
        await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
            () =>
                UpgradeChecker.GetClusterUpgradeDetailsAsync(FabricClientInstance, Token),
            Token);

    // Null-safe: the upgrade progress or the current UD progress may be absent.
    var currentUdProgress = clusterUpgradeInfo?.FabricUpgradeProgress?.CurrentUpgradeDomainProgress;
    TimeSpan queryTimeout = ignoreDefaultQueryTimeout ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout;

    var supportedNodeHealthStates = nodeHealthStates.Where(a => a.AggregatedHealthState is HealthState.Warning or HealthState.Error);

    foreach (var node in supportedNodeHealthStates)
    {
        Token.ThrowIfCancellationRequested();

        if (!EmitWarningDetails && node.AggregatedHealthState == HealthState.Warning)
        {
            continue;
        }

        string upgradeNote = string.Empty;

        if (node.AggregatedHealthState == HealthState.Error
            && currentUdProgress?.NodeProgressList != null
            && currentUdProgress.NodeProgressList.Any(n => n.NodeName == node.NodeName))
        {
            upgradeNote =
                $"Note: Cluster is currently upgrading in UD {currentUdProgress.UpgradeDomainName}. " +
                $"Node {node.NodeName} Error State could be due to this upgrade, which will temporarily take down a node as a " +
                $"normal part of the upgrade process.{Environment.NewLine}";
        }

        var nodeHealth =
            await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
                () =>
                    FabricClientInstance.HealthManager.GetNodeHealthAsync(node.NodeName, queryTimeout, Token),
                Token);

        if (nodeHealth?.HealthEvents == null)
        {
            continue;
        }

        var nodeEvents = nodeHealth.HealthEvents.Where(ev => ev.HealthInformation.HealthState != HealthState.Ok).ToList();

        if (nodeEvents.Count == 0)
        {
            continue;
        }

        // The node's status does not depend on the event, so query it once per node rather than per event.
        var targetNodeList =
            await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
                () =>
                    FabricClientInstance.QueryManager.GetNodeListAsync(node.NodeName, queryTimeout, Token),
                Token);

        if (targetNodeList == null || targetNodeList.Count == 0)
        {
            continue;
        }

        Node targetNode = targetNodeList[0];
        string nodeStatus = targetNode != null ? targetNode.NodeStatus.ToString() : string.Empty;

        foreach (var nodeHealthEvent in nodeEvents)
        {
            Token.ThrowIfCancellationRequested();

            if (TryGetTelemetryData(nodeHealthEvent, out TelemetryDataBase telemetryData))
            {
                telemetryData.Description += $"{Environment.NewLine}Node Status: {nodeStatus}";

                // Telemetry (AppInsights/LogAnalytics..).
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
            else if (!string.IsNullOrWhiteSpace(nodeHealthEvent.HealthInformation.Description))
            {
                // Each event gets its own description; the upgrade note applies to every generic event.
                string telemetryDescription =
                    $"{upgradeNote}{nodeHealthEvent.HealthInformation.Description}{Environment.NewLine}" +
                    $"Node Status: {nodeStatus}";

                var telemData = new NodeTelemetryData()
                {
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Node,
                    NodeName = targetNode?.NodeName ?? node.NodeName,
                    NodeType = targetNode?.NodeType,
                    ObserverName = ObserverName,
                    HealthState = targetNode?.HealthState ?? node.AggregatedHealthState,
                    Description = telemetryDescription
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemData);
                }
            }
        }
    }
}
/// <summary>
/// Emits telemetry and/or ETW for every Error or Warning health event attached to <paramref name="entityHealth"/>.
/// Handled entity types: ApplicationHealth, DeployedApplicationHealth, DeployedServicePackageHealth, NodeHealth,
/// PartitionHealth, ReplicaHealth and ServiceHealth; any other type is ignored.
/// Some branches run extra queries (deployed service packages, replica list, application name, service list)
/// and return without reporting when those queries return nothing.
/// </summary>
/// <param name="entityHealth">Health of the entity whose events are reported.</param>
/// <param name="Token">Cancellation token passed to the Fabric queries and the telemetry client.</param>
/// <remarks>FabricException is logged as a warning and swallowed; other exceptions propagate.</remarks>
private async Task ProcessEntityHealthAsync(EntityHealth entityHealth, CancellationToken Token)
{
    try
    {
        if (entityHealth is ApplicationHealth appHealth)
        {
            if (appHealth.HealthEvents == null || appHealth.HealthEvents.Count == 0)
            {
                return;
            }

            foreach (var healthEvent in appHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new ServiceTelemetryData
                {
                    ApplicationName = appHealth.ApplicationName.OriginalString,
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Application,
                    Metric = "AppHealth",
                    Property = healthEvent.HealthInformation.Property,
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
        else if (entityHealth is DeployedApplicationHealth deployedAppHealth)
        {
            // Nothing to report when there are no events.
            if (deployedAppHealth.HealthEvents == null || deployedAppHealth.HealthEvents.Count == 0)
            {
                return;
            }

            foreach (var healthEvent in deployedAppHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new ServiceTelemetryData
                {
                    ApplicationName = deployedAppHealth.ApplicationName.OriginalString,
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Application,
                    Metric = "AppHealth",
                    NodeName = deployedAppHealth.NodeName,
                    Property = healthEvent.HealthInformation.Property,
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
        else if (entityHealth is DeployedServicePackageHealth depServicePackageHealth)
        {
            if (depServicePackageHealth.HealthEvents == null || depServicePackageHealth.HealthEvents.Count == 0)
            {
                return;
            }

            var deployedServicePackages =
                await FabricClientInstance.QueryManager.GetDeployedServicePackageListAsync(
                    depServicePackageHealth.NodeName,
                    depServicePackageHealth.ApplicationName,
                    depServicePackageHealth.ServiceManifestName,
                    ConfigurationSettings.AsyncTimeout,
                    Token);

            if (deployedServicePackages == null || deployedServicePackages.Count == 0)
            {
                return;
            }

            foreach (var healthEvent in depServicePackageHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new ServiceTelemetryData
                {
                    ApplicationName = depServicePackageHealth.ApplicationName.OriginalString,
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Service,
                    Metric = "DeployedServicePkgHealth",
                    NodeName = depServicePackageHealth.NodeName,
                    Property = healthEvent.HealthInformation.Property,
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
        else if (entityHealth is NodeHealth nodeHealth)
        {
            if (nodeHealth.HealthEvents == null || nodeHealth.HealthEvents.Count == 0)
            {
                return;
            }

            foreach (var healthEvent in nodeHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new NodeTelemetryData
                {
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Node,
                    Metric = "NodeHealth",
                    NodeName = nodeHealth.NodeName,
                    Property = healthEvent.HealthInformation.Property,
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ClusterObserverConstants.ClusterObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
        else if (entityHealth is PartitionHealth partitionHealth)
        {
            if (partitionHealth.HealthEvents == null || partitionHealth.HealthEvents.Count == 0)
            {
                return;
            }

            foreach (var healthEvent in partitionHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new ServiceTelemetryData
                {
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Partition,
                    Metric = "PartitionHealth",
                    Property = healthEvent.HealthInformation.Property,
                    PartitionId = partitionHealth.PartitionId.ToString(),
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
        else if (entityHealth is ReplicaHealth replicaHealth)
        {
            if (replicaHealth.HealthEvents == null || replicaHealth.HealthEvents.Count == 0)
            {
                return;
            }

            var replicaList =
                await FabricClientInstance.QueryManager.GetReplicaListAsync(
                    replicaHealth.PartitionId,
                    replicaHealth.Id,
                    ConfigurationSettings.AsyncTimeout,
                    Token);

            // Check for null as well as empty, because replicaList[0] is read below.
            if (replicaList == null || replicaList.Count == 0)
            {
                return;
            }

            string serviceKind = replicaList[0].ServiceKind.ToString();
            string nodeName = replicaList[0].NodeName;

            foreach (var healthEvent in replicaHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new ServiceTelemetryData
                {
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Replica,
                    Metric = "ReplicaHealth",
                    NodeName = nodeName,
                    Property = healthEvent.HealthInformation.Property,
                    PartitionId = replicaHealth.PartitionId.ToString(),
                    ReplicaId = replicaHealth.Id,
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    ServiceKind = serviceKind,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
        else if (entityHealth is ServiceHealth serviceHealth)
        {
            ApplicationNameResult appNameResult =
                await FabricClientInstance.QueryManager.GetApplicationNameAsync(serviceHealth.ServiceName, ConfigurationSettings.AsyncTimeout, Token);

            if (appNameResult == null)
            {
                return;
            }

            ServiceList serviceList =
                await FabricClientInstance.QueryManager.GetServiceListAsync(appNameResult.ApplicationName, serviceHealth.ServiceName, ConfigurationSettings.AsyncTimeout, Token);

            if (serviceList == null || serviceList.Count == 0)
            {
                return;
            }

            if (serviceHealth.HealthEvents == null || serviceHealth.HealthEvents.Count == 0)
            {
                return;
            }

            foreach (var healthEvent in serviceHealth.HealthEvents.Where(
                        e => e.HealthInformation.HealthState is HealthState.Error or HealthState.Warning))
            {
                var telemetryData = new ServiceTelemetryData
                {
                    ApplicationName = appNameResult.ApplicationName.OriginalString,
                    ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
                    EntityType = EntityType.Service,
                    Metric = "ServiceHealth",
                    Property = healthEvent.HealthInformation.Property,
                    ServiceName = serviceHealth.ServiceName.OriginalString,
                    ServiceKind = serviceList[0].ServiceKind.ToString(),
                    ServiceTypeName = serviceList[0].ServiceTypeName,
                    ServiceTypeVersion = serviceList[0].ServiceManifestVersion,
                    Description = healthEvent.HealthInformation.Description,
                    HealthState = healthEvent.HealthInformation.HealthState,
                    Source = healthEvent.HealthInformation.SourceId,
                    ObserverName = ObserverName
                };

                // Telemetry.
                if (IsTelemetryEnabled)
                {
                    if (TelemetryClient != null)
                    {
                        await TelemetryClient.ReportHealthAsync(telemetryData, Token);
                    }
                }

                // ETW.
                if (IsEtwEnabled)
                {
                    ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetryData);
                }
            }
        }
    }
    catch (FabricException fe)
    {
        ObserverLogger.LogWarning($"Exception in ProcessGenericHealthEntityAsync: {fe.Message}.");
    }
}
/// <summary>
/// Tracks nodes whose NodeStatus is Disabled, Disabling or Down, recording when each was first and last seen
/// in that state.
/// - For a tracked node that is Up again, emits an Ok "is now Up" event and stops tracking it.
/// - For a node that has stayed in one of those statuses for at least MaxTimeNodeStatusNotOk, emits a Warning
///   on every run.
/// </summary>
/// <param name="Token">Cancellation token.</param>
/// <param name="isTest">When true, the node list query uses a 1-second timeout instead of ConfigurationSettings.AsyncTimeout.</param>
private async Task MonitorNodeStatusAsync(CancellationToken Token, bool isTest = false)
{
    var nodeList = await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
        () =>
            FabricClientInstance.QueryManager.GetNodeListAsync(
                null,
                isTest ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout,
                Token),
        Token);

    if (nodeList == null)
    {
        return;
    }

    // Are any of the nodes that were previously in non-Up status, now Up?
    // Iterate a snapshot of the keys, because entries are removed inside the loop.
    foreach (var trackedNodeName in NodeStatusDictionary.Keys.ToList())
    {
        if (!nodeList.Any(n => n.NodeName == trackedNodeName && n.NodeStatus == NodeStatus.Up))
        {
            continue;
        }

        var telemetry = new NodeTelemetryData()
        {
            ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
            HealthState = HealthState.Ok,
            Description = $"{trackedNodeName} is now Up.",
            Metric = "NodeStatus",
            NodeName = trackedNodeName,
            NodeType = nodeList.FirstOrDefault(n => n.NodeName == trackedNodeName)?.NodeType,
            ObserverName = ObserverName,
            Value = 0
        };

        // Telemetry.
        if (IsTelemetryEnabled)
        {
            if (TelemetryClient != null)
            {
                await TelemetryClient.ReportHealthAsync(telemetry, Token);
            }
        }

        // ETW.
        if (IsEtwEnabled)
        {
            ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetry);
        }

        // Clear dictionary entry.
        NodeStatusDictionary.Remove(trackedNodeName);
    }

    var notOkNodes = nodeList.Where(
        node => node.NodeStatus is NodeStatus.Disabled
                                or NodeStatus.Disabling
                                or NodeStatus.Down);

    foreach (var node in notOkNodes)
    {
        DateTime now = DateTime.Now;

        // Keep the original first-detected time for nodes that are already tracked.
        DateTime firstDetected =
            NodeStatusDictionary.TryGetValue(node.NodeName, out var tracked) ? tracked.FirstDetectedTime : now;

        NodeStatusDictionary[node.NodeName] = (node.NodeStatus, firstDetected, now);

        TimeSpan timeInStatus = now.Subtract(firstDetected);

        // Nodes stuck in Disabled/Disabling/Down?
        if (timeInStatus < MaxTimeNodeStatusNotOk)
        {
            continue;
        }

        var message =
            $"Node {node.NodeName} has been {node.NodeStatus} " +
            $"for {Math.Round(timeInStatus.TotalHours, 2)} hours.{Environment.NewLine}";

        var telemetry = new NodeTelemetryData()
        {
            ClusterId = ClusterInformation.ClusterInfoTuple.ClusterId,
            HealthState = HealthState.Warning,
            Description = message,
            Metric = "NodeStatus",
            NodeName = node.NodeName,
            NodeType = node.NodeType,
            ObserverName = ObserverName,
            Value = 1,
        };

        // Telemetry.
        if (IsTelemetryEnabled)
        {
            if (TelemetryClient != null)
            {
                await TelemetryClient.ReportHealthAsync(telemetry, Token);
            }
        }

        // ETW.
        if (IsEtwEnabled)
        {
            ObserverLogger.LogEtw(ClusterObserverConstants.ClusterObserverETWEventName, telemetry);
        }
    }
}
/// <summary>
/// Attempts to turn a health event's description into the telemetry type that matches the observer that produced it.
/// The description is first read as the common TelemetryData shape to discover ObserverName. It is then re-read as the
/// observer-specific type for AppObserver, ContainerObserver, FabricSystemObserver, DiskObserver, and NodeObserver.
/// DiskTelemetryData is read with treatMissingMembersAsError enabled.
/// </summary>
/// <param name="healthEvent">Health event whose HealthInformation.Description is expected to contain serialized telemetry.</param>
/// <param name="telemetryData">The deserialized telemetry on success; null on failure.</param>
/// <returns>
/// true when the common shape deserializes and either the observer has no specialized type or the specialized
/// deserialization succeeds; false otherwise.
/// </returns>
private static bool TryGetTelemetryData(HealthEvent healthEvent, out TelemetryDataBase telemetryData)
{
    var payload = healthEvent.HealthInformation.Description;
    telemetryData = null;

    // First pass: the common shape tells us which observer emitted the payload.
    if (!JsonHelper.TryDeserializeObject(payload, out TelemetryData commonData))
    {
        return false;
    }

    var sourceObserver = commonData.ObserverName;

    if (sourceObserver == ObserverConstants.AppObserverName)
    {
        if (JsonHelper.TryDeserializeObject(payload, out ServiceTelemetryData appData))
        {
            telemetryData = appData;
            return true;
        }
    }
    else if (sourceObserver == ObserverConstants.ContainerObserverName)
    {
        if (JsonHelper.TryDeserializeObject(payload, out ContainerTelemetryData containerData))
        {
            telemetryData = containerData;
            return true;
        }
    }
    else if (sourceObserver == ObserverConstants.FabricSystemObserverName)
    {
        if (JsonHelper.TryDeserializeObject(payload, out SystemServiceTelemetryData systemData))
        {
            telemetryData = systemData;
            return true;
        }
    }
    else if (sourceObserver == ObserverConstants.DiskObserverName)
    {
        // Strict member handling: this type has properties unique to it.
        if (JsonHelper.TryDeserializeObject(payload, out DiskTelemetryData diskData, treatMissingMembersAsError: true))
        {
            telemetryData = diskData;
            return true;
        }
    }
    else if (sourceObserver == ObserverConstants.NodeObserverName)
    {
        if (JsonHelper.TryDeserializeObject(payload, out NodeTelemetryData nodeData))
        {
            telemetryData = nodeData;
            return true;
        }
    }
    else
    {
        // Any other observer: the common shape is the result.
        telemetryData = commonData;
        return true;
    }

    // A known observer whose specialized payload failed to deserialize.
    return false;
}
/// <summary>
/// Determines whether RepairManagerService exists under the fabric:/System application by querying the service list
/// filtered to that service name.
/// </summary>
/// <param name="cancellationToken">Cancellation token flowed to the service list query.</param>
/// <returns>
/// true when the query yields at least one service; false when it yields none or null, or when it fails with a
/// FabricException or TimeoutException. Other exceptions propagate.
/// </returns>
private async Task IsRepairManagerDeployedAsync(CancellationToken cancellationToken)
{
    try
    {
        var matchingServices = await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
            () =>
            {
                var queryTimeout = ignoreDefaultQueryTimeout ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout;

                return FabricClientInstance.QueryManager.GetServiceListAsync(
                    fabricSystemAppUri,
                    repairManagerServiceUri,
                    queryTimeout,
                    cancellationToken);
            },
            cancellationToken);

        return matchingServices is { Count: > 0 };
    }
    catch (Exception ex) when (ex is FabricException or TimeoutException)
    {
        // A failed or timed-out query is reported as "not deployed".
        return false;
    }
}
/// <summary>
/// Retrieves the repair tasks whose state is Active, Approved, or Executing, with no task-id or executor filter.
/// </summary>
/// <param name="cancellationToken">Cancellation token flowed to the deployment check and the repair task query.</param>
/// <returns>
/// The repair task list. Returns null in these cases:
/// RepairManagerService is not deployed, the query throws FabricException or TimeoutException (not logged),
/// or the query throws any other non-cancellation, non-OOM exception (logged as a warning).
/// OperationCanceledException, TaskCanceledException and OutOfMemoryException propagate.
/// </returns>
private async Task GetRepairTasksCurrentlyProcessingAsync(CancellationToken cancellationToken)
{
    // Without RepairManagerService there is nothing to query.
    if (!await IsRepairManagerDeployedAsync(cancellationToken))
    {
        return null;
    }

    const RepairTaskStateFilter inFlightStates =
        RepairTaskStateFilter.Active | RepairTaskStateFilter.Approved | RepairTaskStateFilter.Executing;

    try
    {
        return await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
            () => FabricClientInstance.RepairManager.GetRepairTaskListAsync(
                null,
                inFlightStates,
                null,
                ignoreDefaultQueryTimeout ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout,
                cancellationToken),
            cancellationToken);
    }
    catch (Exception ex) when (ex is not (OperationCanceledException or TaskCanceledException or OutOfMemoryException))
    {
        // FabricException/TimeoutException are swallowed without logging; everything else is logged as a warning.
        if (ex is not (FabricException or TimeoutException))
        {
            ObserverLogger.LogWarning(ex.ToString());
        }
    }

    return null;
}
}
}