From 56d00b5c96b7e56d706068a35c598eabea616c69 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Tue, 24 Sep 2024 10:53:53 +0800 Subject: [PATCH] f --- .../workload_group/workload_group_manager.cpp | 114 ++++++++++-------- .../workload_group/workload_group_manager.h | 3 + 2 files changed, 68 insertions(+), 49 deletions(-) diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 5d800d57ab1c7d8..73aa15b08fa9a69 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -376,53 +376,21 @@ void WorkloadGroupMgr::handle_paused_queries() { query_it = queries_list.erase(query_it); continue; } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + << "was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } if (query_ctx->paused_status().is()) { - // TODO - // If the query is an insert into select query, should consider memtable as revoke memory. - if (query_ctx->is_cancelled()) { - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) - << "was canceled, remove from paused list"; - query_it = queries_list.erase(query_it); - continue; - } - - size_t revocable_size = 0; - size_t memory_usage = 0; - bool has_running_task = false; - query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); - if (has_running_task) { - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) - << "is paused, but still has running task, skip it."; + bool spill_res = spill_or_cancel_query(query_ctx, query_ctx->paused_status()); + if (!spill_res) { ++query_it; continue; - } - - auto revocable_tasks = query_ctx->get_revocable_tasks(); - if (revocable_tasks.empty()) { - // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic - query_ctx->cancel(doris::Status::Error( - "query reserve memory failed, but could not find memory that " - "could " - "release or spill to disk")); } else { - SCOPED_ATTACH_TASK(query_ctx.get()); - // TODO, should spill the task that has max memory, not all - for (auto* task : revocable_tasks) { - auto st = task->revoke_memory(); - if (!st.ok()) { - query_ctx->cancel(st); - break; - } - } - // The query is removed from paused queue and revoked memory, should clear the state - // to make sure it will be updated next time. - query_ctx->clear_paused_state(); - LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has " - << revocable_tasks.size() - << " tasks to revoke memory, revocable size: " << revocable_size; + query_it = queries_list.erase(query_it); + continue; } - query_it = queries_list.erase(query_it); - continue; } else if (query_ctx->paused_status().is()) { if (wg->memory_sufficent()) { wg->update_memory_sufficent(false); @@ -459,13 +427,14 @@ void WorkloadGroupMgr::handle_paused_queries() { "to 0 now"; } if (query_it->cache_ratio_ < 0.001) { - query_ctx->cancel(doris::Status::Error( - "The query reserved memory failed because process limit exceeded, and " - "there is no cache now. Maybe you should set the workload group's " - "limit to a lower value.")); - - ++query_it; - continue; + bool spill_res = spill_or_cancel_query(query_ctx, query_ctx->paused_status()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } } if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < 0.001 && @@ -481,6 +450,53 @@ void WorkloadGroupMgr::handle_paused_queries() { } } +bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr query_ctx, + Status paused_reason) { + // TODO: If the query is an insert into select query, should consider memtable as revoke memory. + size_t revocable_size = 0; + size_t memory_usage = 0; + bool has_running_task = false; + query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); + if (has_running_task) { + LOG(INFO) << "query: " << print_id(query_ctx->query_id()) + << "is paused, but still has running task, skip it."; + return false; + } + + auto revocable_tasks = query_ctx->get_revocable_tasks(); + if (revocable_tasks.empty()) { + if (paused_reason.is()) { + // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic + query_ctx->cancel(doris::Status::Error( + "query reserve memory failed, but could not find memory that " + "could " + "release or spill to disk")); + } else { + query_ctx->cancel(doris::Status::Error( + "The query reserved memory failed because process limit exceeded, and " + "there is no cache now. And could not find task to spill. Maybe you should set " + "the workload group's limit to a lower value.")); + } + } else { + SCOPED_ATTACH_TASK(query_ctx.get()); + // TODO, should spill the task that has max memory, not all + for (auto* task : revocable_tasks) { + auto st = task->revoke_memory(); + if (!st.ok()) { + query_ctx->cancel(st); + break; + } + } + // The query is removed from paused queue and revoked memory, should clear the state + // to make sure it will be updated next time. + query_ctx->clear_paused_state(); + LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has " + << revocable_tasks.size() + << " tasks to revoke memory, revocable size: " << revocable_size; + } + return true; +} + void WorkloadGroupMgr::stop() { for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { iter->second->try_stop_schedulers(); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 662e576fca51d82..f84bf3a29ff7daf 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -102,6 +102,9 @@ class WorkloadGroupMgr { void update_load_memtable_usage(const std::map& wg_memtable_usages); +private: + bool spill_or_cancel_query(std::shared_ptr query_ctx, Status paused_reason); + private: std::shared_mutex _group_mutex; std::unordered_map _workload_groups;