Mirror of https://github.com/Azure/ARO-RP.git
try and fix e2e race condition
Parent: ac26d1918e
Commit: 1cb1a57b23
@@ -34,7 +34,7 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
 	var manifests database.MaintenanceManifests
 	var manifestsClient *cosmosdb.FakeMaintenanceManifestDocumentClient
 	var clusters database.OpenShiftClusters
-	//var clustersClient cosmosdb.OpenShiftClusterDocumentClient
+	var clustersClient *cosmosdb.FakeOpenShiftClusterDocumentClient

 	var a Actuator

@@ -75,7 +75,7 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
 	BeforeEach(func() {
 		now := func() time.Time { return time.Unix(120, 0) }
 		manifests, manifestsClient = testdatabase.NewFakeMaintenanceManifests(now)
-		clusters, _ = testdatabase.NewFakeOpenShiftClusters()
+		clusters, clustersClient = testdatabase.NewFakeOpenShiftClusters()

 		a = &actuator{
 			log: log,
@@ -105,6 +105,9 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
 			Key: strings.ToLower(clusterResourceID),
 			OpenShiftCluster: &api.OpenShiftCluster{
 				ID: clusterResourceID,
+				Properties: api.OpenShiftClusterProperties{
+					ProvisioningState: api.ProvisioningStateSucceeded,
+				},
 			},
 		})

@@ -130,6 +133,15 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
 				RunAfter: 0,
 			},
 		})
+		checker.AddOpenShiftClusterDocuments(&api.OpenShiftClusterDocument{
+			Key: strings.ToLower(clusterResourceID),
+			OpenShiftCluster: &api.OpenShiftCluster{
+				ID: clusterResourceID,
+				Properties: api.OpenShiftClusterProperties{
+					ProvisioningState: api.ProvisioningStateSucceeded,
+				},
+			},
+		})
 	})

 	It("expires them", func() {
@@ -139,6 +151,9 @@ var _ = Describe("MIMO Actuator", Ordered, func() {

 		errs := checker.CheckMaintenanceManifests(manifestsClient)
 		Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
+
+		errs = checker.CheckOpenShiftClusters(clustersClient)
+		Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
 	})
 })

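Note on the test fixture: the race was invisible to these specs before because the fake cluster client returned by testdatabase.NewFakeOpenShiftClusters() was discarded (clusters, _ =), so only manifest documents could be verified. A rough sketch of the resulting pattern, using the identifiers visible in this diff (the checker constructor and the omitted Ginkgo scaffolding are assumptions, not verified signatures):

	// Sketch only: testdatabase.NewChecker and surrounding scaffolding are assumed.
	now := func() time.Time { return time.Unix(120, 0) }
	manifests, manifestsClient := testdatabase.NewFakeMaintenanceManifests(now)
	clusters, clustersClient := testdatabase.NewFakeOpenShiftClusters() // keep both returns

	checker := testdatabase.NewChecker()
	checker.AddOpenShiftClusterDocuments(&api.OpenShiftClusterDocument{
		Key: strings.ToLower(clusterResourceID),
		OpenShiftCluster: &api.OpenShiftCluster{
			ID: clusterResourceID,
			Properties: api.OpenShiftClusterProperties{
				ProvisioningState: api.ProvisioningStateSucceeded, // expected end state
			},
		},
	})

	// ... drive the actuator under test against `manifests` and `clusters` ...

	// Assert both stores; the cluster-side check is what the discarded
	// client previously made impossible.
	errs := checker.CheckMaintenanceManifests(manifestsClient)
	Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
	errs = checker.CheckOpenShiftClusters(clustersClient)
	Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
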
@@ -151,6 +166,10 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
 			Key: strings.ToLower(clusterResourceID),
 			OpenShiftCluster: &api.OpenShiftCluster{
 				ID: clusterResourceID,
+				Properties: api.OpenShiftClusterProperties{
+					ProvisioningState: api.ProvisioningStateSucceeded,
+					MaintenanceState:  api.MaintenanceStateNone,
+				},
 			},
 		})

@@ -179,6 +198,16 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
 				RunAfter: 0,
 			},
 		})
+		checker.AddOpenShiftClusterDocuments(&api.OpenShiftClusterDocument{
+			Key: strings.ToLower(clusterResourceID),
+			OpenShiftCluster: &api.OpenShiftCluster{
+				ID: clusterResourceID,
+				Properties: api.OpenShiftClusterProperties{
+					ProvisioningState: api.ProvisioningStateSucceeded,
+					MaintenanceState:  api.MaintenanceStateNone,
+				},
+			},
+		})
 	})

 	It("runs them", func() {
@@ -198,6 +227,9 @@ var _ = Describe("MIMO Actuator", Ordered, func() {

 		errs := checker.CheckMaintenanceManifests(manifestsClient)
 		Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
+
+		errs = checker.CheckOpenShiftClusters(clustersClient)
+		Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
 	})
 })

@@ -125,6 +125,8 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
 		return false, nil
 	}

+	a.log.Infof("Processing %d manifests", len(manifestsToAction))
+
 	// Dequeue the document
 	oc, err := a.oc.Get(ctx, a.clusterResourceID)
 	if err != nil {
@@ -136,12 +138,10 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
 		return false, fmt.Errorf("failed dequeuing cluster document: %w", err) // This will include StatusPreconditionFaileds
 	}

-	// Save these so we can reset them after
-	previousProvisioningState := oc.OpenShiftCluster.Properties.ProvisioningState
-	previousFailedProvisioningState := oc.OpenShiftCluster.Properties.FailedProvisioningState
-
 	// Mark the maintenance state as unplanned and put it in AdminUpdating
+	a.log.Infof("Marking cluster as in AdminUpdating")
 	oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(oscd *api.OpenShiftClusterDocument) error {
+		oscd.OpenShiftCluster.Properties.LastProvisioningState = oscd.OpenShiftCluster.Properties.ProvisioningState
 		oscd.OpenShiftCluster.Properties.ProvisioningState = api.ProvisioningStateAdminUpdating
 		oscd.OpenShiftCluster.Properties.MaintenanceState = api.MaintenanceStateUnplanned
 		return nil
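This hunk is the heart of the actuator-side fix. Before, the pre-maintenance provisioning states existed only as locals inside one Process call: if that call died, or another actor touched the document in the meantime, the values needed to restore the cluster after maintenance were stale or gone. Persisting LastProvisioningState on the document itself, inside the same leased patch that moves the cluster into AdminUpdating, makes the restore value shared state that any later EndLease can read back. A simplified contrast (not the commit's literal code):

	// Before: the restore value lives on one goroutine's stack and dies with it.
	prev := oc.OpenShiftCluster.Properties.ProvisioningState
	_ = prev

	// After: the restore value is written atomically with the state change,
	// so EndLease can always recover it from the document.
	oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(d *api.OpenShiftClusterDocument) error {
		d.OpenShiftCluster.Properties.LastProvisioningState = d.OpenShiftCluster.Properties.ProvisioningState
		d.OpenShiftCluster.Properties.ProvisioningState = api.ProvisioningStateAdminUpdating
		return nil
	})
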
@@ -151,7 +151,7 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
 		a.log.Error(err)

 		// attempt to dequeue the document, for what it's worth
-		_, leaseErr := a.oc.EndLease(ctx, a.clusterResourceID, previousProvisioningState, previousFailedProvisioningState, nil)
+		_, leaseErr := a.oc.EndLease(ctx, a.clusterResourceID, oc.OpenShiftCluster.Properties.LastProvisioningState, oc.OpenShiftCluster.Properties.FailedProvisioningState, nil)
 		if leaseErr != nil {
 			return false, fmt.Errorf("failed ending lease early on cluster document: %w", leaseErr)
 		}
@@ -162,12 +162,11 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {

 	// Execute on the manifests we want to action
 	for _, doc := range manifestsToAction {
-		// here
-		f, ok := a.tasks[doc.MaintenanceManifest.MaintenanceTaskID]
-		if !ok {
-			a.log.Infof("not found %v", doc.MaintenanceManifest.MaintenanceTaskID)
-			continue
-		}
+		taskLog := a.log.WithFields(logrus.Fields{
+			"manifestID": doc.ID,
+			"taskID":     doc.MaintenanceManifest.MaintenanceTaskID,
+		})
+		taskLog.Info("begin processing manifest")

 		// Attempt a dequeue
 		doc, err = a.mmf.Lease(ctx, a.clusterResourceID, doc.ID)
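The new per-manifest logger is plain logrus structured logging: WithFields returns a *logrus.Entry that stamps every subsequent line with the manifest and task IDs, so interleaved output from multiple manifests stays attributable. A minimal, runnable illustration (values made up):

	package main

	import "github.com/sirupsen/logrus"

	func main() {
		log := logrus.NewEntry(logrus.StandardLogger())
		taskLog := log.WithFields(logrus.Fields{
			"manifestID": "0001", // made-up values
			"taskID":     "example-task",
		})
		taskLog.Info("begin processing manifest")
		// level=info msg="begin processing manifest" manifestID=0001 taskID=example-task
	}
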
@@ -177,9 +176,23 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
 			continue
 		}

+		// error if we don't know what this task is, then continue
+		f, ok := a.tasks[doc.MaintenanceManifest.MaintenanceTaskID]
+		if !ok {
+			a.log.Errorf("not found %v", doc.MaintenanceManifest.MaintenanceTaskID)
+			msg := "task ID not registered"
+			_, err = a.mmf.EndLease(ctx, doc.ClusterResourceID, doc.ID, api.MaintenanceManifestStateFailed, &msg)
+			if err != nil {
+				a.log.Error(fmt.Errorf("failed ending lease early on manifest: %w", err))
+			}
+			continue
+		}
+
 		var state api.MaintenanceManifestState
 		var msg string

+		taskLog.Info("executing manifest")
+
 		// Perform the task with a timeout
 		err = taskContext.RunInTimeout(time.Minute*60, func() error {
 			innerErr := f(taskContext, doc, oc)
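The task lookup also moved to after the Lease call: an unregistered task ID now consumes its dequeue and has its lease ended as Failed with an explanatory message, instead of being skipped at Info level and left Pending forever. The manifest outcomes in play, as named in this diff:

	// Manifest outcomes as used in this commit:
	//   Completed       -- task returned nil
	//   Pending         -- retryable (transient) error; picked up again next pass
	//   RetriesExceeded -- still failing once doc.Dequeues >= maxDequeueCount
	//   Failed          -- terminal error, or task ID not registered
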
@@ -193,32 +206,36 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
 		msg = taskContext.GetResultMessage()

 		if err != nil {
-			a.log.Error(err)
-
 			if doc.Dequeues >= maxDequeueCount {
 				msg = fmt.Sprintf("did not succeed after %d times, failing -- %s", doc.Dequeues, err.Error())
 				state = api.MaintenanceManifestStateRetriesExceeded
+				taskLog.Error(msg)
 			} else if utilmimo.IsRetryableError(err) {
 				// If an error is retryable (i.e explicitly marked as a transient error
 				// by wrapping it in utilmimo.TransientError), then mark it back as
 				// Pending so that it will get picked up and retried.
 				state = api.MaintenanceManifestStatePending
+				taskLog.Error(fmt.Errorf("task returned a retryable error: %w", err))
 			} else {
 				// Terminal errors (explicitly marked or unwrapped) cause task failure
 				state = api.MaintenanceManifestStateFailed
+				taskLog.Error(fmt.Errorf("task returned a terminal error: %w", err))
 			}
 		} else {
 			// Mark tasks that don't have an error as succeeded implicitly
 			state = api.MaintenanceManifestStateCompleted
+			taskLog.Info("manifest executed successfully")
 		}

 		_, err = a.mmf.EndLease(ctx, doc.ClusterResourceID, doc.ID, state, &msg)
 		if err != nil {
-			a.log.Error(err)
+			taskLog.Error(fmt.Errorf("failed ending lease on manifest: %w", err))
 		}
+		taskLog.Info("manifest processing complete")
 	}

 	// Remove any set maintenance state
+	a.log.Info("removing maintenance state on cluster")
 	oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(oscd *api.OpenShiftClusterDocument) error {
 		oscd.OpenShiftCluster.Properties.MaintenanceState = api.MaintenanceStateNone
 		return nil
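For task authors, the classification above means retries are opt-in: only errors explicitly marked transient are re-queued. A hedged sketch of a task (the parameter types and utilmimo.TransientError's exact signature are not shown in this diff and are assumed from the comments; checkSomething is hypothetical):

	// Sketch, signatures assumed:
	func exampleTask(tc mimo.TaskContext, doc *api.MaintenanceManifestDocument, oc *api.OpenShiftClusterDocument) error {
		if err := checkSomething(); err != nil {
			// Wrapped: the actuator resets the manifest to Pending and retries.
			return utilmimo.TransientError(err)
		}
		// nil marks the manifest Completed; unwrapped errors are terminal.
		return nil
	}
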
@@ -228,7 +245,8 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
 	}

 	// release the OpenShiftCluster
-	_, err = a.oc.EndLease(ctx, a.clusterResourceID, previousProvisioningState, previousFailedProvisioningState, nil)
+	a.log.Info("ending lease on cluster")
+	_, err = a.oc.EndLease(ctx, a.clusterResourceID, oc.OpenShiftCluster.Properties.LastProvisioningState, oc.OpenShiftCluster.Properties.FailedProvisioningState, nil)
 	if err != nil {
 		return false, fmt.Errorf("failed ending lease on cluster document: %w", err)
 	}

@@ -14,7 +14,7 @@ import (
 	"github.com/Azure/ARO-RP/pkg/api/admin"
 )

-var _ = Describe("[Admin API] Cluster admin update action", func() {
+var _ = Describe("[Admin API] Cluster admin update action", Serial, func() {
 	BeforeEach(skipIfNotInDevelopmentEnv)

 	It("must run cluster update operation on a cluster", func(ctx context.Context) {

@@ -37,7 +37,7 @@ var _ = Describe("MIMO Actuator E2E Testing", func() {
 		})
 	})

-	It("Should be able to schedule and run a maintenance set via the admin API", func(ctx context.Context) {
+	It("Should be able to schedule and run a maintenance set via the admin API", Serial, func(ctx context.Context) {
 		var oc = &admin.OpenShiftCluster{}
 		testflag := "aro.e2e.testflag." + uuid.DefaultGenerator.Generate()

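The Serial decorator (Ginkgo v2) is what actually removes the e2e race: both the admin-update spec above and this MIMO spec admin-update the shared test cluster, and Serial guarantees a marked spec never runs concurrently with any other spec when the suite runs in parallel. A minimal illustration:

	package e2e

	import (
		"context"

		. "github.com/onsi/ginkgo/v2"
	)

	// Serial specs are held back until they can run with everything else
	// excluded, so two suites mutating one shared cluster cannot interleave.
	var _ = Describe("cluster-mutating suite", Serial, func() {
		It("admin-updates the shared cluster", func(ctx context.Context) {
			// mutate shared cluster state safely here
		})
	})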