From a67947e3454ea5f9cc761a5647a7f61f150df91c Mon Sep 17 00:00:00 2001 From: Yak2p Date: Tue, 23 Jan 2024 03:35:36 -0600 Subject: [PATCH] fix(statefulset): fix maxUnavailable for rolling upgrades not taking into account pods that fail later in the updateIndexes. (#1480) Signed-off-by: Yesphet --- .../statefulset/stateful_set_control.go | 149 +++++++++++------- .../statefulset/stateful_set_control_test.go | 108 ++++++++++++- 2 files changed, 195 insertions(+), 62 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index eae2bd6fa9..f1247aec6d 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -704,24 +704,59 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( return &status, nil } - // TODO: separate out the below update only related logic + return ssc.rollingUpdateStatefulsetPods( + set, &status, currentRevision, updateRevision, revisions, pods, replicas, minReadySeconds, + ) +} + +func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods( + set *appsv1beta1.StatefulSet, + status *appsv1beta1.StatefulSetStatus, + currentRevision *apps.ControllerRevision, + updateRevision *apps.ControllerRevision, + revisions []*apps.ControllerRevision, + pods []*v1.Pod, + replicas []*v1.Pod, + minReadySeconds int32, +) (*appsv1beta1.StatefulSetStatus, error) { // If update expectations have not satisfied yet, skip updating pods if updateSatisfied, _, updateDirtyPods := updateExpectations.SatisfiedExpectations(getStatefulSetKey(set), updateRevision.Name); !updateSatisfied { klog.V(4).Infof("Not satisfied update for %v, updateDirtyPods=%v", getStatefulSetKey(set), updateDirtyPods) - return &status, nil + return status, nil + } + + // refresh states for all pods + var modified bool + for _, pod := range pods { + if pod == nil { + continue + } + refreshed, duration, err := ssc.refreshPodState(set, pod, updateRevision.Name) + if err != nil { + return status, err + } else if duration > 0 { + durationStore.Push(getStatefulSetKey(set), duration) + } + if refreshed { + modified = true + } + } + if modified { + return status, nil } + var err error // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. maxUnavailable := 1 if set.Spec.UpdateStrategy.RollingUpdate != nil { if set.Spec.UpdateStrategy.RollingUpdate.Paused { - return &status, nil + return status, nil } maxUnavailable, err = intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, intstrutil.FromInt(1)), int(*set.Spec.Replicas), false) if err != nil { - return &status, err + return status, err } // maxUnavailable should not be less than 1 if maxUnavailable < 1 { @@ -729,36 +764,66 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } } - // refresh states for all pods - var modified bool - for _, pod := range pods { - refreshed, duration, err := ssc.refreshPodState(set, pod, updateRevision.Name) - if err != nil { - return &status, err - } else if duration > 0 { - durationStore.Push(getStatefulSetKey(set), duration) + minWaitTime := appsv1beta1.MaxMinReadySeconds * time.Second + unavailablePods := sets.NewString() + opts := &inplaceupdate.UpdateOptions{} + opts = inplaceupdate.SetOptionsDefaults(opts) + // counts any targets in the replicas that are unhealthy (terminated / in-place update not-ready / not running and + // ready for minReadySeconds) for checking if the count satisfied the MaxUnavailable limit. + for target := range replicas { + if replicas[target] == nil { + continue } - if refreshed { - modified = true + if !isHealthy(replicas[target]) { + // 1. count pod as unavailable if it's unhealthy or terminating + unavailablePods.Insert(replicas[target].Name) + } else if completedErr := opts.CheckPodUpdateCompleted(replicas[target]); completedErr != nil { + // 2. count pod as unavailable if it's in-place updating and not ready + klog.V(4).Infof("StatefulSet %s/%s check Pod %s in-place update not-ready: %v", + set.Namespace, + set.Name, + replicas[target].Name, + completedErr) + unavailablePods.Insert(replicas[target].Name) + } else if isAvailable, waitTime := isRunningAndAvailable(replicas[target], minReadySeconds); !isAvailable { + // 3. count pod as unavailable if it's not available yet given the minReadySeconds requirement + unavailablePods.Insert(replicas[target].Name) + // make sure that we will wait for the first pod to get available + if waitTime != 0 && waitTime <= minWaitTime { + minWaitTime = waitTime + durationStore.Push(getStatefulSetKey(set), waitTime) + } } } - if modified { - return &status, nil - } - var unavailablePods []string updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas) klog.V(3).Infof("Prepare to update pods indexes %v for StatefulSet %s", updateIndexes, getStatefulSetKey(set)) - minWaitTime := appsv1beta1.MaxMinReadySeconds * time.Second // update pods in sequence for _, target := range updateIndexes { + // the target is already up-to-date, go to next + if getPodRevision(replicas[target]) == updateRevision.Name { + continue + } + + // the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it, + // wait for unhealthy Pods on update + if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) { + klog.V(4).Infof( + "StatefulSet %s/%s is waiting for unavailable Pods %v to update, blocked pod %s", + set.Namespace, + set.Name, + unavailablePods.List(), + replicas[target].Name) + return status, nil + } + // delete the Pod if it is not already terminating and does not match the update revision. - if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { + if !isTerminating(replicas[target]) { // todo validate in-place for pub inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions) if inplaceUpdateErr != nil { - return &status, inplaceUpdateErr + return status, inplaceUpdateErr } if !inplacing { klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update", @@ -766,51 +831,19 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( set.Name, replicas[target].Name) if _, err := ssc.deletePod(set, replicas[target]); err != nil { - return &status, err + return status, err } } + // mark target as unavailable because it's updated + unavailablePods.Insert(replicas[target].Name) if getPodRevision(replicas[target]) == currentRevision.Name { status.CurrentReplicas-- } } - - opts := &inplaceupdate.UpdateOptions{} - opts = inplaceupdate.SetOptionsDefaults(opts) - if getPodRevision(replicas[target]) != updateRevision.Name || !isHealthy(replicas[target]) { - unavailablePods = append(unavailablePods, replicas[target].Name) - } else if completedErr := opts.CheckPodUpdateCompleted(replicas[target]); completedErr != nil { - klog.V(4).Infof("StatefulSet %s/%s check Pod %s in-place update not-ready: %v", - set.Namespace, - set.Name, - replicas[target].Name, - completedErr) - unavailablePods = append(unavailablePods, replicas[target].Name) - } else { - // check if the updated pod is running and available given minReadySeconds - if isAvailable, waitTime := isRunningAndAvailable(replicas[target], minReadySeconds); !isAvailable { - // the pod is not available yet given the minReadySeconds requirement - unavailablePods = append(unavailablePods, replicas[target].Name) - // make sure that we will wait for the first pod to get available - if waitTime != 0 && waitTime <= minWaitTime { - minWaitTime = waitTime - durationStore.Push(getStatefulSetKey(set), waitTime) - } - } - } - - // wait for unhealthy Pods on update - if len(unavailablePods) >= maxUnavailable { - klog.V(4).Infof( - "StatefulSet %s/%s is waiting for unavailable Pods %v to update", - set.Namespace, - set.Name, - unavailablePods) - return &status, nil - } - } - return &status, nil + + return status, nil } func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) { diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 1675a72de5..32735fa41d 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -1692,6 +1692,9 @@ func TestUpdateStatefulSetWithMinReadySeconds(t *testing.T) { pod.Status.Phase = v1.PodRunning condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} podutil.UpdatePodCondition(&pod.Status, &condition) + // force reset pod ready time to now because we use InPlaceIfPossible strategy in these testcases and + // the fakeObjectManager won't reset the pod's status when update pod in place. + podutil.GetPodReadyCondition(pod.Status).LastTransitionTime = metav1.Now() fakeResourceVersion(pod) if err := om.podsIndexer.Update(pod); err != nil { return err @@ -1813,7 +1816,7 @@ func TestUpdateStatefulSetWithMinReadySeconds(t *testing.T) { } // update the image set.Spec.Template.Spec.Containers[0].Image = "foo" - // reconcile once, start with no pod + // reconcile once, start with no pod updated if err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Fatalf("%s: %s", test.name, err) } @@ -1825,7 +1828,7 @@ func TestUpdateStatefulSetWithMinReadySeconds(t *testing.T) { if err := test.updatePod(spc, set, pods); err != nil { t.Fatalf("%s: %s", test.name, err) } - // reconcile once more + // reconcile twice pods, err = spc.podsLister.Pods(set.Namespace).List(selector) if err != nil { t.Fatalf("%s: %s", test.name, err) @@ -2124,6 +2127,90 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) { } } +func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) { + set := burst(newStatefulSet(6)) + var partition int32 = 3 + var maxUnavailable = intstr.FromInt(2) + set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy { + return &appsv1beta1.RollingUpdateStatefulSetStrategy{ + Partition: &partition, + MaxUnavailable: &maxUnavailable, + } + }(), + } + + client := fake.NewSimpleClientset() + kruiseClient := kruisefake.NewSimpleClientset(set) + spc, _, ssc, stop := setupController(client, kruiseClient) + defer close(stop) + if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil { + t.Fatal(err) + } + set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatal(err) + } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + + // set pod 0 to terminating + originalPods, err := spc.setPodTerminated(set, 0) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(originalPods)) + + // start to update + set.Spec.Template.Spec.Containers[0].Image = "foo" + + // first update pod 5 only because pod 0 is terminating + if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + if !reflect.DeepEqual(pods, originalPods[:5]) { + t.Fatalf("Expected pods %v, got pods %v", originalPods[:3], pods) + } + + // create new pods 5 + if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + if len(pods) != 6 { + t.Fatalf("Expected create pods 5, got pods %v", pods) + } + + // set pod 2 to terminating and pod 5 to ready + spc.setPodTerminated(set, 2) + spc.setPodRunning(set, 5) + originalPods, _ = spc.setPodReady(set, 5) + sort.Sort(ascendingOrdinal(originalPods)) + // should not update any pods because pod 0 and 2 are terminating + if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + if !reflect.DeepEqual(pods, originalPods) { + t.Fatalf("Expected pods %v, got pods %v", originalPods, pods) + } +} + func TestStatefulSetControlInPlaceUpdate(t *testing.T) { set := burst(newStatefulSet(3)) var partition int32 = 1 @@ -2957,6 +3044,12 @@ func (om *fakeObjectManager) setPodRunning(set *appsv1beta1.StatefulSet, ordinal } func (om *fakeObjectManager) setPodReady(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) { + return om.setPodReadyWithMinReadySeconds(set, ordinal, 0) +} + +func (om *fakeObjectManager) setPodReadyWithMinReadySeconds( + set *appsv1beta1.StatefulSet, ordinal int, minReadySeconds int32, +) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { return nil, err @@ -2972,6 +3065,13 @@ func (om *fakeObjectManager) setPodReady(set *appsv1beta1.StatefulSet, ordinal i pod := pods[ordinal].DeepCopy() condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} podutil.UpdatePodCondition(&pod.Status, &condition) + if readyTime := podutil.GetPodReadyCondition(pod.Status).LastTransitionTime; minReadySeconds > 0 && + metav1.Now().Sub(readyTime.Time) < time.Duration(minReadySeconds)*time.Second { + + podutil.GetPodReadyCondition(pod.Status).LastTransitionTime = metav1.NewTime( + time.Now().Add(-time.Second * time.Duration(minReadySeconds)), + ) + } fakeResourceVersion(pod) om.podsIndexer.Update(pod) return om.podsLister.Pods(set.Namespace).List(selector) @@ -3242,7 +3342,7 @@ func scaleUpStatefulSetControl(set *appsv1beta1.StatefulSet, if err != nil { return err } - for set.Status.ReadyReplicas < *set.Spec.Replicas { + for set.Status.UpdatedAvailableReplicas < *set.Spec.Replicas { pods, err := om.podsLister.Pods(set.Namespace).List(selector) if err != nil { return err @@ -3269,7 +3369,7 @@ func scaleUpStatefulSetControl(set *appsv1beta1.StatefulSet, return err } case v1.PodRunning: - if pods, err = om.setPodReady(set, ord); err != nil { + if pods, err = om.setPodReadyWithMinReadySeconds(set, ord, getMinReadySeconds(set)); err != nil { return err } default: