From 43a7871d1cfc94b392c339470926d78d44e24a23 Mon Sep 17 00:00:00 2001 From: Craig Condit Date: Wed, 26 Jan 2022 09:29:10 -0600 Subject: [PATCH] [YUNIKORN-1041] Fix binding of dynamic volumes to pod (#359) Closes: #359 --- pkg/cache/context.go | 56 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index ee3ae2eec..e857128eb 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/controller/volume/scheduling" "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces" schedulercache "github.com/apache/incubator-yunikorn-k8shim/pkg/cache/external" @@ -356,19 +357,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error { zap.String("podName", pod.Name)) } else { log.Logger().Info("Binding Pod Volumes", zap.String("podName", pod.Name)) - boundClaims, claimsToBind, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod) + boundClaims, claimsToBind, unboundClaimsImmediate, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod) if err != nil { + log.Logger().Error("Failed to get pod volumes", + zap.String("podName", assumedPod.Name), + zap.Error(err)) + return err + } + if len(unboundClaimsImmediate) > 0 { + err = fmt.Errorf("pod %s has unbound immediate claims", pod.Name) + log.Logger().Error("Pod has unbound immediate claims", + zap.String("podName", assumedPod.Name), + zap.Error(err)) return err } node, err := ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName) if err != nil { + log.Logger().Error("Failed to get node info", + zap.String("podName", assumedPod.Name), + zap.String("nodeName", assumedPod.Spec.NodeName), + zap.Error(err)) + return err + } + volumes, reasons, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node) + if err != nil { + log.Logger().Error("Failed to find pod volumes", + zap.String("podName", assumedPod.Name), + zap.String("nodeName", assumedPod.Spec.NodeName), + zap.Int("claimsToBind", len(claimsToBind)), + zap.Error(err)) return err } - volumes, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node) + if len(reasons) > 0 { + sReasons := make([]string, 0) + for _, reason := range reasons { + sReasons = append(sReasons, string(reason)) + } + sReason := strings.Join(sReasons, ", ") + err = fmt.Errorf("pod %s has conflicting volume claims: %s", pod.Name, sReason) + log.Logger().Error("Pod has conflicting volume claims", + zap.String("podName", assumedPod.Name), + zap.String("nodeName", assumedPod.Spec.NodeName), + zap.Int("claimsToBind", len(claimsToBind)), + zap.Error(err)) + return err + } + if volumes.StaticBindings == nil { + // convert nil to empty array + volumes.StaticBindings = make([]*scheduling.BindingInfo, 0) + } + if volumes.DynamicProvisions == nil { + // convert nil to empty array + volumes.DynamicProvisions = make([]*v1.PersistentVolumeClaim, 0) + } + err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes) if err != nil { + log.Logger().Error("Failed to bind pod volumes", + zap.String("podName", assumedPod.Name), + zap.String("nodeName", assumedPod.Spec.NodeName), + zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)), + zap.Int("staticBindings", len(volumes.StaticBindings))) return err } - return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes) } } return nil