update daemonset and statefulset to 1.24.16
Signed-off-by: Kuromesi <blackfacepan@163.com>
Kuromesi committed Aug 3, 2023
1 parent 25a8fdb commit da374ff
Showing 12 changed files with 181 additions and 162 deletions.
1 change: 1 addition & 0 deletions go.sum
@@ -214,6 +214,7 @@ github.com/go-openapi/strfmt v0.21.1/go.mod h1:I/XVKeLc5+MM5oPNN7P6urMOpuLXEcNrC
github.com/go-openapi/strfmt v0.21.2 h1:5NDNgadiX1Vhemth/TH4gCGopWSTdDjxl60H3B7f+os=
github.com/go-openapi/strfmt v0.21.2/go.mod h1:I/XVKeLc5+MM5oPNN7P6urMOpuLXEcNrCX/rPGuWb0k=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
+ github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
21 changes: 16 additions & 5 deletions pkg/controller/daemonset/daemonset_controller.go
@@ -556,7 +556,7 @@ func (dsc *ReconcileDaemonSet) updateDaemonSetStatus(ctx context.Context, ds *ap
}
numberUnavailable := desiredNumberScheduled - numberAvailable

- err = dsc.storeDaemonSetStatus(ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen, hash)
+ err = dsc.storeDaemonSetStatus(ctx, ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen, hash)
if err != nil {
return fmt.Errorf("error storing status for DaemonSet %v: %v", ds.Name, err)
}
@@ -568,7 +568,18 @@ func (dsc *ReconcileDaemonSet) updateDaemonSetStatus(ctx context.Context, ds *ap
return nil
}

- func (dsc *ReconcileDaemonSet) storeDaemonSetStatus(ds *appsv1alpha1.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool, hash string) error {
+ func (dsc *ReconcileDaemonSet) storeDaemonSetStatus(
+ 	ctx context.Context,
+ 	ds *appsv1alpha1.DaemonSet,
+ 	desiredNumberScheduled,
+ 	currentNumberScheduled,
+ 	numberMisscheduled,
+ 	numberReady,
+ 	updatedNumberScheduled,
+ 	numberAvailable,
+ 	numberUnavailable int,
+ 	updateObservedGen bool,
+ 	hash string) error {
if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
@@ -597,14 +608,14 @@ func (dsc *ReconcileDaemonSet) storeDaemonSetStatus(ds *appsv1alpha1.DaemonSet,
toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
toUpdate.Status.DaemonSetHash = hash

- if _, updateErr = dsClient.UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr == nil {
+ if _, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil {
klog.Infof("Updated DaemonSet %s/%s status to %v", ds.Namespace, ds.Name, kruiseutil.DumpJSON(toUpdate.Status))
return nil
}

klog.Errorf("Update DaemonSet status %v failed: %v", ds.Status, updateErr)
// Update the set with the latest resource version for the next poll
- if toUpdate, getErr = dsClient.Get(context.TODO(), ds.Name, metav1.GetOptions{}); getErr != nil {
+ if toUpdate, getErr = dsClient.Get(ctx, ds.Name, metav1.GetOptions{}); getErr != nil {
// If the GET fails we can't trust status.Replicas anymore. This error
// is bound to be more interesting than the update failure.
klog.Errorf("Get DaemonSet %v for status update failed: %v", ds.Name, getErr)
@@ -1028,7 +1039,7 @@ func (dsc *ReconcileDaemonSet) cleanupHistory(ctx context.Context, ds *appsv1alp
continue
}
// Clean up
- err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(context.TODO(), history.Name, metav1.DeleteOptions{})
+ err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, history.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
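
Note on the recurring change in this file: the reconcile loop's context is now threaded into storeDaemonSetStatus and each client call in place of a throwaway context.TODO(), so cancelling the reconcile also cancels its in-flight API requests. A minimal, self-contained sketch of the pattern follows; the names (statusClient, updateStatus, slowClient) are illustrative and not part of the Kruise codebase.

package main

import (
	"context"
	"fmt"
	"time"
)

// statusClient stands in for a typed client-go interface.
type statusClient interface {
	UpdateStatus(ctx context.Context, name string) error
}

// updateStatus threads the caller's ctx down to the API call, so a cancelled
// reconcile (controller shutdown, workqueue timeout) aborts the request
// instead of letting it outlive its caller.
func updateStatus(ctx context.Context, c statusClient, name string) error {
	if err := c.UpdateStatus(ctx, name); err != nil {
		return fmt.Errorf("error storing status for DaemonSet %v: %v", name, err)
	}
	return nil
}

// slowClient simulates one second of API latency.
type slowClient struct{}

func (slowClient) UpdateStatus(ctx context.Context, name string) error {
	select {
	case <-time.After(time.Second):
		return nil
	case <-ctx.Done(): // honored only because the context was propagated
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(updateStatus(ctx, slowClient{}, "demo-ds")) // prints a deadline-exceeded error
}
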
37 changes: 22 additions & 15 deletions pkg/controller/daemonset/daemonset_history.go
@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
@@ -48,7 +49,7 @@ func (dsc *ReconcileDaemonSet) constructHistory(ctx context.Context, ds *appsv1a
if _, ok := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; !ok {
toUpdate := history.DeepCopy()
toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name
- history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(context.TODO(), toUpdate, metav1.UpdateOptions{})
+ history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{})
if err != nil {
return nil, nil, err
}
@@ -70,7 +71,7 @@ func (dsc *ReconcileDaemonSet) constructHistory(ctx context.Context, ds *appsv1a
switch len(currentHistories) {
case 0:
// Create a new history if the current one isn't found
- cur, err = dsc.snapshot(ds, currRevision)
+ cur, err = dsc.snapshot(ctx, ds, currRevision)
if err != nil {
return nil, nil, err
}
@@ -83,7 +84,7 @@ func (dsc *ReconcileDaemonSet) constructHistory(ctx context.Context, ds *appsv1a
if cur.Revision < currRevision {
toUpdate := cur.DeepCopy()
toUpdate.Revision = currRevision
- _, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(context.TODO(), toUpdate, metav1.UpdateOptions{})
+ _, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{})
if err != nil {
return nil, nil, err
}
@@ -104,7 +105,7 @@ func (dsc *ReconcileDaemonSet) controlledHistories(ctx context.Context, ds *apps

// List all histories to include those that don't match the selector anymore
// but have a ControllerRef pointing to the controller.
- histories, err := dsc.historyLister.List(labels.Everything())
+ histories, err := dsc.historyLister.ControllerRevisions(ds.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
@@ -172,7 +173,7 @@ func maxRevision(histories []*apps.ControllerRevision) int64 {
return max
}

- func (dsc *ReconcileDaemonSet) snapshot(ds *appsv1alpha1.DaemonSet, revision int64) (*apps.ControllerRevision, error) {
+ func (dsc *ReconcileDaemonSet) snapshot(ctx context.Context, ds *appsv1alpha1.DaemonSet, revision int64) (*apps.ControllerRevision, error) {
patch, err := getPatch(ds)
if err != nil {
return nil, err
@@ -191,10 +192,10 @@ func (dsc *ReconcileDaemonSet) snapshot(ds *appsv1alpha1.DaemonSet, revision int
Revision: revision,
}

- history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(context.TODO(), history, metav1.CreateOptions{})
+ history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(ctx, history, metav1.CreateOptions{})
if outerErr := err; errors.IsAlreadyExists(outerErr) {
// TODO: Is it okay to get from historyLister?
- existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
+ existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(ctx, name, metav1.GetOptions{})
if getErr != nil {
return nil, getErr
}
@@ -209,7 +210,7 @@ func (dsc *ReconcileDaemonSet) snapshot(ds *appsv1alpha1.DaemonSet, revision int

// Handle name collisions between different history
// Get the latest DaemonSet from the API server to make sure collision count is only increased when necessary
- currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{})
+ currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
if getErr != nil {
return nil, getErr
}
@@ -221,7 +222,7 @@ func (dsc *ReconcileDaemonSet) snapshot(ds *appsv1alpha1.DaemonSet, revision int
currDS.Status.CollisionCount = new(int32)
}
*currDS.Status.CollisionCount++
- _, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(context.TODO(), currDS, metav1.UpdateOptions{})
+ _, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(ctx, currDS, metav1.UpdateOptions{})
if updateErr != nil {
return nil, updateErr
}
@@ -250,12 +251,18 @@ func (dsc *ReconcileDaemonSet) dedupCurHistories(ctx context.Context, ds *appsv1
}
for _, pod := range pods {
if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey] {
- 				toUpdate := pod.DeepCopy()
- 				if toUpdate.Labels == nil {
- 					toUpdate.Labels = make(map[string]string)
- 				}
- 				toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
- 				_, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Update(context.TODO(), toUpdate, metav1.UpdateOptions{})
+ 				patchRaw := map[string]interface{}{
+ 					"metadata": map[string]interface{}{
+ 						"labels": map[string]interface{}{
+ 							apps.DefaultDaemonSetUniqueLabelKey: keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey],
+ 						},
+ 					},
+ 				}
+ 				patchJson, err := json.Marshal(patchRaw)
+ 				if err != nil {
+ 					return nil, err
+ 				}
+ 				_, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Patch(ctx, pod.Name, types.MergePatchType, patchJson, metav1.PatchOptions{})
if err != nil {
return nil, err
}
@@ -267,7 +274,7 @@ func (dsc *ReconcileDaemonSet) dedupCurHistories(ctx context.Context, ds *appsv1
continue
}
// Remove duplicates
- err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(context.TODO(), cur.Name, metav1.DeleteOptions{})
+ err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, cur.Name, metav1.DeleteOptions{})
if err != nil {
return nil, err
}
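
The dedupCurHistories hunk above also swaps a read-modify-Update of each Pod for a JSON merge patch, which rewrites only the one label and cannot collide on resourceVersion with other writers. A self-contained sketch of the same technique against client-go's fake clientset follows; the pod name and hash value are invented for illustration.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Fake clientset seeded with one pod; the real controller goes through dsc.kubeClient.
	client := fake.NewSimpleClientset(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-pod", Namespace: "default"},
	})

	// The patch document contains only metadata.labels, so a merge patch
	// leaves every other field of the Pod untouched.
	patchRaw := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{
				"controller-revision-hash": "keep-hash-12345", // illustrative value
			},
		},
	}
	patchJson, err := json.Marshal(patchRaw)
	if err != nil {
		panic(err)
	}

	pod, err := client.CoreV1().Pods("default").Patch(
		context.Background(), "demo-pod", types.MergePatchType, patchJson, metav1.PatchOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(pod.Labels) // map[controller-revision-hash:keep-hash-12345]
}
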
15 changes: 6 additions & 9 deletions pkg/controller/statefulset/stateful_pod_control.go
@@ -34,15 +34,14 @@ import (
"k8s.io/klog/v2"

appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
- kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1beta1"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
)

// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
// with a clientset for writes and listers for reads; for tests we provide stubs.
type StatefulPodControlObjectManager interface {
- CreatePod(pod *v1.Pod) error
+ CreatePod(ctx context.Context, pod *v1.Pod) error
GetPod(namespace, podName string) (*v1.Pod, error)
UpdatePod(pod *v1.Pod) error
DeletePod(pod *v1.Pod) error
@@ -62,12 +61,11 @@ type StatefulPodControl struct {
// clientset, listers and EventRecorder.
func NewStatefulPodControl(
client clientset.Interface,
- setLister kruiseappslisters.StatefulSetLister,
podLister corelisters.PodLister,
claimLister corelisters.PersistentVolumeClaimLister,
recorder record.EventRecorder,
) *StatefulPodControl {
- return &StatefulPodControl{&realStatefulPodControlObjectManager{client, setLister, podLister, claimLister}, recorder}
+ return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder}
}

// NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder.
@@ -78,13 +76,12 @@ func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, record
// realStatefulPodControlObjectManager uses a clientset.Interface and listers.
type realStatefulPodControlObjectManager struct {
client clientset.Interface
- setLister kruiseappslisters.StatefulSetLister
podLister corelisters.PodLister
claimLister corelisters.PersistentVolumeClaimLister
}

- func (om *realStatefulPodControlObjectManager) CreatePod(pod *v1.Pod) error {
- 	_, err := om.client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+ func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
+ 	_, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
return err
}

@@ -115,14 +112,14 @@ func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentV
return err
}

- func (spc *StatefulPodControl) CreateStatefulPod(set *appsv1beta1.StatefulSet, pod *v1.Pod) error {
+ func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *appsv1beta1.StatefulSet, pod *v1.Pod) error {
// Create the Pod's PVCs prior to creating the Pod
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("create", set, pod, err)
return err
}
// If we created the PVCs attempt to create the Pod
- err := spc.objectMgr.CreatePod(pod)
+ err := spc.objectMgr.CreatePod(ctx, pod)
// sink already exists errors
if apierrors.IsAlreadyExists(err) {
return err
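
Because CreatePod now takes a context, every StatefulPodControlObjectManager implementation has to adopt the new signature. Below is a hedged sketch of what a test stub could look like under that contract; fakeObjectManager is a hypothetical name, only CreatePod is shown, and a real stub would implement the full interface.

package statefulset

import (
	"context"

	v1 "k8s.io/api/core/v1"
)

// fakeObjectManager records created pods in memory for assertions in tests.
type fakeObjectManager struct {
	created []*v1.Pod
}

// CreatePod matches the updated signature: it checks for cancellation first,
// mirroring how the client-backed manager's API call would fail if the
// reconcile's context were already done.
func (f *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	f.created = append(f.created, pod.DeepCopy())
	return nil
}
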