Fix rdspec and protectedpvcs condition #1605

Open
wants to merge 4 commits into base: main
14 changes: 7 additions & 7 deletions internal/controller/drplacementcontrol.go
@@ -935,8 +935,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string
// in the MW, but the VRGs in the vrgs slice are fetched using MCV.
vrg, ok := d.vrgs[srcCluster]
if !ok || len(vrg.Spec.VolSync.RDSpec) != 0 {
return fmt.Errorf(fmt.Sprintf("Waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v",
srcCluster, ok))
return fmt.Errorf(fmt.Sprintf("Waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v", srcCluster, ok))
}

err = d.EnsureCleanup(srcCluster)
@@ -1406,6 +1405,8 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {
}

func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
d.log.Info("Cleaning up secondaries.")

for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
if skipCluster == clusterName {
continue
@@ -1531,7 +1532,7 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
annotations[DRPCNameAnnotation] = d.instance.Name
annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

if err := d.mwu.CreateOrUpdateVRGManifestWork(
if _, err := d.mwu.CreateOrUpdateVRGManifestWork(
d.instance.Name, d.vrgNamespace,
homeCluster, vrg, annotations); err != nil {
d.log.Error(err, "failed to create or update VolumeReplicationGroup manifest")
@@ -1760,7 +1761,7 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {
}

if !clean {
msg := "cleaning secondaries"
msg := "cleaning up secondaries"
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionFalse, rmn.ReasonCleaning, msg)

@@ ... @@

//nolint:gocognit
func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {
d.log.Info("VolSync needs both VRGs. No need to clean up secondary")
d.log.Info("Ensure secondary on peer")
d.log.Info("VolSync needs both VRGs. Ensure secondary setup on peer")

peersReady := true

@@ ... @@

// Recreate the VRG ManifestWork for the secondary. This typically happens during Hub Recovery.
if errors.IsNotFound(err) {
err := d.createVolSyncDestManifestWork(clusterToSkip)
err := d.ensureVolSyncSetup(clusterToSkip)
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions internal/controller/drplacementcontrol_controller.go
@@ -2244,7 +2244,7 @@ func adoptExistingVRGManifestWork(
annotations[DRPCNameAnnotation] = drpc.Name
annotations[DRPCNamespaceAnnotation] = drpc.Namespace

err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
_, err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
if err != nil {
log.Info("error updating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
}
@@ -2281,7 +2281,7 @@ func adoptOrphanVRG(

vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID)

if err := mwu.CreateOrUpdateVRGManifestWork(
if _, err := mwu.CreateOrUpdateVRGManifestWork(
drpc.Name, vrgNamespace,
cluster, *vrg, annotations); err != nil {
log.Info("error creating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
210 changes: 185 additions & 25 deletions internal/controller/drplacementcontrol_controller_test.go
@@ -56,14 +56,12 @@ const (
SyncDRPolicyName = "my-sync-dr-peers"
MModeReplicationID = "storage-replication-id-1"
MModeCSIProvisioner = "test.csi.com"

pvcCount = 2 // Count of fake PVCs reported in the VRG status
)

var (
NumberOfVrgsToReturnWhenRebuildingState = 0

UseApplicationSet = false
ProtectedPVCCount = 2 // Count of fake PVCs reported in the VRG status
RunningVolSyncTests = false
UseApplicationSet = false

west1Cluster = &spokeClusterV1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
@@ -425,7 +423,7 @@ func (f FakeMCVGetter) GetVRGFromManagedCluster(resourceName, resourceNamespace,
return nil, fmt.Errorf("%s: Faking cluster down %s", getFunctionNameAtIndex(2), managedCluster)
}

vrg, err := getVRGFromManifestWork(managedCluster, resourceNamespace)
vrg, err := GetFakeVRGFromMCVUsingMW(managedCluster, resourceNamespace)
if err != nil {
if errors.IsNotFound(err) {
return fakeVRGConditionally(resourceNamespace, managedCluster, err)
@@ -506,7 +504,8 @@ func getVRGNamespace(defaultNamespace string) string {
}

//nolint:funlen
func getVRGFromManifestWork(managedCluster, resourceNamespace string) (*rmn.VolumeReplicationGroup, error) {
func GetFakeVRGFromMCVUsingMW(managedCluster, resourceNamespace string,
) (*rmn.VolumeReplicationGroup, error) {
manifestLookupKey := types.NamespacedName{
Name: rmnutil.ManifestWorkName(DRPCCommonName, getVRGNamespace(resourceNamespace), "vrg"),
Namespace: managedCluster,
@@ -540,14 +539,10 @@ func getVRGFromManifestWork(managedCluster, resourceNamespace string) (*rmn.Volu
vrg.Status.FinalSyncComplete = true
vrg.Status.ProtectedPVCs = []rmn.ProtectedPVC{}

for i := 0; i < pvcCount; i++ {
protectedPVC := rmn.ProtectedPVC{}
protectedPVC.Name = fmt.Sprintf("fakePVC%d", i)
protectedPVC.StorageIdentifiers.ReplicationID.ID = MModeReplicationID
protectedPVC.StorageIdentifiers.StorageProvisioner = MModeCSIProvisioner
protectedPVC.StorageIdentifiers.ReplicationID.Modes = []rmn.MMode{rmn.MModeFailover}

vrg.Status.ProtectedPVCs = append(vrg.Status.ProtectedPVCs, protectedPVC)
if RunningVolSyncTests {
createFakeProtectedPVCsForVolSync(vrg)
} else {
createFakeProtectedPVCsForVolRep(vrg)
}

// Always report conditions as a success?
@@ -598,6 +593,28 @@ func getVRGFromManifestWork(managedCluster, resourceNamespace string) (*rmn.Volu
return vrg, nil
}

func createFakeProtectedPVCsForVolRep(vrg *rmn.VolumeReplicationGroup) {
for i := 0; i < ProtectedPVCCount; i++ {
protectedPVC := rmn.ProtectedPVC{}
protectedPVC.Name = fmt.Sprintf("fakePVC%d", i)
protectedPVC.StorageIdentifiers.ReplicationID.ID = MModeReplicationID
protectedPVC.StorageIdentifiers.StorageProvisioner = MModeCSIProvisioner
protectedPVC.StorageIdentifiers.ReplicationID.Modes = []rmn.MMode{rmn.MModeFailover}

vrg.Status.ProtectedPVCs = append(vrg.Status.ProtectedPVCs, protectedPVC)
}
}

func createFakeProtectedPVCsForVolSync(vrg *rmn.VolumeReplicationGroup) {
for i := 0; i < ProtectedPVCCount; i++ {
protectedPVC := rmn.ProtectedPVC{}
protectedPVC.Name = fmt.Sprintf("fakePVC-%d-for-volsync", i)
protectedPVC.ProtectedByVolSync = true

vrg.Status.ProtectedPVCs = append(vrg.Status.ProtectedPVCs, protectedPVC)
}
}

func fakeVRGWithMModesProtectedPVC(vrgNamespace string) *rmn.VolumeReplicationGroup {
vrg := getDefaultVRG(vrgNamespace).DeepCopy()
vrg.Status = rmn.VolumeReplicationGroupStatus{
@@ -1004,7 +1021,8 @@ func createVRGMW(name, namespace, homeCluster string) {
TargetNamespace: namespace,
}

Expect(mwu.CreateOrUpdateVRGManifestWork(name, namespace, homeCluster, *vrg, nil)).To(Succeed())
_, err := mwu.CreateOrUpdateVRGManifestWork(name, namespace, homeCluster, *vrg, nil)
Expect(err).To(Succeed())
}

func updateManifestWorkStatus(clusterNamespace, vrgNamespace, mwType, workType string) {
@@ -1383,7 +1401,7 @@ func verifyDRPCStatusPreferredClusterExpectation(namespace string, drState rmn.D
return d.ClusterName == East1ManagedCluster &&
idx != -1 &&
condition.Reason == string(drState) &&
len(updatedDRPC.Status.ResourceConditions.ResourceMeta.ProtectedPVCs) == pvcCount
len(updatedDRPC.Status.ResourceConditions.ResourceMeta.ProtectedPVCs) == ProtectedPVCCount
}

return false
@@ -1494,7 +1512,7 @@ func runFailoverAction(placementObj client.Object, fromCluster, toCluster string
fenceCluster(fromCluster, manualFence)
}

recoverToFailoverCluster(placementObj, fromCluster, toCluster)
recoverToFailoverCluster(placementObj, fromCluster, toCluster, false)
// TODO: DRCluster as part of Unfence operation, first unfences
// the NetworkFence CR and then deletes it. Hence, by the
// time this test is made, depending upon whether NetworkFence
@@ -1547,7 +1565,7 @@ func runRelocateAction(placementObj client.Object, fromCluster string, isSyncDR
resetdrCluster(toCluster1)
}

relocateToPreferredCluster(placementObj, fromCluster)
relocateToPreferredCluster(placementObj, fromCluster, false)
// TODO: DRCluster as part of Unfence operation, first unfences
// the NetworkFence CR and then deletes it. Hence, by the
// time this test is made, depending upon whether NetworkFence
@@ -1597,7 +1615,7 @@ func clearDRActionAfterRelocate(userPlacementRule *plrv1.PlacementRule, preferre
Expect(decision.ClusterName).To(Equal(preferredCluster))
}

func relocateToPreferredCluster(placementObj client.Object, fromCluster string) {
func relocateToPreferredCluster(placementObj client.Object, fromCluster string, skipWaitForWMDeletion bool) {
toCluster1 := "east1-cluster"

setDRPCSpecExpectationTo(placementObj.GetNamespace(), toCluster1, fromCluster, rmn.ActionRelocate)
@@ -1608,12 +1626,14 @@ func relocateToPreferredCluster(placementObj client.Object, fromCluster string)
verifyDRPCStatusPreferredClusterExpectation(placementObj.GetNamespace(), rmn.Relocated)
verifyVRGManifestWorkCreatedAsPrimary(placementObj.GetNamespace(), toCluster1)

waitForVRGMWDeletion(West1ManagedCluster, placementObj.GetNamespace())
if !skipWaitForWMDeletion {
waitForVRGMWDeletion(West1ManagedCluster, placementObj.GetNamespace())
}

waitForCompletion(string(rmn.Relocated))
}

func recoverToFailoverCluster(placementObj client.Object, fromCluster, toCluster string) {
func recoverToFailoverCluster(placementObj client.Object, fromCluster, toCluster string, skipWaitForWMDeletion bool) {
setDRPCSpecExpectationTo(placementObj.GetNamespace(), fromCluster, toCluster, rmn.ActionFailover)

updateManifestWorkStatus(toCluster, placementObj.GetNamespace(), "vrg", ocmworkv1.WorkApplied)
@@ ... @@
verifyDRPCStatusPreferredClusterExpectation(placementObj.GetNamespace(), rmn.FailedOver)
verifyVRGManifestWorkCreatedAsPrimary(placementObj.GetNamespace(), toCluster)

waitForVRGMWDeletion(fromCluster, placementObj.GetNamespace())
if !skipWaitForWMDeletion {
waitForVRGMWDeletion(fromCluster, placementObj.GetNamespace())
}

waitForCompletion(string(rmn.FailedOver))
}
@@ -1752,7 +1774,7 @@ func verifyInitialDRPCDeployment(userPlacement client.Object, preferredCluster s
func verifyFailoverToSecondary(placementObj client.Object, toCluster string,
isSyncDR bool,
) {
recoverToFailoverCluster(placementObj, East1ManagedCluster, toCluster)
recoverToFailoverCluster(placementObj, East1ManagedCluster, toCluster, false)

// TODO: DRCluster as part of Unfence operation, first unfences
// the NetworkFence CR and then deletes it. Hence, by the
@@ -1786,7 +1808,7 @@ func verifyActionResultForPlacement(placement *clrapiv1beta1.Placement, homeClus
Expect(placementDecision).ShouldNot(BeNil())
Expect(placementDecision.GetLabels()[rmnutil.ExcludeFromVeleroBackup]).Should(Equal("true"))
Expect(placementDecision.Status.Decisions[0].ClusterName).Should(Equal(homeCluster))
vrg, err := getVRGFromManifestWork(homeCluster, placement.GetNamespace())
vrg, err := GetFakeVRGFromMCVUsingMW(homeCluster, placement.GetNamespace())
Expect(err).NotTo(HaveOccurred())

switch plType {
@@ -2652,8 +2674,146 @@ var _ = Describe("DRPlacementControl Reconciler", func() {
Expect(getManifestWorkCount(East1ManagedCluster)).Should(Equal(0))
})
})

Context("Test DRPlacementControl With VolSync Setup", func() {
var userPlacementRule *plrv1.PlacementRule
var drpc *rmn.DRPlacementControl

Specify("DRClusters", func() {
RunningVolSyncTests = true
populateDRClusters()
})
When("The Application is deployed for VolSync", func() {
It("Should deploy to East1ManagedCluster", func() {
var placementObj client.Object
placementObj, drpc = InitialDeploymentAsync(
DefaultDRPCNamespace, UserPlacementRuleName, East1ManagedCluster, UsePlacementRule)
userPlacementRule = placementObj.(*plrv1.PlacementRule)
Expect(userPlacementRule).NotTo(BeNil())
verifyInitialDRPCDeployment(userPlacementRule, East1ManagedCluster)
verifyDRPCOwnedByPlacement(userPlacementRule, getLatestDRPC(DefaultDRPCNamespace))
})
})
When("DRAction is changed to Failover", func() {
It("Should failover to Secondary (West1ManagedCluster)", func() {
recoverToFailoverCluster(userPlacementRule, East1ManagedCluster, West1ManagedCluster, true)
Expect(getVRGManifestWorkCount()).Should(Equal(2))
verifyRDSpecAfterActionSwitch(West1ManagedCluster, East1ManagedCluster, 2)
})
})
When("DRAction is set to Relocate", func() {
It("Should relocate to Primary (East1ManagedCluster)", func() {
relocateToPreferredCluster(userPlacementRule, West1ManagedCluster, true)
Expect(getVRGManifestWorkCount()).Should(Equal(2))
verifyRDSpecAfterActionSwitch(East1ManagedCluster, West1ManagedCluster, 2)
})
})
When("DRAction is changed back to Failover using only 1 protectedPVC", func() {
It("Should failover to secondary (West1ManagedCluster)", func() {
ProtectedPVCCount = 1
recoverToFailoverCluster(userPlacementRule, East1ManagedCluster, West1ManagedCluster, true)
Expect(getVRGManifestWorkCount()).Should(Equal(2))
verifyRDSpecAfterActionSwitch(West1ManagedCluster, East1ManagedCluster, 1)
ProtectedPVCCount = 2
})
})
When("DRAction is set back to Relocate using only 1 protectedPVC", func() {
It("Should relocate to Primary (East1ManagedCluster)", func() {
ProtectedPVCCount = 1
relocateToPreferredCluster(userPlacementRule, West1ManagedCluster, true)
Expect(getVRGManifestWorkCount()).Should(Equal(2))
verifyRDSpecAfterActionSwitch(East1ManagedCluster, West1ManagedCluster, 1)
ProtectedPVCCount = 2
})
})
When("DRAction is changed back to Failover using only 10 protectedPVC", func() {
It("Should failover to secondary (West1ManagedCluster)", func() {
ProtectedPVCCount = 10
recoverToFailoverCluster(userPlacementRule, East1ManagedCluster, West1ManagedCluster, true)
Expect(getVRGManifestWorkCount()).Should(Equal(2))
verifyRDSpecAfterActionSwitch(West1ManagedCluster, East1ManagedCluster, 10)
ProtectedPVCCount = 2
})
})
When("DRAction is set back to Relocate using only 10 protectedPVC", func() {
It("Should relocate to Primary (East1ManagedCluster)", func() {
ProtectedPVCCount = 10
relocateToPreferredCluster(userPlacementRule, West1ManagedCluster, true)
Expect(getVRGManifestWorkCount()).Should(Equal(2))
verifyRDSpecAfterActionSwitch(East1ManagedCluster, West1ManagedCluster, 10)
ProtectedPVCCount = 2
})
})
When("Deleting DRPolicy with DRPC references", func() {
It("Should retain the deleted DRPolicy in the API server", func() {
deleteDRPolicyAsync()
ensureDRPolicyIsNotDeleted(drpc)
})
})
When("Deleting user PlacementRule", func() {
It("Should cleanup DRPC", func() {
deleteUserPlacementRule(UserPlacementRuleName, DefaultDRPCNamespace)
})
})

When("Deleting DRPC", func() {
It("Should delete VRG and NS MWs and MCVs from Primary (East1ManagedCluster)", func() {
Expect(getManifestWorkCount(East1ManagedCluster)).Should(BeElementOf(3, 4)) // DRCluster + VRG MW
deleteDRPC()
waitForCompletion("deleted")
Expect(getManifestWorkCount(East1ManagedCluster)).Should(Equal(1)) // DRCluster
Expect(getManagedClusterViewCount(East1ManagedCluster)).Should(Equal(0)) // NS + VRG MCV
ensureNamespaceMWsDeletedFromAllClusters(DefaultDRPCNamespace)
})
It("should delete the DRPC causing its referenced drpolicy to be deleted"+
" by drpolicy controller since no DRPCs reference it anymore", func() {
ensureDRPolicyIsDeleted(drpc.Spec.DRPolicyRef.Name)
})
})
Specify("delete drclusters", func() {
RunningVolSyncTests = false
deleteDRClustersAsync()
})
})
})

func getVRGManifestWorkCount() int {
count := 0

for _, drCluster := range drClusters {
mwName := rmnutil.ManifestWorkName(DRPCCommonName, DefaultDRPCNamespace, rmnutil.MWTypeVRG)
mw := &ocmworkv1.ManifestWork{}

err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: mwName, Namespace: drCluster.Name}, mw)
if err == nil {
count++
}
}

return count
}

func getVRGFromManifestWork(clusterNamespace string) (*rmn.VolumeReplicationGroup, error) {
mwName := rmnutil.ManifestWorkName(DRPCCommonName, DefaultDRPCNamespace, rmnutil.MWTypeVRG)
mw := &ocmworkv1.ManifestWork{}

err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: mwName, Namespace: clusterNamespace}, mw)
Expect(err).NotTo(HaveOccurred())

return rmnutil.ExtractVRGFromManifestWork(mw)
}

func verifyRDSpecAfterActionSwitch(primaryCluster, secondaryCluster string, numOfRDSpecs int) {
// For Primary Cluster
vrg, err := getVRGFromManifestWork(primaryCluster)
Expect(err).NotTo(HaveOccurred())
Expect(len(vrg.Spec.VolSync.RDSpec)).Should(Equal(0))
// For Secondary Cluster
vrg, err = getVRGFromManifestWork(secondaryCluster)
Expect(err).NotTo(HaveOccurred())
Expect(len(vrg.Spec.VolSync.RDSpec)).Should(Equal(numOfRDSpecs))
}

func verifyDRPCStateAndProgression(expectedAction rmn.DRAction, expectedPhase rmn.DRState,
exptectedPorgression rmn.ProgressionStatus,
) {