Skip to content

Commit

Permalink
add validation for ephemeral job
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
mingzhou.swx committed Aug 3, 2023
1 parent 76f45ac commit a32948b
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 21 deletions.
21 changes: 21 additions & 0 deletions config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,24 @@ webhooks:
resources:
- uniteddeployments
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-apps-kruise-io-v1alpha1-ephemeraljob
failurePolicy: Fail
name: vephemeraljob.kb.io
rules:
- apiGroups:
- apps.kruise.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- ephemeraljobs
sideEffects: None
24 changes: 16 additions & 8 deletions pkg/controller/ephemeraljob/ephemeraljob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -65,8 +66,9 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) *ReconcileEphemeralJob {
return &ReconcileEphemeralJob{
Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"),
scheme: mgr.GetScheme(),
Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"),
scheme: mgr.GetScheme(),
eventRecorder: mgr.GetEventRecorderFor("ephemeraljob-controller"),
}
}

Expand Down Expand Up @@ -98,7 +100,8 @@ var _ reconcile.Reconciler = &ReconcileEphemeralJob{}
// ReconcileEphemeralJob reconciles a ImagePullJob object
type ReconcileEphemeralJob struct {
client.Client
scheme *runtime.Scheme
scheme *runtime.Scheme
eventRecorder record.EventRecorder
}

// +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs,verbs=get;list;watch;update;patch;delete
Expand Down Expand Up @@ -345,12 +348,15 @@ func (r *ReconcileEphemeralJob) syncTargetPods(job *appsv1alpha1.EphemeralJob, t
scaleExpectations.ExpectScale(key, expectations.Create, podEphemeralContainerName)
}
if err := control.CreateEphemeralContainer(pod); err != nil {
r.eventRecorder.Event(job, v1.EventTypeWarning, "CreateFailed",
fmt.Sprintf("Failed to create ephemeral container for pod %s: %v", pod.Name, err))
for _, podEphemeralContainerName := range getPodEphemeralContainers(pod, job) {
scaleExpectations.ObserveScale(key, expectations.Create, podEphemeralContainerName)
}
return err
}

r.eventRecorder.Event(job, v1.EventTypeNormal, "CreateSuccessfully",
fmt.Sprintf("create ephemeral container for pod %s successfully", pod.Name))
return nil
})

Expand Down Expand Up @@ -440,12 +446,14 @@ func (r *ReconcileEphemeralJob) removeEphemeralContainers(job *appsv1alpha1.Ephe
return err
}

var errors error
control := econtainer.New(job)
for _, pod := range targetPods {
if e := econtainer.New(job).RemoveEphemeralContainer(pod); e != nil {
errors = e
if err = control.RemoveEphemeralContainer(pod); err != nil {
r.eventRecorder.Event(job, v1.EventTypeWarning, "RemoveFailed",
fmt.Sprintf("Failed to remove ephemeral container for pod %s: %v", pod.Name, err))
return err
}
}

return errors
return nil
}
14 changes: 9 additions & 5 deletions pkg/util/inplaceupdate/inplace_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

var (
containerImagePatchRexp = regexp.MustCompile("^/spec/containers/([0-9]+)/image$")
containerCpuPatchRexp = regexp.MustCompile("^/spec/containers/([0-9]+)/resources/cpu/.*$")
containerMemPatchRexp = regexp.MustCompile("^/spec/containers/([0-9]+)/resources/memory/.*$")
rfc6901Decoder = strings.NewReplacer("~1", "/", "~0", "~")

Clock clock.Clock = clock.RealClock{}
Expand Down Expand Up @@ -79,11 +81,13 @@ type Interface interface {
type UpdateSpec struct {
Revision string `json:"revision"`

ContainerImages map[string]string `json:"containerImages,omitempty"`
ContainerRefMetadata map[string]metav1.ObjectMeta `json:"containerRefMetadata,omitempty"`
MetaDataPatch []byte `json:"metaDataPatch,omitempty"`
UpdateEnvFromMetadata bool `json:"updateEnvFromMetadata,omitempty"`
GraceSeconds int32 `json:"graceSeconds,omitempty"`
ContainerImages map[string]string `json:"containerImages,omitempty"`
ContainerRefMetadata map[string]metav1.ObjectMeta `json:"containerRefMetadata,omitempty"`
ContainerResourceCpu map[string]v1.ResourceRequirements `json:"containerResourceCpu,omitempty"`
ContainerResourceMem map[string]v1.ResourceRequirements `json:"containerResourceMem,omitempty"`
MetaDataPatch []byte `json:"metaDataPatch,omitempty"`
UpdateEnvFromMetadata bool `json:"updateEnvFromMetadata,omitempty"`
GraceSeconds int32 `json:"graceSeconds,omitempty"`

OldTemplate *v1.PodTemplateSpec `json:"oldTemplate,omitempty"`
NewTemplate *v1.PodTemplateSpec `json:"newTemplate,omitempty"`
Expand Down
25 changes: 17 additions & 8 deletions pkg/util/inplaceupdate/inplace_update_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func defaultCalculateInPlaceUpdateSpec(oldRevision, newRevision *apps.Controller
Revision: newRevision.Name,
ContainerImages: make(map[string]string),
ContainerRefMetadata: make(map[string]metav1.ObjectMeta),
ContainerResources: make(map[string]v1.ResourceRequirements),

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / advanced-daemonset

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / ephemeraljob

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / astatefulset

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unknown field `ContainerResources` in struct literal (typecheck)

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / pullimages-containerrecreate

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / other

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / unit-tests

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / sidecarset

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / podUnavailableBudget

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / ephemeraljob

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / other

unknown field 'ContainerResources' in struct literal of type UpdateSpec

Check failure on line 250 in pkg/util/inplaceupdate/inplace_update_defaults.go

View workflow job for this annotation

GitHub Actions / podUnavailableBudget

unknown field 'ContainerResources' in struct literal of type UpdateSpec
GraceSeconds: opts.GracePeriodSeconds,
}
if opts.GetRevision != nil {
Expand All @@ -265,16 +266,24 @@ func defaultCalculateInPlaceUpdateSpec(oldRevision, newRevision *apps.Controller
}
return nil
}
if op.Operation != "replace" || !containerImagePatchRexp.MatchString(op.Path) {
return nil
if op.Operation == "replace" && containerImagePatchRexp.MatchString(op.Path) {
// for example: /spec/containers/0/image
words := strings.Split(op.Path, "/")
idx, _ := strconv.Atoi(words[3])
if len(oldTemp.Spec.Containers) <= idx {
return nil
}
updateSpec.ContainerImages[oldTemp.Spec.Containers[idx].Name] = op.Value.(string)
}
// for example: /spec/containers/0/image
words := strings.Split(op.Path, "/")
idx, _ := strconv.Atoi(words[3])
if len(oldTemp.Spec.Containers) <= idx {
return nil
if op.Operation == "replace" && containerCpuPatchRexp.MatchString(op.Path) {
// for example: /spec/containers/0/image
words := strings.Split(op.Path, "/")
idx, _ := strconv.Atoi(words[3])
if len(oldTemp.Spec.Containers) <= idx {
return nil
}
updateSpec.ContainerImages[oldTemp.Spec.Containers[idx].Name] = op.Value.(string)
}
updateSpec.ContainerImages[oldTemp.Spec.Containers[idx].Name] = op.Value.(string)
}

if len(metadataPatches) > 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright 2023 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package validating

import (
"context"
"net/http"
"unsafe"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/core"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/apis/core/validation"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// EphemeralJobCreateUpdateHandler handles ImagePullJob
type EphemeralJobCreateUpdateHandler struct {
// Decoder decodes objects
Decoder *admission.Decoder
}

var _ admission.Handler = &EphemeralJobCreateUpdateHandler{}

// Handle handles admission requests.
func (h *EphemeralJobCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
job := &appsv1alpha1.EphemeralJob{}

err := h.Decoder.Decode(req, job)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}

if err := validateEphemeralJobSpec(&job.Spec, field.NewPath("spec")); err != nil {
klog.Warningf("Error validate EphemeralJob %s/%s: %v", job.Namespace, job.Name, err)
return admission.Errored(http.StatusBadRequest, err.ToAggregate())
}

return admission.ValidationResponse(true, "allowed")
}

func validateEphemeralJobSpec(spec *appsv1alpha1.EphemeralJobSpec, path *field.Path) field.ErrorList {
var allErrs field.ErrorList
if spec.Selector != nil {
if spec.Selector.MatchLabels != nil || spec.Selector.MatchExpressions != nil {
if _, err := metav1.LabelSelectorAsSelector(spec.Selector); err != nil {
allErrs = append(allErrs, field.Invalid(path.Child("selector"), spec.Selector, err.Error()))
}
}
}

var ephemeralContainers []v1.EphemeralContainer
ecPath := path.Child("template").Child("ephemeralContainers")
for index, ec := range spec.Template.EphemeralContainers {
idxPath := ecPath.Index(index)

// VolumeMount subpaths have the potential to leak resources since they're implemented with bind mounts
// that aren't cleaned up until the pod exits. Since they also imply that the container is being used
// as part of the workload, they're disallowed entirely.
for i, vm := range ec.VolumeMounts {
if vm.SubPath != "" {
allErrs = append(allErrs, field.Forbidden(idxPath.Child("volumeMounts").Index(i).Child("subPath"), "cannot be set for an Ephemeral Container"))
}
if vm.SubPathExpr != "" {
allErrs = append(allErrs, field.Forbidden(idxPath.Child("volumeMounts").Index(i).Child("subPathExpr"), "cannot be set for an Ephemeral Container"))
}
}

// VolumeMount cannot be validated further by ValidatePodEphemeralContainersUpdate method because we do not know the volumes and target container of target Pods
ec.VolumeMounts, ec.TargetContainerName = nil, ""
corev1.SetDefaults_Container((*v1.Container)(&ec.EphemeralContainerCommon))
ephemeralContainers = append(ephemeralContainers, ec)
}

// validateEphemeralContainers is a private method in k8s validation package, so we have to use ValidatePodEphemeralContainersUpdate
// to validate fields of the fields of ephemeral containers
mockedNewPod := &core.Pod{Spec: core.PodSpec{EphemeralContainers: *(*[]core.EphemeralContainer)(unsafe.Pointer(&ephemeralContainers))}}
mockedOldPod := &core.Pod{Spec: core.PodSpec{EphemeralContainers: *(*[]core.EphemeralContainer)(unsafe.Pointer(&ephemeralContainers))}}
return append(allErrs, validation.ValidatePodEphemeralContainersUpdate(mockedNewPod, mockedOldPod, validation.PodValidationOptions{})...)
}
Loading

0 comments on commit a32948b

Please sign in to comment.