avoid race condition by putting EndLease in database layer

This commit is contained in:
Jim Minter 2019-11-28 08:24:01 -06:00
Родитель 8c6cc6016f
Коммит 5da5f9bdc1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 0730CBDA10D1A2D3
2 изменённых файлов: 28 добавлений и 16 удалений

Просмотреть файл

@ -28,7 +28,7 @@ func (ocb *openShiftClusterBackend) try() (bool, error) {
log := ocb.baseLog.WithField("resource", doc.OpenShiftCluster.ID)
if doc.Dequeues > maxDequeueCount {
log.Warnf("dequeued %d times, failing", doc.Dequeues)
return true, ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateFailed)
return true, ocb.endLease(doc.OpenShiftCluster, api.ProvisioningStateFailed)
}
log.Print("dequeued")
@ -59,7 +59,9 @@ func (ocb *openShiftClusterBackend) handle(ctx context.Context, log *logrus.Entr
m, err := openshiftcluster.NewManager(log, ocb.db.OpenShiftClusters, ocb.authorizer, doc.OpenShiftCluster, ocb.domain)
if err != nil {
log.Error(err)
return ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateFailed)
stop()
return ocb.endLease(doc.OpenShiftCluster, api.ProvisioningStateFailed)
}
switch doc.OpenShiftCluster.Properties.ProvisioningState {
@ -75,12 +77,12 @@ func (ocb *openShiftClusterBackend) handle(ctx context.Context, log *logrus.Entr
if err != nil {
log.Error(err)
return ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateFailed)
return ocb.endLease(doc.OpenShiftCluster, api.ProvisioningStateFailed)
}
switch doc.OpenShiftCluster.Properties.ProvisioningState {
case api.ProvisioningStateUpdating:
return ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateSucceeded)
return ocb.endLease(doc.OpenShiftCluster, api.ProvisioningStateSucceeded)
case api.ProvisioningStateDeleting:
return ocb.db.OpenShiftClusters.Delete(doc)
@ -124,25 +126,17 @@ func (ocb *openShiftClusterBackend) heartbeat(log *logrus.Entry, oc *api.OpenShi
}
}
func (ocb *openShiftClusterBackend) setTerminalState(oc *api.OpenShiftCluster, state api.ProvisioningState) error {
func (ocb *openShiftClusterBackend) endLease(oc *api.OpenShiftCluster, provisioningState api.ProvisioningState) error {
var failedOperation api.FailedOperation
switch {
case state == api.ProvisioningStateFailed && oc.Properties.Installation != nil:
case provisioningState == api.ProvisioningStateFailed && oc.Properties.Installation != nil:
failedOperation = api.FailedOperationInstall
case state == api.ProvisioningStateFailed && oc.Properties.Installation == nil:
case provisioningState == api.ProvisioningStateFailed && oc.Properties.Installation == nil:
failedOperation = api.FailedOperationUpdate
default:
failedOperation = api.FailedOperationNone
}
_, err := ocb.db.OpenShiftClusters.Patch(oc.Key, func(doc *api.OpenShiftClusterDocument) error {
doc.OpenShiftCluster.Properties.ProvisioningState = state
doc.OpenShiftCluster.Properties.FailedOperation = failedOperation
doc.LeaseOwner = nil
doc.LeaseExpires = 0
doc.Dequeues = 0
return nil
})
_, err := ocb.db.OpenShiftClusters.EndLease(oc.Key, provisioningState, failedOperation)
return err
}

Просмотреть файл

@ -28,6 +28,7 @@ type OpenShiftClusters interface {
ListByPrefix(string, api.Key) (cosmosdb.OpenShiftClusterDocumentIterator, error)
Dequeue() (*api.OpenShiftClusterDocument, error)
Lease(api.Key) (*api.OpenShiftClusterDocument, error)
EndLease(api.Key, api.ProvisioningState, api.FailedOperation) (*api.OpenShiftClusterDocument, error)
}
// NewOpenShiftClusters returns a new OpenShiftClusters
@ -217,6 +218,23 @@ func (c *openShiftClusters) Lease(key api.Key) (*api.OpenShiftClusterDocument, e
}, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
}
func (c *openShiftClusters) EndLease(key api.Key, provisioningState api.ProvisioningState, failedOperation api.FailedOperation) (*api.OpenShiftClusterDocument, error) {
return c.patch(key, func(doc *api.OpenShiftClusterDocument) error {
if doc.LeaseOwner == nil || !uuid.Equal(*doc.LeaseOwner, c.uuid) {
return fmt.Errorf("lost lease")
}
doc.OpenShiftCluster.Properties.ProvisioningState = provisioningState
doc.OpenShiftCluster.Properties.FailedOperation = failedOperation
doc.LeaseOwner = nil
doc.LeaseExpires = 0
doc.Dequeues = 0
return nil
}, nil)
}
func (c *openShiftClusters) partitionKey(key api.Key) (string, error) {
r, err := azure.ParseResourceID(string(key))
return r.SubscriptionID, err