Make all monitors into go routines

This commit is contained in:
Nont 2023-10-17 14:34:59 -05:00 коммит произвёл Caden Marchese
Родитель bfd8f6497d
Коммит c6285803fa
5 изменённых файлов: 91 добавлений и 48 удалений

Просмотреть файл

@ -9,6 +9,7 @@ import (
"net/http"
"net/netip"
"strings"
"sync"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
@ -19,41 +20,40 @@ import (
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/monitor/dimension"
"github.com/Azure/ARO-RP/pkg/monitor/emitter"
"github.com/Azure/ARO-RP/pkg/monitor/monitoring"
)
const (
MetricFailedNSGMonitorCreation = "monitor.preconfigurednsg.failedmonitorcreation"
MetricInvalidDenyRule = "monitor.preconfigurednsg.invaliddenyrule"
MetricSubnetAccessResponseCode = "monitor.preconfigurednsg.subnetaccessresponsecode"
MetricSubnetAccessForbidden = "monitor.preconfigurednsg.subnetaccessforbidden"
MetricUnsuccessfulFPCreation = "monitor.preconfigurednsg.fpcreationunsuccessful"
MetricNSGMonitoringTimedOut = "monitor.preconfigurednsg.monitoringtimedout"
MetricSubnetAccessResponseCode = "monitor.preconfigurednsg.subnetaccessresponsecode"
)
var expandNSG = "NetworkSecurityGroup"
var _ monitoring.Monitor = (*NSGMonitor)(nil)
// NSGMonitor is responsible for performing NSG rule validations when preconfiguredNSG is enabled
type NSGMonitor struct {
log *logrus.Entry
emitter metrics.Emitter
oc *api.OpenShiftCluster
wg *sync.WaitGroup
subnetClient *armnetwork.SubnetsClient
done chan error
dims map[string]string
}
func (n *NSGMonitor) Done() <-chan error {
return n.done
}
func NewNSGMonitor(log *logrus.Entry, oc *api.OpenShiftCluster, subscriptionID string, subnetClient *armnetwork.SubnetsClient, emitter metrics.Emitter) *NSGMonitor {
func NewNSGMonitor(log *logrus.Entry, oc *api.OpenShiftCluster, subscriptionID string, subnetClient *armnetwork.SubnetsClient, emitter metrics.Emitter, wg *sync.WaitGroup) *NSGMonitor {
return &NSGMonitor{
log: log,
emitter: emitter,
oc: oc,
subnetClient: subnetClient,
done: make(chan error),
wg: wg,
dims: map[string]string{
dimension.ResourceID: oc.ID,
@ -107,12 +107,12 @@ func (n *NSGMonitor) toSubnetConfig(ctx context.Context, subnetID string) (subne
return subnetNSGConfig{prefixes, subnet.Properties.NetworkSecurityGroup}, nil
}
func (n *NSGMonitor) Monitor(ctx context.Context) {
func (n *NSGMonitor) Monitor(ctx context.Context) []error {
defer n.wg.Done()
masterSubnet, err := n.toSubnetConfig(ctx, n.oc.Properties.MasterProfile.SubnetID)
if err != nil {
// FP has no access to the subnet
n.done <- err
return
return []error{err}
}
// need this to get the right workerProfiles
@ -129,8 +129,7 @@ func (n *NSGMonitor) Monitor(ctx context.Context) {
s, err := n.toSubnetConfig(ctx, wp.SubnetID)
if err != nil {
// FP has no access to the subnet
n.done <- err
return
return []error{err}
}
workerSubnets = append(workerSubnets, s)
workerPrefixes = append(workerPrefixes, s.prefix...)
@ -177,5 +176,5 @@ func (n *NSGMonitor) Monitor(ctx context.Context) {
}
}
}
n.done <- nil
return []error{}
}

Просмотреть файл

@ -6,6 +6,7 @@ package cluster
import (
"context"
"net/http"
"sync"
"github.com/Azure/go-autorest/autorest/azure"
configv1 "github.com/openshift/api/config/v1"
@ -24,11 +25,14 @@ import (
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/monitor/dimension"
"github.com/Azure/ARO-RP/pkg/monitor/emitter"
"github.com/Azure/ARO-RP/pkg/monitor/monitoring"
arov1alpha1 "github.com/Azure/ARO-RP/pkg/operator/apis/aro.openshift.io/v1alpha1"
aroclient "github.com/Azure/ARO-RP/pkg/operator/clientset/versioned"
"github.com/Azure/ARO-RP/pkg/util/steps"
)
var _ monitoring.Monitor = (*Monitor)(nil)
type Monitor struct {
log *logrus.Entry
hourlyRun bool
@ -55,9 +59,11 @@ type Monitor struct {
ns *corev1.NodeList
arodl *appsv1.DeploymentList
}
wg *sync.WaitGroup
}
func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftCluster, m metrics.Emitter, hiveRestConfig *rest.Config, hourlyRun bool) (*Monitor, error) {
func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftCluster, m metrics.Emitter, hiveRestConfig *rest.Config, hourlyRun bool, wg *sync.WaitGroup) (*Monitor, error) {
r, err := azure.ParseResourceID(oc.ID)
if err != nil {
return nil, err
@ -129,6 +135,7 @@ func NewMonitor(log *logrus.Entry, restConfig *rest.Config, oc *api.OpenShiftClu
m: m,
ocpclientset: ocpclientset,
hiveclientset: hiveclientset,
wg: wg,
}, nil
}
@ -154,6 +161,8 @@ func getHiveClientSet(hiveRestConfig *rest.Config) (client.Client, error) {
// Monitor checks the API server health of a cluster
func (mon *Monitor) Monitor(ctx context.Context) (errs []error) {
defer mon.wg.Done()
mon.log.Debug("monitoring")
if mon.hourlyRun {

Просмотреть файл

@ -0,0 +1,21 @@
package monitoring
import (
"context"
"sync"
)
// Monitor represents a consistent interface for different monitoring components
type Monitor interface {
Monitor(context.Context) []error
}
// noOpMonitor is a no operation monitor
type NoOpMonitor struct {
Wg *sync.WaitGroup
}
func (no *NoOpMonitor) Monitor(context.Context) []error {
no.Wg.Done()
return []error{}
}

Просмотреть файл

@ -7,6 +7,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2"
@ -15,9 +16,11 @@ import (
"k8s.io/client-go/rest"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/monitor/azure/nsg"
"github.com/Azure/ARO-RP/pkg/monitor/cluster"
"github.com/Azure/ARO-RP/pkg/monitor/dimension"
"github.com/Azure/ARO-RP/pkg/monitor/monitoring"
utillog "github.com/Azure/ARO-RP/pkg/util/log"
"github.com/Azure/ARO-RP/pkg/util/recover"
"github.com/Azure/ARO-RP/pkg/util/restconfig"
@ -230,6 +233,23 @@ out:
log.Debug("stopping monitoring")
}
func (mon *monitor) newNSGMonitor(log *logrus.Entry, oc *api.OpenShiftCluster, subscriptionID, tenantID string, e metrics.Emitter, dims map[string]string, wg *sync.WaitGroup) monitoring.Monitor {
token, err := mon.env.FPNewClientCertificateCredential(tenantID)
if err != nil {
log.Error("Unable to create FP Authorizer for NSG monitoring.", err)
mon.clusterm.EmitGauge(nsg.MetricFailedNSGMonitorCreation, int64(1), dims)
return &monitoring.NoOpMonitor{Wg: wg}
}
client, err := armnetwork.NewSubnetsClient(subscriptionID, token, nil)
if err != nil {
log.Error("Unable to create the subnet client for NSG monitoring", err)
mon.clusterm.EmitGauge(nsg.MetricFailedNSGMonitorCreation, int64(1), dims)
return &monitoring.NoOpMonitor{Wg: wg}
}
return nsg.NewNSGMonitor(log, oc, subscriptionID, client, e, wg)
}
// workOne checks the API server health of a cluster
func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument, sub *api.SubscriptionDocument, hourlyRun bool) {
ctx, cancel := context.WithTimeout(ctx, 50*time.Second)
@ -255,27 +275,15 @@ func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.Ope
dimension.SubscriptionID: sub.ID,
}
var nsgMon *nsg.NSGMonitor
var monitors []monitoring.Monitor
var wg sync.WaitGroup
if doc.OpenShiftCluster.Properties.NetworkProfile.PreconfiguredNSG == api.PreconfiguredNSGEnabled && hourlyRun {
token, err := mon.env.FPNewClientCertificateCredential(sub.Subscription.Properties.TenantID)
if err != nil {
// Not stopping here just because we can't monitor NSG
log.Error("Unable to create FP Authorizer for NSG monitoring.", err)
mon.m.EmitGauge(nsg.MetricUnsuccessfulFPCreation, int64(1), dims)
} else {
client, err := armnetwork.NewSubnetsClient(sub.ID, token, nil)
if err != nil {
log.Error("Unable to create the subnet client for NSG monitoring", err)
} else {
nsgMon = nsg.NewNSGMonitor(log, doc.OpenShiftCluster, sub.ID, client, mon.clusterm)
}
}
if nsgMon != nil {
go nsgMon.Monitor(ctx)
}
nsgMon := mon.newNSGMonitor(log, doc.OpenShiftCluster, sub.ID, sub.Subscription.Properties.TenantID, mon.clusterm, dims, &wg)
monitors = append(monitors, nsgMon)
}
c, err := cluster.NewMonitor(log, restConfig, doc.OpenShiftCluster, mon.clusterm, hiveRestConfig, hourlyRun)
c, err := cluster.NewMonitor(log, restConfig, doc.OpenShiftCluster, mon.clusterm, hiveRestConfig, hourlyRun, &wg)
if err != nil {
log.Error(err)
mon.m.EmitGauge("monitor.cluster.failedworker", 1, map[string]string{
@ -284,18 +292,23 @@ func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.Ope
return
}
c.Monitor(ctx)
monitors = append(monitors, c)
allJobsDone := make(chan bool)
go execute(ctx, allJobsDone, &wg, monitors)
// if doing nsg monitoring, wait until timed out
if nsgMon != nil {
select {
case err := <-nsgMon.Done():
if err != nil {
log.Error("Error occurred during NSG monitoring", err)
}
case <-ctx.Done():
log.Infof("NSG Monitoring processing for cluster %s has timed out", doc.OpenShiftCluster.ID)
mon.m.EmitGauge(nsg.MetricNSGMonitoringTimedOut, int64(1), dims)
}
select {
case <-allJobsDone:
case <-ctx.Done():
log.Infof("The monitoring process for cluster %s has timed out.", doc.OpenShiftCluster.ID)
mon.m.EmitGauge("monitor.main.timedout", int64(1), dims)
}
}
func execute(ctx context.Context, done chan<- bool, wg *sync.WaitGroup, monitors []monitoring.Monitor) {
for _, monitor := range monitors {
wg.Add(1)
go monitor.Monitor(ctx)
}
wg.Wait()
done <- true
}

Просмотреть файл

@ -5,6 +5,7 @@ package e2e
import (
"context"
"sync"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@ -20,7 +21,7 @@ var _ = Describe("Monitor", func() {
By("creating a new monitor instance for the test cluster")
mon, err := cluster.NewMonitor(log, clients.RestConfig, &api.OpenShiftCluster{
ID: resourceIDFromEnv(),
}, &noop.Noop{}, nil, true)
}, &noop.Noop{}, nil, true, new(sync.WaitGroup))
Expect(err).NotTo(HaveOccurred())
By("running the monitor once")