From 594ddbc4c433ad68368f35e698a1172d38f70fa9 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Tue, 5 Mar 2024 16:44:55 -0600 Subject: [PATCH] [YUNIKORN-2467] Remove AllocationAsk from the core when a pod is completed (#797) Closes: #797 Signed-off-by: Craig Condit --- pkg/cache/task.go | 2 +- pkg/cache/task_test.go | 2 +- pkg/common/si_helper.go | 13 +++++++++++-- pkg/common/si_helper_test.go | 9 ++++++--- pkg/shim/scheduler_mock_test.go | 5 +++++ pkg/shim/scheduler_test.go | 3 +++ 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pkg/cache/task.go b/pkg/cache/task.go index 223fd6999..3a41157d6 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -535,7 +535,7 @@ func (task *Task) releaseAllocation() { return } releaseRequest = common.CreateReleaseAllocationRequestForTask( - task.applicationID, task.allocationID, task.application.partition, task.terminationType) + task.applicationID, task.taskID, task.allocationID, task.application.partition, task.terminationType) } if releaseRequest.Releases != nil { diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index d31339c99..1f36c7b4a 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -643,7 +643,7 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { PartitionName: "default", } mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0, + assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1, "allocationAskToRelease is not in the expected length") assert.Equal(t, len(request.Releases.AllocationsToRelease), 1, "allocationsToRelease is not in the expected length") diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index 8af265eb7..4434a82bd 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -142,7 +142,7 @@ func GetTerminationTypeFromString(terminationTypeStr string) si.TerminationType return si.TerminationType_STOPPED_BY_RM } -func CreateReleaseAllocationRequestForTask(appID, allocationID, partition, terminationType string) *si.AllocationRequest { +func CreateReleaseAllocationRequestForTask(appID, taskID, allocationID, partition, terminationType string) *si.AllocationRequest { toReleases := make([]*si.AllocationRelease, 0) toReleases = append(toReleases, &si.AllocationRelease{ ApplicationID: appID, @@ -152,8 +152,17 @@ func CreateReleaseAllocationRequestForTask(appID, allocationID, partition, termi Message: "task completed", }) + toReleaseAsk := make([]*si.AllocationAskRelease, 1) + toReleaseAsk[0] = &si.AllocationAskRelease{ + ApplicationID: appID, + AllocationKey: taskID, + PartitionName: partition, + Message: "task request completed", + } + releaseRequest := si.AllocationReleasesRequest{ - AllocationsToRelease: toReleases, + AllocationsToRelease: toReleases, + AllocationAsksToRelease: toReleaseAsk, } return &si.AllocationRequest{ diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go index 251cdd975..f3a5fcf64 100644 --- a/pkg/common/si_helper_test.go +++ b/pkg/common/si_helper_test.go @@ -33,15 +33,18 @@ import ( const nodeID = "node-01" func TestCreateReleaseAllocationRequest(t *testing.T) { - request := CreateReleaseAllocationRequestForTask("app01", "alloc01", "default", "STOPPED_BY_RM") + request := CreateReleaseAllocationRequestForTask("app01", "task01", "alloc01", "default", "STOPPED_BY_RM") assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease != nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease == nil) + assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) assert.Equal(t, len(request.Releases.AllocationsToRelease), 1) - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0) + assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01") assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID, "alloc01") assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") + assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") + assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") + assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") } func TestCreateReleaseAskRequestForTask(t *testing.T) { diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go index dd12a93de..9b13ee7c1 100644 --- a/pkg/shim/scheduler_mock_test.go +++ b/pkg/shim/scheduler_mock_test.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/yunikorn-core/pkg/entrypoint" + "github.com/apache/yunikorn-core/pkg/scheduler/objects" "github.com/apache/yunikorn-k8shim/pkg/cache" "github.com/apache/yunikorn-k8shim/pkg/client" "github.com/apache/yunikorn-k8shim/pkg/common" @@ -322,6 +323,10 @@ func (fc *MockScheduler) waitForApplicationStateInCore(appID, partition, expecte }, time.Second, 5*time.Second) } +func (fc *MockScheduler) getApplicationFromCore(appID, partition string) *objects.Application { + return fc.coreContext.Scheduler.GetClusterContext().GetApplication(appID, partition) +} + func (fc *MockScheduler) GetPodBindStats() client.BindStats { return fc.apiProvider.GetPodBindStats() } diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go index 38fe981fc..84bb52ba2 100644 --- a/pkg/shim/scheduler_test.go +++ b/pkg/shim/scheduler_test.go @@ -104,6 +104,9 @@ partitions: cluster.waitAndAssertTaskState(t, "app0001", "task0002", cache.TaskStates().Completed) err = cluster.waitForApplicationStateInCore("app0001", partitionName, "Completing") assert.NilError(t, err) + app := cluster.getApplicationFromCore("app0001", partitionName) + assert.Equal(t, 0, len(app.GetAllRequests()), "asks were not removed from the application") + assert.Equal(t, 0, len(app.GetAllAllocations()), "allocations were not removed from the application") } func TestRejectApplications(t *testing.T) {