Commit

try and fix e2e race condition
hawkowl committed Oct 18, 2024
1 parent 003bf9a commit 7f1f0ea
Showing 4 changed files with 69 additions and 19 deletions.
36 changes: 34 additions & 2 deletions pkg/mimo/actuator/actuator_test.go
@@ -34,7 +34,7 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
var manifests database.MaintenanceManifests
var manifestsClient *cosmosdb.FakeMaintenanceManifestDocumentClient
var clusters database.OpenShiftClusters
//var clustersClient cosmosdb.OpenShiftClusterDocumentClient
var clustersClient *cosmosdb.FakeOpenShiftClusterDocumentClient

var a Actuator

@@ -75,7 +75,7 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
BeforeEach(func() {
now := func() time.Time { return time.Unix(120, 0) }
manifests, manifestsClient = testdatabase.NewFakeMaintenanceManifests(now)
clusters, _ = testdatabase.NewFakeOpenShiftClusters()
clusters, clustersClient = testdatabase.NewFakeOpenShiftClusters()

a = &actuator{
log: log,
@@ -105,6 +105,9 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
Key: strings.ToLower(clusterResourceID),
OpenShiftCluster: &api.OpenShiftCluster{
ID: clusterResourceID,
Properties: api.OpenShiftClusterProperties{
ProvisioningState: api.ProvisioningStateSucceeded,
},
},
})

@@ -130,6 +133,15 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
RunAfter: 0,
},
})
checker.AddOpenShiftClusterDocuments(&api.OpenShiftClusterDocument{
Key: strings.ToLower(clusterResourceID),
OpenShiftCluster: &api.OpenShiftCluster{
ID: clusterResourceID,
Properties: api.OpenShiftClusterProperties{
ProvisioningState: api.ProvisioningStateSucceeded,
},
},
})
})

It("expires them", func() {
@@ -139,6 +151,9 @@ var _ = Describe("MIMO Actuator", Ordered, func() {

errs := checker.CheckMaintenanceManifests(manifestsClient)
Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))

errs = checker.CheckOpenShiftClusters(clustersClient)
Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
})
})

@@ -151,6 +166,10 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
Key: strings.ToLower(clusterResourceID),
OpenShiftCluster: &api.OpenShiftCluster{
ID: clusterResourceID,
Properties: api.OpenShiftClusterProperties{
ProvisioningState: api.ProvisioningStateSucceeded,
MaintenanceState: api.MaintenanceStateNone,
},
},
})

@@ -179,6 +198,16 @@ var _ = Describe("MIMO Actuator", Ordered, func() {
RunAfter: 0,
},
})
checker.AddOpenShiftClusterDocuments(&api.OpenShiftClusterDocument{
Key: strings.ToLower(clusterResourceID),
OpenShiftCluster: &api.OpenShiftCluster{
ID: clusterResourceID,
Properties: api.OpenShiftClusterProperties{
ProvisioningState: api.ProvisioningStateSucceeded,
MaintenanceState: api.MaintenanceStateNone,
},
},
})
})

It("runs them", func() {
@@ -198,6 +227,9 @@ var _ = Describe("MIMO Actuator", Ordered, func() {

errs := checker.CheckMaintenanceManifests(manifestsClient)
Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))

errs = checker.CheckOpenShiftClusters(clustersClient)
Expect(errs).To(BeNil(), fmt.Sprintf("%v", errs))
})
})

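
The added checker expectations above mirror the fixture: every OpenShiftClusterDocument seeded into the fake database is also registered with the checker in the state it should hold once the actuator finishes, and CheckOpenShiftClusters then diffs the fake client against those expectations. This matters because Process mutates the cluster document (provisioning and maintenance state) while it holds the lease, so the test now verifies the document is restored afterwards. A toy, generic sketch of that fixture/checker pattern — plain Go, not the ARO-RP testdatabase package; all names below are illustrative:

package main

import (
	"fmt"
	"reflect"
)

// doc is a stand-in for a cluster document: only the fields the test cares about.
type doc struct {
	Key               string
	ProvisioningState string
	MaintenanceState  string
}

// checker records the documents we expect to find after the code under test has run.
type checker struct{ want []doc }

func (c *checker) AddDocuments(d ...doc) { c.want = append(c.want, d...) }

// Check compares the fake store against the expectations, field for field.
func (c *checker) Check(store map[string]doc) []error {
	var errs []error
	for _, w := range c.want {
		got, ok := store[w.Key]
		if !ok {
			errs = append(errs, fmt.Errorf("missing document %q", w.Key))
			continue
		}
		if !reflect.DeepEqual(got, w) {
			errs = append(errs, fmt.Errorf("document %q: got %+v, want %+v", w.Key, got, w))
		}
	}
	return errs
}

func main() {
	// State of the fake database after the "actuator" has run.
	store := map[string]doc{
		"cluster-a": {Key: "cluster-a", ProvisioningState: "Succeeded", MaintenanceState: ""},
	}

	c := &checker{}
	c.AddDocuments(doc{Key: "cluster-a", ProvisioningState: "Succeeded", MaintenanceState: ""})

	fmt.Println(c.Check(store)) // prints [] — the cluster document is back in its expected state
}
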
48 changes: 33 additions & 15 deletions pkg/mimo/actuator/manager.go
@@ -125,6 +125,8 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
return false, nil
}

a.log.Infof("Processing %d manifests", len(manifestsToAction))

// Dequeue the document
oc, err := a.oc.Get(ctx, a.clusterResourceID)
if err != nil {
@@ -136,12 +138,10 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
return false, fmt.Errorf("failed dequeuing cluster document: %w", err) // This will include StatusPreconditionFaileds
}

// Save these so we can reset them after
previousProvisioningState := oc.OpenShiftCluster.Properties.ProvisioningState
previousFailedProvisioningState := oc.OpenShiftCluster.Properties.FailedProvisioningState

// Mark the maintenance state as unplanned and put it in AdminUpdating
a.log.Infof("Marking cluster as in AdminUpdating")
oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(oscd *api.OpenShiftClusterDocument) error {
oscd.OpenShiftCluster.Properties.LastProvisioningState = oscd.OpenShiftCluster.Properties.ProvisioningState
oscd.OpenShiftCluster.Properties.ProvisioningState = api.ProvisioningStateAdminUpdating
oscd.OpenShiftCluster.Properties.MaintenanceState = api.MaintenanceStateUnplanned
return nil
@@ -151,7 +151,7 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
a.log.Error(err)

// attempt to dequeue the document, for what it's worth
_, leaseErr := a.oc.EndLease(ctx, a.clusterResourceID, previousProvisioningState, previousFailedProvisioningState, nil)
_, leaseErr := a.oc.EndLease(ctx, a.clusterResourceID, oc.OpenShiftCluster.Properties.LastProvisioningState, oc.OpenShiftCluster.Properties.FailedProvisioningState, nil)
if leaseErr != nil {
return false, fmt.Errorf("failed ending lease early on cluster document: %w", leaseErr)
}
@@ -162,12 +162,11 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {

// Execute on the manifests we want to action
for _, doc := range manifestsToAction {
// here
f, ok := a.tasks[doc.MaintenanceManifest.MaintenanceTaskID]
if !ok {
a.log.Infof("not found %v", doc.MaintenanceManifest.MaintenanceTaskID)
continue
}
taskLog := a.log.WithFields(logrus.Fields{
"manifestID": doc.ID,
"taskID": doc.MaintenanceManifest.MaintenanceTaskID,
})
taskLog.Info("begin processing manifest")

// Attempt a dequeue
doc, err = a.mmf.Lease(ctx, a.clusterResourceID, doc.ID)
@@ -177,9 +176,23 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
continue
}

// error if we don't know what this task is, then continue
f, ok := a.tasks[doc.MaintenanceManifest.MaintenanceTaskID]
if !ok {
a.log.Errorf("not found %v", doc.MaintenanceManifest.MaintenanceTaskID)
msg := "task ID not registered"
_, err = a.mmf.EndLease(ctx, doc.ClusterResourceID, doc.ID, api.MaintenanceManifestStateFailed, &msg)
if err != nil {
a.log.Error(fmt.Errorf("failed ending lease early on manifest: %w", err))
}
continue
}

var state api.MaintenanceManifestState
var msg string

taskLog.Info("executing manifest")

// Perform the task with a timeout
err = taskContext.RunInTimeout(time.Minute*60, func() error {
innerErr := f(taskContext, doc, oc)
@@ -193,32 +206,36 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
msg = taskContext.GetResultMessage()

if err != nil {
a.log.Error(err)

if doc.Dequeues >= maxDequeueCount {
msg = fmt.Sprintf("did not succeed after %d times, failing -- %s", doc.Dequeues, err.Error())
state = api.MaintenanceManifestStateRetriesExceeded
taskLog.Error(msg)
} else if utilmimo.IsRetryableError(err) {
// If an error is retryable (i.e. explicitly marked as a transient error
// by wrapping it in utilmimo.TransientError), then mark it back as
// Pending so that it will get picked up and retried.
state = api.MaintenanceManifestStatePending
taskLog.Error(fmt.Errorf("task returned a retryable error: %w", err))
} else {
// Terminal errors (explicitly marked or unwrapped) cause task failure
state = api.MaintenanceManifestStateFailed
taskLog.Error(fmt.Errorf("task returned a terminal error: %w", err))
}
} else {
// Mark tasks that don't have an error as succeeded implicitly
state = api.MaintenanceManifestStateCompleted
taskLog.Info("manifest executed successfully")
}

_, err = a.mmf.EndLease(ctx, doc.ClusterResourceID, doc.ID, state, &msg)
if err != nil {
a.log.Error(err)
taskLog.Error(fmt.Errorf("failed ending lease on manifest: %w", err))
}
taskLog.Info("manifest processing complete")
}

// Remove any set maintenance state
a.log.Info("removing maintenance state on cluster")
oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(oscd *api.OpenShiftClusterDocument) error {
oscd.OpenShiftCluster.Properties.MaintenanceState = api.MaintenanceStateNone
return nil
@@ -228,7 +245,8 @@ func (a *actuator) Process(ctx context.Context) (bool, error) {
}

// release the OpenShiftCluster
_, err = a.oc.EndLease(ctx, a.clusterResourceID, previousProvisioningState, previousFailedProvisioningState, nil)
a.log.Info("ending lease on cluster")
_, err = a.oc.EndLease(ctx, a.clusterResourceID, oc.OpenShiftCluster.Properties.LastProvisioningState, oc.OpenShiftCluster.Properties.FailedProvisioningState, nil)
if err != nil {
return false, fmt.Errorf("failed ending lease on cluster document: %w", err)
}
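
The reworked loop above resolves each manifest to one of four outcomes: completed, retries exceeded, retryable (explicitly wrapped as transient), or terminally failed. A minimal, generic sketch of that classification — illustrative Go using errors.As and a made-up transient wrapper, not the ARO-RP utilmimo implementation:

package main

import (
	"errors"
	"fmt"
)

// transientError marks a failure as retryable; anything not wrapped in it is terminal.
// The name is a stand-in for whatever utilmimo uses internally.
type transientError struct{ err error }

func (t transientError) Error() string { return t.err.Error() }
func (t transientError) Unwrap() error { return t.err }

func markTransient(err error) error { return transientError{err: err} }

func isRetryable(err error) bool {
	var t transientError
	return errors.As(err, &t)
}

type manifestState string

const (
	statePending         manifestState = "Pending"
	stateFailed          manifestState = "Failed"
	stateRetriesExceeded manifestState = "RetriesExceeded"
	stateCompleted       manifestState = "Completed"
)

// classify picks the manifest's next state from the task error and how many
// times the manifest has already been dequeued.
func classify(err error, dequeues, maxDequeues int) manifestState {
	switch {
	case err == nil:
		return stateCompleted
	case dequeues >= maxDequeues:
		return stateRetriesExceeded
	case isRetryable(err):
		return statePending // put it back so a later run picks it up again
	default:
		return stateFailed
	}
}

func main() {
	fmt.Println(classify(nil, 1, 5))                                      // Completed
	fmt.Println(classify(markTransient(errors.New("api timeout")), 1, 5)) // Pending
	fmt.Println(classify(errors.New("bad manifest"), 1, 5))               // Failed
	fmt.Println(classify(errors.New("still failing"), 5, 5))              // RetriesExceeded
}
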
2 changes: 1 addition & 1 deletion test/e2e/adminapi_cluster_update.go
@@ -14,7 +14,7 @@ import (
"github.com/Azure/ARO-RP/pkg/api/admin"
)

var _ = Describe("[Admin API] Cluster admin update action", func() {
var _ = Describe("[Admin API] Cluster admin update action", Serial, func() {
BeforeEach(skipIfNotInDevelopmentEnv)

It("must run cluster update operation on a cluster", func(ctx context.Context) {
2 changes: 1 addition & 1 deletion test/e2e/mimo_actuator.go
@@ -37,7 +37,7 @@ var _ = Describe("MIMO Actuator E2E Testing", func() {
})
})

It("Should be able to schedule and run a maintenance set via the admin API", func(ctx context.Context) {
It("Should be able to schedule and run a maintenance set via the admin API", Serial, func(ctx context.Context) {
var oc = &admin.OpenShiftCluster{}
testflag := "aro.e2e.testflag." + uuid.DefaultGenerator.Generate()

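
The two e2e changes are the actual fix for the race: marking both specs Serial tells Ginkgo v2 never to run them at the same time as any other spec when the suite runs in parallel — serial specs are held back and executed on a single worker after the parallel specs finish — so the admin-update test and the MIMO actuator test stop mutating the same cluster concurrently. A self-contained sketch of the decorator outside the ARO-RP suite (suite name and spec text are made up):

package e2e

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

func TestSerialSketch(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "serial decorator sketch")
}

// With `ginkgo -p`, specs normally run concurrently across workers. The Serial
// decorator (valid on Describe and It nodes) exempts a spec from that: it runs
// alone, after the parallel specs, which is enough to serialise two tests that
// share a cluster.
var _ = Describe("cluster admin update", Serial, func() {
	It("runs without any other spec touching the cluster", func() {
		Expect(true).To(BeTrue())
	})
})
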
