Skip to content

Commit

Permalink
add prometheus for pub and deletion protection (#1398)
Browse files Browse the repository at this point in the history
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
  • Loading branch information
zmberg authored Sep 6, 2023
1 parent ebcee23 commit 2774109
Show file tree
Hide file tree
Showing 22 changed files with 179 additions and 57 deletions.
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
extclient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
"github.com/openkruise/kruise/pkg/controller"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
Expand Down Expand Up @@ -188,6 +189,7 @@ func main() {
setupLog.Error(err, "unable to start ControllerFinder")
os.Exit(1)
}
pubcontrol.InitPubControl(mgr.GetClient(), controllerfinder.Finder, mgr.GetEventRecorderFor("pub-controller"))

setupLog.Info("register field index")
if err := fieldindex.RegisterFieldIndexes(mgr.GetCache()); err != nil {
Expand Down
17 changes: 13 additions & 4 deletions pkg/control/pubcontrol/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PubControl interface {
var PubControl pubControl
var recorder record.EventRecorder
var kclient client.Client

type pubControl interface {
// IsPodReady indicates whether pod is fully ready
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
Expand All @@ -41,9 +47,12 @@ type PubControl interface {
IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
// get pub for pod
GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error)
// get the owner reference of the controller that manages the pod
GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference
}

func NewPubControl(client client.Client) PubControl {
controllerFinder := controllerfinder.Finder
return &commonControl{controllerFinder: controllerFinder, Client: client}
// InitPubControl populates the package-level state of pubcontrol: the
// PubControl implementation (a commonControl built from cli and finder), the
// client stored in kclient, and the event recorder stored in recorder.
// The assignments are plain writes; no locking is performed here.
func InitPubControl(cli client.Client, finder *controllerfinder.ControllerFinder, rec record.EventRecorder) {
	PubControl = &commonControl{Client: cli, controllerFinder: finder}
	kclient = cli
	recorder = rec
}
6 changes: 6 additions & 0 deletions pkg/control/pubcontrol/pub_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
Expand All @@ -43,6 +44,7 @@ type commonControl struct {
}

func (c *commonControl) IsPodReady(pod *corev1.Pod) bool {
klog.Infof("IsPodReady pod(%s)", util.DumpJSON(pod))
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
if !util.IsRunningAndReady(pod) {
Expand Down Expand Up @@ -168,6 +170,10 @@ func (c *commonControl) GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavai
return pub, nil
}

// GetPodControllerOf returns whatever metav1.GetControllerOf reports for the
// given pod, i.e. the pod's controller owner reference.
func (c *commonControl) GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference {
	owner := metav1.GetControllerOf(pod)
	return owner
}

func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
containers = sets.NewString()
sidecarSets = sets.NewString()
Expand Down
38 changes: 38 additions & 0 deletions pkg/control/pubcontrol/pub_control_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2023 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pubcontrol

import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// PodUnavailableBudgetMetrics is a counter vector named "pod_unavailable_budget"
// with two labels:
//   - kind_namespace_name: kind = CloneSet, Deployment, StatefulSet, etc.;
//     name = workload.name, or pod.name if the pod doesn't have a workload
//   - username: client useragent
var PodUnavailableBudgetMetrics = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "pod_unavailable_budget",
		Help: "Pod Unavailable Budget Metrics",
	},
	[]string{"kind_namespace_name", "username"},
)

// init registers PodUnavailableBudgetMetrics with the controller-runtime
// metrics registry at package load time.
func init() {
	registry := metrics.Registry
	registry.MustRegister(PodUnavailableBudgetMetrics)
}
29 changes: 21 additions & 8 deletions pkg/control/pubcontrol/pub_control_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -58,21 +57,21 @@ const (
// parameters:
// 1. allowed(bool) indicates whether to allow this update operation
// 2. err(error)
func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, pod *corev1.Pod, operation policyv1alpha1.PubOperation, dryRun bool) (allowed bool, reason string, err error) {
func PodUnavailableBudgetValidatePod(pod *corev1.Pod, operation policyv1alpha1.PubOperation, username string, dryRun bool) (allowed bool, reason string, err error) {
klog.V(3).Infof("validating pod(%s/%s) operation(%s) for PodUnavailableBudget", pod.Namespace, pod.Name, operation)
// pods that contain annotations[pod.kruise.io/pub-no-protect]="true" will be ignored
// and will no longer check the pub quota
if pod.Annotations[policyv1alpha1.PodPubNoProtectionAnnotation] == "true" {
klog.V(3).Infof("pod(%s/%s) contains annotations[%s]=true, then don't need check pub", pod.Namespace, pod.Name, policyv1alpha1.PodPubNoProtectionAnnotation)
return true, "", nil
// If the pod is not ready, it doesn't count towards healthy and we should not decrement
} else if !control.IsPodReady(pod) {
} else if !PubControl.IsPodReady(pod) {
klog.V(3).Infof("pod(%s/%s) is not ready, then don't need check pub", pod.Namespace, pod.Name)
return true, "", nil
}

// pub for pod
pub, err := control.GetPubForPod(pod)
pub, err := PubControl.GetPubForPod(pod)
if err != nil {
return false, "", err
// if there is no matching PodUnavailableBudget, just return true
Expand Down Expand Up @@ -119,7 +118,7 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
}

informerCached := &policyv1alpha1.PodUnavailableBudget{}
if err := client.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
if err := kclient.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
Name: pub.Name}, informerCached); err == nil {
var localRV, informerRV int64
_ = runtime.Convert_string_To_int64(&pubClone.ResourceVersion, &localRV, nil)
Expand All @@ -133,8 +132,22 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p

// Try to verify-and-decrement
// If it was false already, or if it becomes false during the course of our retries,
err := checkAndDecrement(pod.Name, pubClone, operation)
err = checkAndDecrement(pod.Name, pubClone, operation)
if err != nil {
var kind, namespace, name string
if ref := PubControl.GetPodControllerOf(pod); ref != nil {
kind = ref.Kind
name = ref.Name
} else {
kind = "unknown"
name = pod.Name
}
namespace = pod.Namespace
if namespace == "" {
namespace = "default"
}
PodUnavailableBudgetMetrics.WithLabelValues(fmt.Sprintf("%s_%s_%s", kind, namespace, name), username).Add(1)
recorder.Eventf(pod, corev1.EventTypeWarning, "PubPreventPodDeletion", "openkruise pub prevents pod deletion")
return err
}

Expand All @@ -147,15 +160,15 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
pubClone.Namespace, pubClone.Name, len(pubClone.Status.DisruptedPods), len(pubClone.Status.UnavailablePods),
pubClone.Status.TotalReplicas, pubClone.Status.DesiredAvailable, pubClone.Status.CurrentAvailable, pubClone.Status.UnavailableAllowed)
start = time.Now()
err = client.Status().Update(context.TODO(), pubClone)
err = kclient.Status().Update(context.TODO(), pubClone)
costOfUpdate += time.Since(start)
if err == nil {
if err = util.GlobalCache.Add(pubClone); err != nil {
klog.Errorf("Add cache failed for PodUnavailableBudget(%s/%s): %s", pub.Namespace, pub.Name, err.Error())
}
return nil
}
// if conflict, then retry
// if conflicts, then retry
conflictTimes++
refresh = true
return err
Expand Down
15 changes: 9 additions & 6 deletions pkg/control/pubcontrol/pub_control_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
"testing"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"

appspub "github.com/openkruise/kruise/apis/apps/pub"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -278,8 +279,9 @@ func TestPodUnavailableBudgetValidatePod(t *testing.T) {
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getPub()).Build()
control := NewPubControl(fakeClient)
allow, _, err := PodUnavailableBudgetValidatePod(fakeClient, control, cs.getPod(), cs.operation, false)
finder := &controllerfinder.ControllerFinder{Client: fakeClient}
InitPubControl(fakeClient, finder, record.NewFakeRecorder(10))
allow, _, err := PodUnavailableBudgetValidatePod(cs.getPod(), cs.operation, "fake-user", false)
if err != nil {
t.Fatalf("PodUnavailableBudgetValidatePod failed: %s", err.Error())
}
Expand Down Expand Up @@ -385,9 +387,10 @@ func TestGetPodUnavailableBudgetForPod(t *testing.T) {
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getDeployment(), cs.getReplicaSet(), cs.getPub()).Build()
control := NewPubControl(fakeClient)
finder := &controllerfinder.ControllerFinder{Client: fakeClient}
InitPubControl(fakeClient, finder, record.NewFakeRecorder(10))
pod := cs.getPod()
pub, err := control.GetPubForPod(pod)
pub, err := PubControl.GetPubForPod(pod)
if err != nil {
t.Fatalf("GetPubForPod failed: %s", err.Error())
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/cloneset/sync/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sync

import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
Expand Down Expand Up @@ -49,7 +48,6 @@ type realControl struct {
inplaceControl inplaceupdate.Interface
recorder record.EventRecorder
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

func New(c client.Client, recorder record.EventRecorder) Interface {
Expand All @@ -59,6 +57,5 @@ func New(c client.Client, recorder record.EventRecorder) Interface {
lifecycleControl: lifecycle.New(c),
recorder: recorder,
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(c),
}
}
2 changes: 1 addition & 1 deletion pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
pod := pods[idx]
// Determine the pub before updating the pod
if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) {
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, c.pubControl, pod, policyv1alpha1.PubUpdateOperation, false)
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(pod, policyv1alpha1.PubUpdateOperation, "kruise-manager", false)
if err != nil {
return err
// pub check does not pass, try again in seconds
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/cloneset/sync/cloneset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/openkruise/kruise/apis"
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/features"
Expand Down Expand Up @@ -959,7 +958,6 @@ func TestUpdate(t *testing.T) {
inplaceupdate.New(fakeClient, clonesetutils.RevisionAdapterImpl),
record.NewFakeRecorder(10),
&controllerfinder.ControllerFinder{Client: fakeClient},
pubcontrol.NewPubControl(fakeClient),
}
currentRevision := mc.updateRevision
if len(mc.revisions) > 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("podunavailablebudget-controller"),
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(mgr.GetClient()),
}
}

Expand Down Expand Up @@ -203,7 +202,6 @@ type ReconcilePodUnavailableBudget struct {
Scheme *runtime.Scheme
recorder record.EventRecorder
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

// +kubebuilder:rbac:groups=policy.kruise.io,resources=podunavailablebudgets,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -251,7 +249,7 @@ func (r *ReconcilePodUnavailableBudget) Reconcile(_ context.Context, req ctrl.Re

func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1alpha1.PodUnavailableBudget) (*time.Time, error) {
currentTime := time.Now()
pods, expectedCount, err := r.pubControl.GetPodsForPub(pub)
pods, expectedCount, err := pubcontrol.PubControl.GetPodsForPub(pub)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -319,7 +317,7 @@ func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1al
// unavailablePods contains information about pods whose specification changed(in-place update), in case of informer cache latency, after 5 seconds to remove it.
var disruptedPods, unavailablePods map[string]metav1.Time
disruptedPods, unavailablePods, recheckTime = r.buildDisruptedAndUnavailablePods(pods, pubClone, currentTime)
currentAvailable := countAvailablePods(pods, disruptedPods, unavailablePods, r.pubControl)
currentAvailable := countAvailablePods(pods, disruptedPods, unavailablePods)

start = time.Now()
updateErr := r.updatePubStatus(pubClone, currentAvailable, desiredAvailable, expectedCount, disruptedPods, unavailablePods)
Expand Down Expand Up @@ -361,7 +359,7 @@ func (r *ReconcilePodUnavailableBudget) patchRelatedPubAnnotationInPod(pub *poli
return nil
}

func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[string]metav1.Time, control pubcontrol.PubControl) (currentAvailable int32) {
func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[string]metav1.Time) (currentAvailable int32) {
recordPods := sets.String{}
for pName := range disruptedPods {
recordPods.Insert(pName)
Expand All @@ -379,7 +377,7 @@ func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[s
continue
}
// pod consistent and ready
if control.IsPodStateConsistent(pod) && control.IsPodReady(pod) {
if pubcontrol.PubControl.IsPodStateConsistent(pod) && pubcontrol.PubControl.IsPodReady(pod) {
currentAvailable++
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/podunavailablebudget/pub_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,13 +1150,13 @@ func TestPubReconcile(t *testing.T) {
for _, obj := range cs.getReplicaSet() {
_ = fakeClient.Create(context.TODO(), obj)
}

finder := &controllerfinder.ControllerFinder{Client: fakeClient}
pubcontrol.InitPubControl(fakeClient, finder, record.NewFakeRecorder(10))
controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient}
reconciler := ReconcilePodUnavailableBudget{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
pubControl: pubcontrol.NewPubControl(fakeClient),
}
_, err := reconciler.syncPodUnavailableBudget(pub)
if err != nil {
Expand Down
Loading

0 comments on commit 2774109

Please sign in to comment.