split out openShiftClusterBackend

This commit is contained in:
Jim Minter 2019-11-27 16:34:10 -06:00
Родитель dfcab19851
Коммит a6085c3c58
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 0730CBDA10D1A2D3
2 изменённых файлов: 147 добавлений и 122 удалений

Просмотреть файл

@ -2,7 +2,6 @@ package backend
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -10,8 +9,6 @@ import (
"github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/jim-minter/rp/pkg/api"
"github.com/jim-minter/rp/pkg/backend/openshiftcluster"
"github.com/jim-minter/rp/pkg/database" "github.com/jim-minter/rp/pkg/database"
"github.com/jim-minter/rp/pkg/env" "github.com/jim-minter/rp/pkg/env"
) )
@ -31,6 +28,8 @@ type backend struct {
workers int32 workers int32
stopping atomic.Value stopping atomic.Value
ocb *openShiftClusterBackend
domain string domain string
} }
@ -61,6 +60,8 @@ func NewBackend(ctx context.Context, log *logrus.Entry, env env.Interface, db *d
b.cond = sync.NewCond(&b.mu) b.cond = sync.NewCond(&b.mu)
b.stopping.Store(false) b.stopping.Store(false)
b.ocb = &openShiftClusterBackend{backend: b}
return b, nil return b, nil
} }
@ -77,7 +78,7 @@ func (b *backend) Run(stop <-chan struct{}) {
for { for {
b.mu.Lock() b.mu.Lock()
for atomic.LoadInt32(&b.workers) == maxWorkers && !b.stopping.Load().(bool) { for atomic.LoadInt32(&b.workers) >= maxWorkers && !b.stopping.Load().(bool) {
b.cond.Wait() b.cond.Wait()
} }
b.mu.Unlock() b.mu.Unlock()
@ -86,125 +87,13 @@ func (b *backend) Run(stop <-chan struct{}) {
break break
} }
doc, err := b.db.OpenShiftClusters.Dequeue() didWork, err := b.ocb.try()
if err != nil || doc == nil { if err != nil {
if err != nil { b.baseLog.Error(err)
b.baseLog.Error(err) }
}
if !didWork {
<-t.C <-t.C
continue
}
log := b.baseLog.WithField("resource", doc.OpenShiftCluster.ID)
if doc.Dequeues > maxDequeueCount {
log.Warnf("dequeued %d times, failing", doc.Dequeues)
err = b.setTerminalState(doc, api.ProvisioningStateFailed)
if err != nil {
log.Error(err)
}
} else {
log.Print("dequeued")
go func() {
atomic.AddInt32(&b.workers, 1)
defer func() {
atomic.AddInt32(&b.workers, -1)
b.cond.Signal()
}()
t := time.Now()
err := b.handle(context.Background(), log, doc)
if err != nil {
log.Error(err)
}
log.WithField("durationMs", int(time.Now().Sub(t)/time.Millisecond)).Print("done")
}()
} }
} }
} }
func (b *backend) handle(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument) error {
stop := b.heartbeat(log, doc)
defer stop()
m, err := openshiftcluster.NewManager(log, b.db.OpenShiftClusters, b.authorizer, doc.OpenShiftCluster, b.domain)
if err != nil {
log.Error(err)
return b.setTerminalState(doc, api.ProvisioningStateFailed)
}
switch doc.OpenShiftCluster.Properties.ProvisioningState {
case api.ProvisioningStateUpdating:
log.Print("updating")
err = m.Update(ctx)
case api.ProvisioningStateDeleting:
log.Print("deleting")
err = m.Delete(ctx)
}
stop()
if err != nil {
log.Error(err)
return b.setTerminalState(doc, api.ProvisioningStateFailed)
}
switch doc.OpenShiftCluster.Properties.ProvisioningState {
case api.ProvisioningStateUpdating:
return b.setTerminalState(doc, api.ProvisioningStateSucceeded)
case api.ProvisioningStateDeleting:
return b.db.OpenShiftClusters.Delete(doc)
default:
return fmt.Errorf("unexpected state %q", doc.OpenShiftCluster.Properties.ProvisioningState)
}
}
func (b *backend) heartbeat(log *logrus.Entry, doc *api.OpenShiftClusterDocument) func() {
var stopped bool
stop, done := make(chan struct{}), make(chan struct{})
go func() {
defer close(done)
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
_, err := b.db.OpenShiftClusters.Lease(doc.OpenShiftCluster.Key)
if err != nil {
log.Error(err)
return
}
select {
case <-t.C:
case <-stop:
return
}
}
}()
return func() {
if !stopped {
close(stop)
<-done
stopped = true
}
}
}
func (b *backend) setTerminalState(doc *api.OpenShiftClusterDocument, state api.ProvisioningState) error {
_, err := b.db.OpenShiftClusters.Patch(doc.OpenShiftCluster.Key, func(doc *api.OpenShiftClusterDocument) error {
doc.LeaseOwner = nil
doc.LeaseExpires = 0
doc.Dequeues = 0
doc.OpenShiftCluster.Properties.ProvisioningState = state
return nil
})
return err
}

Просмотреть файл

@ -0,0 +1,136 @@
package backend
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"github.com/jim-minter/rp/pkg/api"
"github.com/jim-minter/rp/pkg/backend/openshiftcluster"
)
type openShiftClusterBackend struct {
*backend
}
func (ocb *openShiftClusterBackend) try() (bool, error) {
doc, err := ocb.db.OpenShiftClusters.Dequeue()
if err != nil {
return false, err
}
if doc == nil {
return false, nil
}
log := ocb.baseLog.WithField("resource", doc.OpenShiftCluster.ID)
if doc.Dequeues > maxDequeueCount {
log.Warnf("dequeued %d times, failing", doc.Dequeues)
return true, ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateFailed)
}
log.Print("dequeued")
atomic.AddInt32(&ocb.workers, 1)
go func() {
defer func() {
atomic.AddInt32(&ocb.workers, -1)
ocb.cond.Signal()
}()
t := time.Now()
err := ocb.handle(context.Background(), log, doc)
if err != nil {
log.Error(err)
}
log.WithField("durationMs", int(time.Now().Sub(t)/time.Millisecond)).Print("done")
}()
return true, nil
}
func (ocb *openShiftClusterBackend) handle(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument) error {
stop := ocb.heartbeat(log, doc.OpenShiftCluster)
defer stop()
m, err := openshiftcluster.NewManager(log, ocb.db.OpenShiftClusters, ocb.authorizer, doc.OpenShiftCluster, ocb.domain)
if err != nil {
log.Error(err)
return ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateFailed)
}
switch doc.OpenShiftCluster.Properties.ProvisioningState {
case api.ProvisioningStateUpdating:
log.Print("updating")
err = m.Update(ctx)
case api.ProvisioningStateDeleting:
log.Print("deleting")
err = m.Delete(ctx)
}
stop()
if err != nil {
log.Error(err)
return ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateFailed)
}
switch doc.OpenShiftCluster.Properties.ProvisioningState {
case api.ProvisioningStateUpdating:
return ocb.setTerminalState(doc.OpenShiftCluster, api.ProvisioningStateSucceeded)
case api.ProvisioningStateDeleting:
return ocb.db.OpenShiftClusters.Delete(doc)
default:
return fmt.Errorf("unexpected state %q", doc.OpenShiftCluster.Properties.ProvisioningState)
}
}
func (ocb *openShiftClusterBackend) heartbeat(log *logrus.Entry, oc *api.OpenShiftCluster) func() {
var stopped bool
stop, done := make(chan struct{}), make(chan struct{})
go func() {
defer close(done)
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
_, err := ocb.db.OpenShiftClusters.Lease(oc.Key)
if err != nil {
log.Error(err)
return
}
select {
case <-t.C:
case <-stop:
return
}
}
}()
return func() {
if !stopped {
close(stop)
<-done
stopped = true
}
}
}
func (ocb *openShiftClusterBackend) setTerminalState(oc *api.OpenShiftCluster, state api.ProvisioningState) error {
_, err := ocb.db.OpenShiftClusters.Patch(oc.Key, func(doc *api.OpenShiftClusterDocument) error {
doc.LeaseOwner = nil
doc.LeaseExpires = 0
doc.Dequeues = 0
doc.OpenShiftCluster.Properties.ProvisioningState = state
return nil
})
return err
}