Skip to content

Commit

Permalink
fix(statefulset): fix maxUnavailable for rolling upgrades not taking …
Browse files Browse the repository at this point in the history
…into account pods that fail later in the updateIndexes
  • Loading branch information
Yesphet committed Jan 2, 2024
1 parent fa7a1da commit a322891
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 63 deletions.
145 changes: 86 additions & 59 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,118 +707,145 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
}

return ssc.rollingUpdateStatefulsetPods(
set, &status, currentRevision, updateRevision, revisions, pods, replicas, minReadySeconds,
)
}

func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
set *appsv1beta1.StatefulSet,
status *appsv1beta1.StatefulSetStatus,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
revisions []*apps.ControllerRevision,
pods []*v1.Pod,
replicas []*v1.Pod,
minReadySeconds int32,
) (*appsv1beta1.StatefulSetStatus, error) {

// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
return status, nil
}

// TODO: separate out the below update only related logic

// If update expectations have not satisfied yet, skip updating pods
if updateSatisfied, _, updateDirtyPods := updateExpectations.SatisfiedExpectations(getStatefulSetKey(set), updateRevision.Name); !updateSatisfied {
klog.V(4).Infof("Not satisfied update for %v, updateDirtyPods=%v", getStatefulSetKey(set), updateDirtyPods)
return &status, nil
return status, nil
}

// refresh states for all pods
var modified bool
for _, pod := range pods {
if pod == nil {
continue
}
refreshed, duration, err := ssc.refreshPodState(set, pod, updateRevision.Name)
if err != nil {
return status, err
} else if duration > 0 {
durationStore.Push(getStatefulSetKey(set), duration)
}
if refreshed {
modified = true
}
}
if modified {
return status, nil
}

var err error
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
maxUnavailable := 1
if set.Spec.UpdateStrategy.RollingUpdate != nil {
if set.Spec.UpdateStrategy.RollingUpdate.Paused {
return &status, nil
return status, nil
}

maxUnavailable, err = intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, intstrutil.FromInt(1)), int(*set.Spec.Replicas), false)
if err != nil {
return &status, err
return status, err
}
// maxUnavailable should not be less than 1
if maxUnavailable < 1 {
maxUnavailable = 1
}
}

// refresh states for all pods
var modified bool
for _, pod := range pods {
refreshed, duration, err := ssc.refreshPodState(set, pod, updateRevision.Name)
if err != nil {
return &status, err
} else if duration > 0 {
durationStore.Push(getStatefulSetKey(set), duration)
minWaitTime := appsv1beta1.MaxMinReadySeconds * time.Second
unavailablePods := sets.NewString()
opts := &inplaceupdate.UpdateOptions{}
opts = inplaceupdate.SetOptionsDefaults(opts)
// counts any targets in the replicas that are unhealthy (terminated / in-place update not-ready / not running and
// ready for minReadySeconds) for checking if the count satisfied the MaxUnavailable limit.
for target := range replicas {
if replicas[target] == nil {
continue
}
if refreshed {
modified = true
if !isHealthy(replicas[target]) {
// 1. count pod as unavailable if it's unhealthy or terminating
unavailablePods.Insert(replicas[target].Name)
} else if completedErr := opts.CheckPodUpdateCompleted(replicas[target]); completedErr != nil {
// 2. count pod as unavailable if it's in-place updating and not ready
klog.V(4).Infof("StatefulSet %s/%s check Pod %s in-place update not-ready: %v",
set.Namespace,
set.Name,
replicas[target].Name,
completedErr)
unavailablePods.Insert(replicas[target].Name)
} else if isAvailable, waitTime := isRunningAndAvailable(replicas[target], minReadySeconds); !isAvailable {
// 3. count pod as unavailable if it's not available yet given the minReadySeconds requirement
unavailablePods.Insert(replicas[target].Name)
// make sure that we will wait for the first pod to get available
if waitTime != 0 && waitTime <= minWaitTime {
minWaitTime = waitTime
durationStore.Push(getStatefulSetKey(set), waitTime)
}
}
}
if modified {
return &status, nil
}

var unavailablePods []string
updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
klog.V(3).Infof("Prepare to update pods indexes %v for StatefulSet %s", updateIndexes, getStatefulSetKey(set))
minWaitTime := appsv1beta1.MaxMinReadySeconds * time.Second
// update pods in sequence
for _, target := range updateIndexes {

// the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it,
// wait for unhealthy Pods on update
if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for unavailable Pods %v to update",
set.Namespace,
set.Name,
unavailablePods.List())
return status, nil
}

// delete the Pod if it is not already terminating and does not match the update revision.
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
// todo validate in-place for pub
inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions)
if inplaceUpdateErr != nil {
return &status, inplaceUpdateErr
return status, inplaceUpdateErr
}
if !inplacing {
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
set.Namespace,
set.Name,
replicas[target].Name)
if _, err := ssc.deletePod(set, replicas[target]); err != nil {
return &status, err
return status, err
}
}
// mark target as unavailable because it's updated
unavailablePods.Insert(replicas[target].Name)

if getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--
}
}

opts := &inplaceupdate.UpdateOptions{}
opts = inplaceupdate.SetOptionsDefaults(opts)
if getPodRevision(replicas[target]) != updateRevision.Name || !isHealthy(replicas[target]) {
unavailablePods = append(unavailablePods, replicas[target].Name)
} else if completedErr := opts.CheckPodUpdateCompleted(replicas[target]); completedErr != nil {
klog.V(4).Infof("StatefulSet %s/%s check Pod %s in-place update not-ready: %v",
set.Namespace,
set.Name,
replicas[target].Name,
completedErr)
unavailablePods = append(unavailablePods, replicas[target].Name)
} else {
// check if the updated pod is running and available given minReadySeconds
if isAvailable, waitTime := isRunningAndAvailable(replicas[target], minReadySeconds); !isAvailable {
// the pod is not available yet given the minReadySeconds requirement
unavailablePods = append(unavailablePods, replicas[target].Name)
// make sure that we will wait for the first pod to get available
if waitTime != 0 && waitTime <= minWaitTime {
minWaitTime = waitTime
durationStore.Push(getStatefulSetKey(set), waitTime)
}
}
}

// wait for unhealthy Pods on update
if len(unavailablePods) >= maxUnavailable {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for unavailable Pods %v to update",
set.Namespace,
set.Name,
unavailablePods)
return &status, nil
}

}
return &status, nil

return status, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) {
Expand Down
108 changes: 104 additions & 4 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,9 @@ func TestUpdateStatefulSetWithMinReadySeconds(t *testing.T) {
pod.Status.Phase = v1.PodRunning
condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
podutil.UpdatePodCondition(&pod.Status, &condition)
// force reset pod ready time to now because we use InPlaceIfPossible strategy in these testcases and
// the fakeObjectManager won't reset the pod's status when update pod in place.
podutil.GetPodReadyCondition(pod.Status).LastTransitionTime = metav1.Now()
fakeResourceVersion(pod)
if err := om.podsIndexer.Update(pod); err != nil {
return err
Expand Down Expand Up @@ -1813,7 +1816,7 @@ func TestUpdateStatefulSetWithMinReadySeconds(t *testing.T) {
}
// update the image
set.Spec.Template.Spec.Containers[0].Image = "foo"
// reconcile once, start with no pod
// reconcile once, start with no pod updated
if err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatalf("%s: %s", test.name, err)
}
Expand All @@ -1825,7 +1828,7 @@ func TestUpdateStatefulSetWithMinReadySeconds(t *testing.T) {
if err := test.updatePod(spc, set, pods); err != nil {
t.Fatalf("%s: %s", test.name, err)
}
// reconcile once more
// reconcile twice
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatalf("%s: %s", test.name, err)
Expand Down Expand Up @@ -2124,6 +2127,90 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
}
}

// TestStatefulSetControlRollingUpdateBlockByMaxUnavailable verifies that a
// rolling update stops processing further pods once the number of unavailable
// pods — including terminating pods that are outside the update window —
// reaches maxUnavailable.
func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
	set := burst(newStatefulSet(6))
	var partition int32 = 3
	var maxUnavailable = intstr.FromInt(2)
	set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
		Type: apps.RollingUpdateStatefulSetStrategyType,
		RollingUpdate: &appsv1beta1.RollingUpdateStatefulSetStrategy{
			Partition:      &partition,
			MaxUnavailable: &maxUnavailable,
		},
	}

	client := fake.NewSimpleClientset()
	kruiseClient := kruisefake.NewSimpleClientset(set)
	spc, _, ssc, stop := setupController(client, kruiseClient)
	defer close(stop)
	if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
		t.Fatal(err)
	}
	set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
	if err != nil {
		t.Fatal(err)
	}
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		t.Fatal(err)
	}

	// set pod 0 to terminating so that it already counts against maxUnavailable
	originalPods, err := spc.setPodTerminated(set, 0)
	if err != nil {
		t.Fatal(err)
	}
	sort.Sort(ascendingOrdinal(originalPods))

	// start to update
	set.Spec.Template.Spec.Containers[0].Image = "foo"

	// first reconcile updates pod 5 only: pod 0 is terminating, so with
	// maxUnavailable=2 only one more pod may become unavailable
	if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
		t.Fatal(err)
	}
	pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		t.Fatal(err)
	}
	sort.Sort(ascendingOrdinal(pods))
	// pod 5 was deleted for recreate, so only pods 0-4 should remain
	if !reflect.DeepEqual(pods, originalPods[:5]) {
		t.Fatalf("Expected pods %v, got pods %v", originalPods[:5], pods)
	}

	// reconcile again to create the new pod 5
	if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
		t.Fatal(err)
	}
	pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		t.Fatal(err)
	}
	if len(pods) != 6 {
		t.Fatalf("Expected create pods 5, got pods %v", pods)
	}

	// set pod 2 to terminating and pod 5 to ready
	if _, err = spc.setPodTerminated(set, 2); err != nil {
		t.Fatal(err)
	}
	if _, err = spc.setPodRunning(set, 5); err != nil {
		t.Fatal(err)
	}
	originalPods, err = spc.setPodReady(set, 5)
	if err != nil {
		t.Fatal(err)
	}
	sort.Sort(ascendingOrdinal(originalPods))
	// should not update any pods because pods 0 and 2 are terminating, which
	// already saturates maxUnavailable
	if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
		t.Fatal(err)
	}
	pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		t.Fatal(err)
	}
	sort.Sort(ascendingOrdinal(pods))
	if !reflect.DeepEqual(pods, originalPods) {
		t.Fatalf("Expected pods %v, got pods %v", originalPods, pods)
	}
}

func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
set := burst(newStatefulSet(3))
var partition int32 = 1
Expand Down Expand Up @@ -2957,6 +3044,12 @@ func (om *fakeObjectManager) setPodRunning(set *appsv1beta1.StatefulSet, ordinal
}

func (om *fakeObjectManager) setPodReady(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
return om.setPodReadyWithMinReadySeconds(set, ordinal, 0)
}

func (om *fakeObjectManager) setPodReadyWithMinReadySeconds(
set *appsv1beta1.StatefulSet, ordinal int, minReadySeconds int32,
) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
Expand All @@ -2972,6 +3065,13 @@ func (om *fakeObjectManager) setPodReady(set *appsv1beta1.StatefulSet, ordinal i
pod := pods[ordinal].DeepCopy()
condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
podutil.UpdatePodCondition(&pod.Status, &condition)
if readyTime := podutil.GetPodReadyCondition(pod.Status).LastTransitionTime; minReadySeconds > 0 &&
metav1.Now().Sub(readyTime.Time) < time.Duration(minReadySeconds)*time.Second {

podutil.GetPodReadyCondition(pod.Status).LastTransitionTime = metav1.NewTime(
time.Now().Add(-time.Second * time.Duration(minReadySeconds)),
)
}
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
return om.podsLister.Pods(set.Namespace).List(selector)
Expand Down Expand Up @@ -3242,7 +3342,7 @@ func scaleUpStatefulSetControl(set *appsv1beta1.StatefulSet,
if err != nil {
return err
}
for set.Status.ReadyReplicas < *set.Spec.Replicas {
for set.Status.UpdatedAvailableReplicas < *set.Spec.Replicas {
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
return err
Expand All @@ -3269,7 +3369,7 @@ func scaleUpStatefulSetControl(set *appsv1beta1.StatefulSet,
return err
}
case v1.PodRunning:
if pods, err = om.setPodReady(set, ord); err != nil {
if pods, err = om.setPodReadyWithMinReadySeconds(set, ord, getMinReadySeconds(set)); err != nil {
return err
}
default:
Expand Down

0 comments on commit a322891

Please sign in to comment.