diff --git a/pkg/database/mimo.go b/pkg/database/mimo.go index 970546d6afd..33d66841257 100644 --- a/pkg/database/mimo.go +++ b/pkg/database/mimo.go @@ -33,9 +33,8 @@ type MaintenanceManifests interface { GetByClusterID(context.Context, string, string) (cosmosdb.MaintenanceManifestDocumentIterator, error) Patch(context.Context, string, string, MaintenanceManifestDocumentMutator) (*api.MaintenanceManifestDocument, error) PatchWithLease(context.Context, string, string, MaintenanceManifestDocumentMutator) (*api.MaintenanceManifestDocument, error) - Lease(context.Context, string, string) (*api.MaintenanceManifestDocument, error) + Lease(ctx context.Context, clusterID string, id string) (*api.MaintenanceManifestDocument, error) EndLease(context.Context, string, string, api.MaintenanceManifestState, *string) (*api.MaintenanceManifestDocument, error) - Dequeue(ctx context.Context, clusterID string, id string) (*api.MaintenanceManifestDocument, error) Get(context.Context, string, string) (*api.MaintenanceManifestDocument, error) NewUUID() string @@ -138,24 +137,7 @@ func (c *maintenanceManifests) QueueLength(ctx context.Context, collid string) ( } func (c *maintenanceManifests) Patch(ctx context.Context, clusterID string, id string, f MaintenanceManifestDocumentMutator) (*api.MaintenanceManifestDocument, error) { - var doc *api.MaintenanceManifestDocument - - err := cosmosdb.RetryOnPreconditionFailed(func() (err error) { - doc, err = c.Get(ctx, clusterID, id) - if err != nil { - return - } - - err = f(doc) - if err != nil { - return - } - - doc, err = c.c.Replace(ctx, doc.ClusterID, doc, nil) - return - }) - - return doc, err + return c.patch(ctx, clusterID, id, f, nil) } func (c *maintenanceManifests) patch(ctx context.Context, clusterID string, id string, f MaintenanceManifestDocumentMutator, options *cosmosdb.Options) (*api.MaintenanceManifestDocument, error) { @@ -179,8 +161,10 @@ func (c *maintenanceManifests) patch(ctx context.Context, clusterID string, id s return doc, err } +// PatchWithLease performs a patch on the cluster that verifies the lease is +// being held by this client before applying. func (c *maintenanceManifests) PatchWithLease(ctx context.Context, clusterID string, id string, f MaintenanceManifestDocumentMutator) (*api.MaintenanceManifestDocument, error) { - return c.patchWithLease(ctx, clusterID, id, f, nil) + return c.patchWithLease(ctx, clusterID, id, f, &cosmosdb.Options{PreTriggers: []string{"renewLease"}}) } func (c *maintenanceManifests) patchWithLease(ctx context.Context, clusterID string, id string, f MaintenanceManifestDocumentMutator, options *cosmosdb.Options) (*api.MaintenanceManifestDocument, error) { @@ -221,12 +205,6 @@ func (c *maintenanceManifests) GetByClusterID(ctx context.Context, clusterID str }, &cosmosdb.Options{Continuation: continuation}), nil } -func (c *maintenanceManifests) Lease(ctx context.Context, clusterID string, id string) (*api.MaintenanceManifestDocument, error) { - return c.patchWithLease(ctx, clusterID, id, func(doc *api.MaintenanceManifestDocument) error { - return nil - }, &cosmosdb.Options{PreTriggers: []string{"renewLease"}}) -} - func (c *maintenanceManifests) EndLease(ctx context.Context, clusterID string, id string, provisioningState api.MaintenanceManifestState, statusString *string) (*api.MaintenanceManifestDocument, error) { return c.patchWithLease(ctx, clusterID, id, func(doc *api.MaintenanceManifestDocument) error { doc.MaintenanceManifest.State = provisioningState @@ -244,7 +222,8 @@ func (c *maintenanceManifests) EndLease(ctx context.Context, clusterID string, i }, nil) } -func (c *maintenanceManifests) Dequeue(ctx context.Context, clusterID string, id string) (*api.MaintenanceManifestDocument, error) { +// Lease performs the initial lease/dequeue on the document. +func (c *maintenanceManifests) Lease(ctx context.Context, clusterID string, id string) (*api.MaintenanceManifestDocument, error) { if clusterID != strings.ToLower(clusterID) { return nil, fmt.Errorf("clusterID %q is not lower case", clusterID) } diff --git a/pkg/mimo/actuator/manager.go b/pkg/mimo/actuator/manager.go index e4e72c9668b..f0e65baea79 100644 --- a/pkg/mimo/actuator/manager.go +++ b/pkg/mimo/actuator/manager.go @@ -151,7 +151,7 @@ func (a *actuator) Process(ctx context.Context) (bool, error) { } // Attempt a dequeue - doc, err = a.mmf.Dequeue(ctx, a.clusterID, doc.ID) + doc, err = a.mmf.Lease(ctx, a.clusterID, doc.ID) if err != nil { // log and continue if it doesn't work a.log.Error(err)