Skip to content

Commit

Permalink
[YUNIKORN-1190] Account for usage of pods without applicationId (#413)
Browse files Browse the repository at this point in the history
In plugin mode pods could be scheduled by the default scheduler and not
accuonted for in the node usage. This happens for pods without an
application ID but with the scheduler set to YuniKorn

Closes: #413

Signed-off-by: Wilfred Spiegelenburg <wilfreds@apache.org>
  • Loading branch information
craigcondit authored and wilfred-s committed Apr 27, 2022
1 parent d728f56 commit f77c610
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 8 deletions.
6 changes: 5 additions & 1 deletion pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,11 @@ func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
func (ctx *Context) filterPods(obj interface{}) bool {
switch obj := obj.(type) {
case *v1.Pod:
return utils.GeneralPodFilter(obj)
if utils.GeneralPodFilter(obj) {
_, err := utils.GetApplicationIDFromPod(obj)
return err == nil
}
return false
default:
return false
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/cache/context_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
continue
}
// yunikorn scheduled pods add to existing allocations
if utils.GeneralPodFilter(&pod) {
_, err = utils.GetApplicationIDFromPod(&pod)
ykPod := utils.GeneralPodFilter(&pod) && err == nil
switch {
case ykPod:
if existingAlloc := getExistingAllocation(mgr, &pod); existingAlloc != nil {
log.Logger().Debug("Adding resources for existing pod",
zap.String("appID", existingAlloc.ApplicationID),
Expand All @@ -123,7 +126,7 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
zap.String("nodeName", pod.Spec.NodeName),
zap.Stringer("resources", common.GetPodResource(&pod)))
}
} else if !utils.IsPodTerminated(&pod) {
case !utils.IsPodTerminated(&pod):
// pod is not terminated (succeed or failed) state,
// and it has a node assigned, that means the scheduler
// has already allocated the pod onto a node
Expand All @@ -141,7 +144,7 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
occupiedResource = common.Add(occupiedResource, podResource)
nodeOccupiedResources[pod.Spec.NodeName] = occupiedResource
ctx.nodes.cache.AddPod(&pod)
} else {
default:
log.Logger().Debug("Skipping terminated pod",
zap.String("podUID", string(pod.UID)),
zap.String("podName", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)))
Expand Down
15 changes: 14 additions & 1 deletion pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,22 @@ func TestFilterPods(t *testing.T) {
},
Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
}
pod3 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00003",
UID: "UID-00003",
Labels: map[string]string{"applicationId": "test-00003"},
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
assert.Check(t, !context.filterPods(nil), "nil object was allowed")
assert.Check(t, context.filterPods(pod1), "yunikorn-managed pod was filtered")
assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with no app id was allowed")
assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod was allowed")
assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was filtered")
}

func TestAddPodToCache(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/cache/node_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator

// filter pods that not scheduled by us
func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
switch obj.(type) {
switch obj := obj.(type) {
case *v1.Pod:
pod := obj.(*v1.Pod)
return !utils.GeneralPodFilter(pod)
if utils.GeneralPodFilter(obj) {
_, err := utils.GetApplicationIDFromPod(obj)
return err != nil
}
return true
default:
return false
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/cache/node_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"gotest.tools/assert"
v1 "k8s.io/api/core/v1"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
Expand Down Expand Up @@ -295,3 +296,50 @@ func TestDeleteTerminatedPod(t *testing.T) {
coordinator.deletePod(pod2)
assert.Equal(t, executed, false)
}

func TestNodeCoordinatorFilterPods(t *testing.T) {
mockedSchedulerAPI := newMockSchedulerAPI()
nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
host1 := utils.NodeForTest(Host1, "10G", "10")
nodes.addNode(host1)
coordinator := newNodeResourceCoordinator(nodes)

pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00001",
UID: "UID-00001",
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00002",
UID: "UID-00002",
},
Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
}
pod3 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00003",
UID: "UID-00003",
Labels: map[string]string{"applicationId": "test-00003"},
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod with no app id was filtered")
assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod was filtered")
assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod was allowed")
}

0 comments on commit f77c610

Please sign in to comment.