Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prometheus for pub and deletion protection #1398

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
extclient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
"github.com/openkruise/kruise/pkg/controller"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
Expand Down Expand Up @@ -188,6 +189,7 @@ func main() {
setupLog.Error(err, "unable to start ControllerFinder")
os.Exit(1)
}
pubcontrol.InitPubControl(mgr.GetClient(), controllerfinder.Finder, mgr.GetEventRecorderFor("pub-controller"))

setupLog.Info("register field index")
if err := fieldindex.RegisterFieldIndexes(mgr.GetCache()); err != nil {
Expand Down
17 changes: 13 additions & 4 deletions pkg/control/pubcontrol/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PubControl interface {
var PubControl pubControl
var recorder record.EventRecorder
var kclient client.Client

type pubControl interface {
// IsPodReady indicates whether pod is fully ready
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
Expand All @@ -41,9 +47,12 @@ type PubControl interface {
IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
// get pub for pod
GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error)
// get pod controller of
GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference
}

func NewPubControl(client client.Client) PubControl {
controllerFinder := controllerfinder.Finder
return &commonControl{controllerFinder: controllerFinder, Client: client}
func InitPubControl(cli client.Client, finder *controllerfinder.ControllerFinder, rec record.EventRecorder) {
recorder = rec
kclient = cli
PubControl = &commonControl{controllerFinder: finder, Client: cli}
}
6 changes: 6 additions & 0 deletions pkg/control/pubcontrol/pub_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
Expand All @@ -43,6 +44,7 @@ type commonControl struct {
}

func (c *commonControl) IsPodReady(pod *corev1.Pod) bool {
klog.Infof("IsPodReady pod(%s)", util.DumpJSON(pod))
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
if !util.IsRunningAndReady(pod) {
Expand Down Expand Up @@ -168,6 +170,10 @@ func (c *commonControl) GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavai
return pub, nil
}

func (c *commonControl) GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference {
return metav1.GetControllerOf(pod)
}

func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
containers = sets.NewString()
sidecarSets = sets.NewString()
Expand Down
38 changes: 38 additions & 0 deletions pkg/control/pubcontrol/pub_control_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2023 The Kruise Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pubcontrol

import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
PodUnavailableBudgetMetrics = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pod_unavailable_budget",
Help: "Pod Unavailable Budget Metrics",
// kind = CloneSet, Deployment, StatefulSet, etc.
// name = workload.name, if pod don't have workload, then name is pod.name
// username = client useragent
}, []string{"kind_namespace_name", "username"},
)
)

func init() {
metrics.Registry.MustRegister(PodUnavailableBudgetMetrics)
}
29 changes: 21 additions & 8 deletions pkg/control/pubcontrol/pub_control_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -58,21 +57,21 @@ const (
// parameters:
// 1. allowed(bool) indicates whether to allow this update operation
// 2. err(error)
func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, pod *corev1.Pod, operation policyv1alpha1.PubOperation, dryRun bool) (allowed bool, reason string, err error) {
func PodUnavailableBudgetValidatePod(pod *corev1.Pod, operation policyv1alpha1.PubOperation, username string, dryRun bool) (allowed bool, reason string, err error) {
klog.V(3).Infof("validating pod(%s/%s) operation(%s) for PodUnavailableBudget", pod.Namespace, pod.Name, operation)
// pods that contain annotations[pod.kruise.io/pub-no-protect]="true" will be ignore
// and will no longer check the pub quota
if pod.Annotations[policyv1alpha1.PodPubNoProtectionAnnotation] == "true" {
klog.V(3).Infof("pod(%s/%s) contains annotations[%s]=true, then don't need check pub", pod.Namespace, pod.Name, policyv1alpha1.PodPubNoProtectionAnnotation)
return true, "", nil
// If the pod is not ready, it doesn't count towards healthy and we should not decrement
} else if !control.IsPodReady(pod) {
} else if !PubControl.IsPodReady(pod) {
klog.V(3).Infof("pod(%s/%s) is not ready, then don't need check pub", pod.Namespace, pod.Name)
return true, "", nil
}

// pub for pod
pub, err := control.GetPubForPod(pod)
pub, err := PubControl.GetPubForPod(pod)
if err != nil {
return false, "", err
// if there is no matching PodUnavailableBudget, just return true
Expand Down Expand Up @@ -119,7 +118,7 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
}

informerCached := &policyv1alpha1.PodUnavailableBudget{}
if err := client.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
if err := kclient.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
Name: pub.Name}, informerCached); err == nil {
var localRV, informerRV int64
_ = runtime.Convert_string_To_int64(&pubClone.ResourceVersion, &localRV, nil)
Expand All @@ -133,8 +132,22 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p

// Try to verify-and-decrement
// If it was false already, or if it becomes false during the course of our retries,
err := checkAndDecrement(pod.Name, pubClone, operation)
err = checkAndDecrement(pod.Name, pubClone, operation)
if err != nil {
var kind, namespace, name string
if ref := PubControl.GetPodControllerOf(pod); ref != nil {
kind = ref.Kind
name = ref.Name
} else {
kind = "unknown"
name = pod.Name
}
namespace = pod.Namespace
if namespace == "" {
namespace = "default"
}
PodUnavailableBudgetMetrics.WithLabelValues(fmt.Sprintf("%s_%s_%s", kind, namespace, name), username).Add(1)
recorder.Eventf(pod, corev1.EventTypeWarning, "PubPreventPodDeletion", "openkruise pub prevents pod deletion")
return err
}

Expand All @@ -147,15 +160,15 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
pubClone.Namespace, pubClone.Name, len(pubClone.Status.DisruptedPods), len(pubClone.Status.UnavailablePods),
pubClone.Status.TotalReplicas, pubClone.Status.DesiredAvailable, pubClone.Status.CurrentAvailable, pubClone.Status.UnavailableAllowed)
start = time.Now()
err = client.Status().Update(context.TODO(), pubClone)
err = kclient.Status().Update(context.TODO(), pubClone)
costOfUpdate += time.Since(start)
if err == nil {
if err = util.GlobalCache.Add(pubClone); err != nil {
klog.Errorf("Add cache failed for PodUnavailableBudget(%s/%s): %s", pub.Namespace, pub.Name, err.Error())
}
return nil
}
// if conflict, then retry
// if conflicts, then retry
conflictTimes++
refresh = true
return err
Expand Down
15 changes: 9 additions & 6 deletions pkg/control/pubcontrol/pub_control_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
"testing"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"

appspub "github.com/openkruise/kruise/apis/apps/pub"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -278,8 +279,9 @@ func TestPodUnavailableBudgetValidatePod(t *testing.T) {
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getPub()).Build()
control := NewPubControl(fakeClient)
allow, _, err := PodUnavailableBudgetValidatePod(fakeClient, control, cs.getPod(), cs.operation, false)
finder := &controllerfinder.ControllerFinder{Client: fakeClient}
InitPubControl(fakeClient, finder, record.NewFakeRecorder(10))
allow, _, err := PodUnavailableBudgetValidatePod(cs.getPod(), cs.operation, "fake-user", false)
if err != nil {
t.Fatalf("PodUnavailableBudgetValidatePod failed: %s", err.Error())
}
Expand Down Expand Up @@ -385,9 +387,10 @@ func TestGetPodUnavailableBudgetForPod(t *testing.T) {
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getDeployment(), cs.getReplicaSet(), cs.getPub()).Build()
control := NewPubControl(fakeClient)
finder := &controllerfinder.ControllerFinder{Client: fakeClient}
InitPubControl(fakeClient, finder, record.NewFakeRecorder(10))
pod := cs.getPod()
pub, err := control.GetPubForPod(pod)
pub, err := PubControl.GetPubForPod(pod)
if err != nil {
t.Fatalf("GetPubForPod failed: %s", err.Error())
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/cloneset/sync/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sync

import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
Expand Down Expand Up @@ -49,7 +48,6 @@ type realControl struct {
inplaceControl inplaceupdate.Interface
recorder record.EventRecorder
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

func New(c client.Client, recorder record.EventRecorder) Interface {
Expand All @@ -59,6 +57,5 @@ func New(c client.Client, recorder record.EventRecorder) Interface {
lifecycleControl: lifecycle.New(c),
recorder: recorder,
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(c),
}
}
2 changes: 1 addition & 1 deletion pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
pod := pods[idx]
// Determine the pub before updating the pod
if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) {
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, c.pubControl, pod, policyv1alpha1.PubUpdateOperation, false)
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(pod, policyv1alpha1.PubUpdateOperation, "kruise-manager", false)
if err != nil {
return err
// pub check does not pass, try again in seconds
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/cloneset/sync/cloneset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/openkruise/kruise/apis"
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/features"
Expand Down Expand Up @@ -959,7 +958,6 @@ func TestUpdate(t *testing.T) {
inplaceupdate.New(fakeClient, clonesetutils.RevisionAdapterImpl),
record.NewFakeRecorder(10),
&controllerfinder.ControllerFinder{Client: fakeClient},
pubcontrol.NewPubControl(fakeClient),
}
currentRevision := mc.updateRevision
if len(mc.revisions) > 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("podunavailablebudget-controller"),
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(mgr.GetClient()),
}
}

Expand Down Expand Up @@ -203,7 +202,6 @@ type ReconcilePodUnavailableBudget struct {
Scheme *runtime.Scheme
recorder record.EventRecorder
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

// +kubebuilder:rbac:groups=policy.kruise.io,resources=podunavailablebudgets,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -251,7 +249,7 @@ func (r *ReconcilePodUnavailableBudget) Reconcile(_ context.Context, req ctrl.Re

func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1alpha1.PodUnavailableBudget) (*time.Time, error) {
currentTime := time.Now()
pods, expectedCount, err := r.pubControl.GetPodsForPub(pub)
pods, expectedCount, err := pubcontrol.PubControl.GetPodsForPub(pub)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -319,7 +317,7 @@ func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1al
// unavailablePods contains information about pods whose specification changed(in-place update), in case of informer cache latency, after 5 seconds to remove it.
var disruptedPods, unavailablePods map[string]metav1.Time
disruptedPods, unavailablePods, recheckTime = r.buildDisruptedAndUnavailablePods(pods, pubClone, currentTime)
currentAvailable := countAvailablePods(pods, disruptedPods, unavailablePods, r.pubControl)
currentAvailable := countAvailablePods(pods, disruptedPods, unavailablePods)

start = time.Now()
updateErr := r.updatePubStatus(pubClone, currentAvailable, desiredAvailable, expectedCount, disruptedPods, unavailablePods)
Expand Down Expand Up @@ -361,7 +359,7 @@ func (r *ReconcilePodUnavailableBudget) patchRelatedPubAnnotationInPod(pub *poli
return nil
}

func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[string]metav1.Time, control pubcontrol.PubControl) (currentAvailable int32) {
func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[string]metav1.Time) (currentAvailable int32) {
recordPods := sets.String{}
for pName := range disruptedPods {
recordPods.Insert(pName)
Expand All @@ -379,7 +377,7 @@ func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[s
continue
}
// pod consistent and ready
if control.IsPodStateConsistent(pod) && control.IsPodReady(pod) {
if pubcontrol.PubControl.IsPodStateConsistent(pod) && pubcontrol.PubControl.IsPodReady(pod) {
currentAvailable++
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/podunavailablebudget/pub_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,13 +1150,13 @@ func TestPubReconcile(t *testing.T) {
for _, obj := range cs.getReplicaSet() {
_ = fakeClient.Create(context.TODO(), obj)
}

finder := &controllerfinder.ControllerFinder{Client: fakeClient}
pubcontrol.InitPubControl(fakeClient, finder, record.NewFakeRecorder(10))
controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient}
reconciler := ReconcilePodUnavailableBudget{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
pubControl: pubcontrol.NewPubControl(fakeClient),
}
_, err := reconciler.syncPodUnavailableBudget(pub)
if err != nil {
Expand Down
Loading
Loading