From 033ee87a8cb5e37c7fa629dcfd683ecca24d50ca Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 25 Sep 2024 19:28:32 +0800 Subject: [PATCH] make overcommit workload group reserve using hard limit --- be/src/common/config.cpp | 2 +- .../memory/global_memory_arbitrator.cpp | 5 ++-- .../runtime/memory/global_memory_arbitrator.h | 1 + be/src/runtime/query_context.h | 1 + .../runtime/workload_group/workload_group.h | 21 ++++++++++------ .../workload_group/workload_group_manager.cpp | 25 ++++++++++++++++--- .../workload_group/workload_group_manager.h | 3 +++ 7 files changed, 45 insertions(+), 13 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 78e68c9de649be4..7e9ff8d88011f4b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -96,7 +96,7 @@ DEFINE_String(mem_limit, "90%"); DEFINE_Double(soft_mem_limit_frac, "0.9"); // Cache capacity reduce mem limit as a fraction of soft mem limit. -DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6"); +DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7"); // Schema change memory limit as a fraction of soft memory limit. DEFINE_Double(schema_change_mem_limit_frac, "0.6"); diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 0c774187ff3edd6..a6d2edd542ac4eb 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -44,12 +44,13 @@ std::atomic GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted std::atomic GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1}; // The value that take affect std::atomic GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1}; +std::atomic GlobalMemoryArbitrator::any_workload_group_exceed_limit {false}; std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock; std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv; std::atomic GlobalMemoryArbitrator::memtable_memory_refresh_notify {false}; bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { - if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) { + if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) { return false; } int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed); @@ -59,7 +60,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { if (UNLIKELY(vm_rss_sub_allocator_cache() + refresh_interval_memory_growth.load(std::memory_order_relaxed) + new_reserved_mem >= - MemInfo::soft_mem_limit())) { + MemInfo::mem_limit())) { return false; } } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index 468d442b6627e5e..e2b55c8aa98fae7 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -182,6 +182,7 @@ class GlobalMemoryArbitrator { static std::atomic last_wg_trigger_cache_capacity_adjust_weighted; // The value that take affect static std::atomic last_affected_cache_capacity_adjust_weighted; + static std::atomic any_workload_group_exceed_limit; static void notify_cache_adjust_capacity() { cache_adjust_capacity_notify.store(true, std::memory_order_relaxed); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 96b18eedfb9da45..731832be34a3029 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -300,6 +300,7 @@ class QueryContext : public std::enable_shared_from_this { bool is_low_wartermark = false; bool is_high_wartermark = false; _workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark); + // If the wg is not enable hard limit, this will also take effect to lower down the memory usage. if (is_high_wartermark) { LOG(INFO) << "Query " << print_id(_query_id) diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 1d640afee1d0f51..c09a45ab4919602 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -78,8 +78,6 @@ class WorkloadGroup : public std::enable_shared_from_this { return _memory_limit; }; - int64_t weighted_memory_limit() const { return _weighted_memory_limit; }; - void set_weighted_memory_limit(int64_t weighted_memory_limit) { _weighted_memory_limit = weighted_memory_limit; } @@ -108,13 +106,17 @@ class WorkloadGroup : public std::enable_shared_from_this { // It is always true, and it will only fail when process memory is not // enough. if (_enable_memory_overcommit) { - return true; + if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) { + return false; + } else { + return true; + } } auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load() + size; if ((realtime_total_mem_used > - ((double)_weighted_memory_limit * - _spill_high_watermark.load(std::memory_order_relaxed) / 100))) { + ((double)_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) / + 100))) { return false; } else { _wg_refresh_interval_memory_growth.fetch_add(size); @@ -129,10 +131,10 @@ class WorkloadGroup : public std::enable_shared_from_this { void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const { auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); *is_low_wartermark = (realtime_total_mem_used > - ((double)_weighted_memory_limit * + ((double)_memory_limit * _spill_low_watermark.load(std::memory_order_relaxed) / 100)); *is_high_wartermark = (realtime_total_mem_used > - ((double)_weighted_memory_limit * + ((double)_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) / 100)); } @@ -166,6 +168,11 @@ class WorkloadGroup : public std::enable_shared_from_this { return _memory_limit > 0; } + bool exceed_limit() { + std::shared_lock r_lock(_mutex); + return _memory_limit > 0 ? _total_mem_used > _memory_limit : false; + } + Status add_query(TUniqueId query_id, std::shared_ptr query_ctx) { std::unique_lock wlock(_mutex); if (_is_shutdown) { diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 290d1fe1d5b4b79..9f47a9e4d492d3d 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -34,9 +34,11 @@ namespace doris { -PausedQuery::PausedQuery(std::shared_ptr query_ctx, double cache_ratio) +PausedQuery::PausedQuery(std::shared_ptr query_ctx, double cache_ratio, + bool any_wg_exceed_limit) : query_ctx_(query_ctx), cache_ratio_(cache_ratio), + any_wg_exceed_limit_(any_wg_exceed_limit), query_id_(print_id(query_ctx->query_id())) { enqueue_at = std::chrono::system_clock::now(); } @@ -155,11 +157,16 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { // and calculate total memory used of all queries. int64_t all_workload_groups_mem_usage = 0; std::unordered_map wgs_mem_info; + bool has_wg_exceed_limit = false; for (auto& [wg_id, wg] : _workload_groups) { wgs_mem_info[wg_id].total_mem_used = wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots); all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used; + if (wg->exceed_limit()) { + has_wg_exceed_limit = true; + } } + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit = has_wg_exceed_limit; if (all_workload_groups_mem_usage <= 0) { return; } @@ -333,13 +340,19 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr& que DCHECK(query_ctx != nullptr); auto wg = query_ctx->workload_group(); auto&& [it, inserted] = _paused_queries_list[wg].emplace( - query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted); + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit); if (inserted) { LOG(INFO) << "workload group " << wg->debug_string() << " insert one new paused query: " << it->query_id(); } } +void WorkloadGroupMgr::handle_paused_queries() { + handle_non_overcommit_wg_paused_queries(); + handle_overcommit_wg_paused_queries(); +} + /** * Strategy 1: A revocable query should not have any running task(PipelineTask). * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit @@ -347,7 +360,7 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr& que * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. * strategy 5: If any query exceed process's memlimit and cache is zero, then do spill disk or cancel it. */ -void WorkloadGroupMgr::handle_paused_queries() { +void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { std::unique_lock lock(_paused_queries_lock); for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { auto& queries_list = it->second; @@ -410,6 +423,8 @@ void WorkloadGroupMgr::handle_paused_queries() { } continue; } else { + // PROCESS Reserve logic using hard limit, if reached here, should try to spill or cancel. + // GC Logic also work at hard limit, so GC may cancel some query and could not spill here. // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata // used too much memory. Should clean all cache here. // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory. @@ -494,6 +509,10 @@ bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr query return true; } +bool WorkloadGroupMgr::cancel_largest_query() { + std::shared_lock r_lock(_group_mutex); +} + void WorkloadGroupMgr::stop() { for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { iter->second->try_stop_schedulers(); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index f84bf3a29ff7daf..d05858d3eae6611 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -45,6 +45,7 @@ class PausedQuery { std::chrono::system_clock::time_point enqueue_at; size_t last_mem_usage {0}; double cache_ratio_ {0.0}; + bool any_wg_exceed_limit_ {false}; PausedQuery(std::shared_ptr query_ctx, double cache_ratio); @@ -104,6 +105,8 @@ class WorkloadGroupMgr { private: bool spill_or_cancel_query(std::shared_ptr query_ctx, Status paused_reason); + void handle_non_overcommit_wg_paused_queries(); + void handle_overcommit_wg_paused_queries(); private: std::shared_mutex _group_mutex;