update mimo DB to have a fetch which isn't just pending

This commit is contained in:
Amber Brown 2024-07-18 13:17:47 +10:00
Родитель a0285862b3
Коммит 958b339427
3 изменённых файлов: 51 добавлений и 9 удалений

Просмотреть файл

@ -15,8 +15,9 @@ import (
)
const (
MaintenanceManifestQueryForCluster = `SELECT * FROM MaintenanceManifests doc WHERE doc.maintenanceManifest.state IN ("Pending") AND doc.clusterResourceID = @clusterID`
MaintenanceManifestQueueLengthQuery = `SELECT VALUE COUNT(1) FROM MaintenanceManifests doc WHERE doc.maintenanceManifest.state IN ("Pending") AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`
MaintenanceManifestDequeueQueryForCluster = `SELECT * FROM MaintenanceManifests doc WHERE doc.maintenanceManifest.state IN ("Pending") AND doc.clusterResourceID = @clusterID`
MaintenanceManifestQueryForCluster = `SELECT * FROM MaintenanceManifests doc WHERE doc.clusterResourceID = @clusterID`
MaintenanceManifestQueueLengthQuery = `SELECT VALUE COUNT(1) FROM MaintenanceManifests doc WHERE doc.maintenanceManifest.state IN ("Pending") AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`
)
type MaintenanceManifestDocumentMutator func(*api.MaintenanceManifestDocument) error
@ -30,7 +31,8 @@ type maintenanceManifests struct {
type MaintenanceManifests interface {
Create(context.Context, *api.MaintenanceManifestDocument) (*api.MaintenanceManifestDocument, error)
GetByClusterResourceID(context.Context, string, string) (cosmosdb.MaintenanceManifestDocumentIterator, error)
GetByClusterResourceID(ctx context.Context, clusterResourceID string, continuation string) (cosmosdb.MaintenanceManifestDocumentIterator, error)
GetQueuedByClusterResourceID(ctx context.Context, clusterResourceID string, continuation string) (cosmosdb.MaintenanceManifestDocumentIterator, error)
Patch(context.Context, string, string, MaintenanceManifestDocumentMutator) (*api.MaintenanceManifestDocument, error)
PatchWithLease(context.Context, string, string, MaintenanceManifestDocumentMutator) (*api.MaintenanceManifestDocument, error)
Lease(ctx context.Context, clusterResourceID string, id string) (*api.MaintenanceManifestDocument, error)
@ -182,6 +184,22 @@ func (c *maintenanceManifests) GetByClusterResourceID(ctx context.Context, clust
}, &cosmosdb.Options{Continuation: continuation}), nil
}
func (c *maintenanceManifests) GetQueuedByClusterResourceID(ctx context.Context, clusterResourceID string, continuation string) (cosmosdb.MaintenanceManifestDocumentIterator, error) {
if clusterResourceID != strings.ToLower(clusterResourceID) {
return nil, fmt.Errorf("clusterResourceID %q is not lower case", clusterResourceID)
}
return c.c.Query("", &cosmosdb.Query{
Query: MaintenanceManifestDequeueQueryForCluster,
Parameters: []cosmosdb.Parameter{
{
Name: "@clusterResourceID",
Value: clusterResourceID,
},
},
}, &cosmosdb.Options{Continuation: continuation}), nil
}
func (c *maintenanceManifests) EndLease(ctx context.Context, clusterResourceID string, id string, provisioningState api.MaintenanceManifestState, statusString *string) (*api.MaintenanceManifestDocument, error) {
return c.patchWithLease(ctx, clusterResourceID, id, func(doc *api.MaintenanceManifestDocument) error {
doc.MaintenanceManifest.State = provisioningState

Просмотреть файл

@ -73,7 +73,7 @@ func (a *actuator) AddTasks(tasks map[string]tasks.TaskFunc) {
func (a *actuator) Process(ctx context.Context) (bool, error) {
// Get the manifests for this cluster which need to be worked
i, err := a.mmf.GetByClusterResourceID(ctx, a.clusterResourceID, "")
i, err := a.mmf.GetQueuedByClusterResourceID(ctx, a.clusterResourceID, "")
if err != nil {
return false, fmt.Errorf("failed getting manifests: %w", err)
}

Просмотреть файл

@ -5,7 +5,6 @@ package database
import (
"context"
"fmt"
"strconv"
"time"
@ -18,12 +17,40 @@ func injectMaintenanceManifests(c *cosmosdb.FakeMaintenanceManifestDocumentClien
c.SetQueryHandler(database.MaintenanceManifestQueryForCluster, func(client cosmosdb.MaintenanceManifestDocumentClient, query *cosmosdb.Query, options *cosmosdb.Options) cosmosdb.MaintenanceManifestDocumentRawIterator {
return fakeMaintenanceManifestsForCluster(client, query, options, now)
})
c.SetQueryHandler(database.MaintenanceManifestDequeueQueryForCluster, func(client cosmosdb.MaintenanceManifestDocumentClient, query *cosmosdb.Query, options *cosmosdb.Options) cosmosdb.MaintenanceManifestDocumentRawIterator {
return fakeMaintenanceManifestsDequeueForCluster(client, query, options, now)
})
c.SetTriggerHandler("renewLease", func(ctx context.Context, doc *api.MaintenanceManifestDocument) error {
return fakeMaintenanceManifestsRenewLeaseTrigger(ctx, doc, now)
})
}
func fakeMaintenanceManifestsDequeueForCluster(client cosmosdb.MaintenanceManifestDocumentClient, query *cosmosdb.Query, options *cosmosdb.Options, now func() time.Time) cosmosdb.MaintenanceManifestDocumentRawIterator {
startingIndex, err := fakeMaintenanceManifestsGetContinuation(options)
if err != nil {
return cosmosdb.NewFakeMaintenanceManifestDocumentErroringRawIterator(err)
}
input, err := client.ListAll(context.Background(), nil)
if err != nil {
// TODO: should this never happen?
panic(err)
}
clusterResourceID := query.Parameters[0].Value
var results []*api.MaintenanceManifestDocument
for _, r := range input.MaintenanceManifestDocuments {
if r.ClusterResourceID == clusterResourceID &&
r.MaintenanceManifest.State == api.MaintenanceManifestStatePending {
results = append(results, r)
}
}
return cosmosdb.NewFakeMaintenanceManifestDocumentIterator(results, startingIndex)
}
func fakeMaintenanceManifestsForCluster(client cosmosdb.MaintenanceManifestDocumentClient, query *cosmosdb.Query, options *cosmosdb.Options, now func() time.Time) cosmosdb.MaintenanceManifestDocumentRawIterator {
startingIndex, err := fakeMaintenanceManifestsGetContinuation(options)
if err != nil {
@ -38,12 +65,9 @@ func fakeMaintenanceManifestsForCluster(client cosmosdb.MaintenanceManifestDocum
clusterResourceID := query.Parameters[0].Value
fmt.Print(clusterResourceID, startingIndex)
var results []*api.MaintenanceManifestDocument
for _, r := range input.MaintenanceManifestDocuments {
if r.ClusterResourceID == clusterResourceID &&
r.MaintenanceManifest.State == api.MaintenanceManifestStatePending {
if r.ClusterResourceID == clusterResourceID {
results = append(results, r)
}
}