
Commit

make overcommit workload group reserve using hard limit
Doris-Extras committed Sep 25, 2024
1 parent 476b9bb commit 033ee87
Showing 7 changed files with 45 additions and 13 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -96,7 +96,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
-DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
+DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
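For scale, here is a minimal worked example (not Doris code) of what the new default means, assuming the cache-reduce threshold is simply soft_mem_limit * cache_capacity_reduce_mem_limit_frac, that soft_mem_limit = mem_limit * soft_mem_limit_frac, and a hypothetical 64 GiB host with the defaults shown above:

    #include <cstdint>
    #include <cstdio>

    int main() {
        // Hypothetical box with 64 GiB of physical memory.
        const int64_t physical_mem = 64LL * 1024 * 1024 * 1024;

        // Defaults shown in the hunk above: mem_limit = 90%, soft_mem_limit_frac = 0.9,
        // cache_capacity_reduce_mem_limit_frac = 0.7 (raised from 0.6 by this commit).
        const double mem_limit_pct = 0.90;
        const double soft_mem_limit_frac = 0.9;
        const double cache_capacity_reduce_mem_limit_frac = 0.7;

        // Assumed relationship, per the config comments: the cache-reduce threshold is a
        // fraction of the soft limit, which is itself a fraction of the process mem_limit.
        const int64_t mem_limit = static_cast<int64_t>(physical_mem * mem_limit_pct);
        const int64_t soft_mem_limit = static_cast<int64_t>(mem_limit * soft_mem_limit_frac);
        const int64_t cache_reduce_threshold =
                static_cast<int64_t>(soft_mem_limit * cache_capacity_reduce_mem_limit_frac);

        // Under these assumptions the threshold is ~36.3 GiB with 0.7 (0.9 * 0.9 * 0.7 = 56.7%
        // of RAM), up from ~31.1 GiB with the old 0.6 default, so cache shrinking starts later.
        std::printf("mem_limit=%ld soft=%ld cache_reduce_threshold=%ld\n",
                    (long)mem_limit, (long)soft_mem_limit, (long)cache_reduce_threshold);
        return 0;
    }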
5 changes: 3 additions & 2 deletions be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -44,12 +44,13 @@ std::atomic<double> GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted
std::atomic<double> GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1};
// The value that takes effect
std::atomic<double> GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
+std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit {false};
std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify {false};

bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
-    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) {
+    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
return false;
}
int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
@@ -59,7 +60,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (UNLIKELY(vm_rss_sub_allocator_cache() +
refresh_interval_memory_growth.load(std::memory_order_relaxed) +
new_reserved_mem >=
-                     MemInfo::soft_mem_limit())) {
+                     MemInfo::mem_limit())) {
return false;
}
} while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,
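The net effect of this hunk is that a process-level reservation now fails only at the hard limit (MemInfo::mem_limit and the low water mark) rather than the soft limit and warning water mark. A rough, self-contained model of the new check, with the MemInfo and sys_mem_available lookups replaced by plain parameters and the names invented for illustration:

    #include <atomic>
    #include <cstdint>

    // Minimal stand-alone model of the reserve gate after this commit (not the real
    // GlobalMemoryArbitrator); the caller-supplied limits stand in for MemInfo values.
    struct ReserveModel {
        std::atomic<int64_t> process_reserved_memory{0};

        // `hard_mem_limit` plays the role of MemInfo::mem_limit() (previously the soft limit
        // was used here), and `low_water_mark` the role of sys_mem_available_low_water_mark()
        // (previously the warning water mark).
        bool try_reserve(int64_t bytes, int64_t sys_mem_available, int64_t low_water_mark,
                         int64_t rss_sub_allocator_cache, int64_t interval_growth,
                         int64_t hard_mem_limit) {
            if (sys_mem_available - bytes < low_water_mark) {
                return false;
            }
            int64_t old_reserved = process_reserved_memory.load(std::memory_order_relaxed);
            int64_t new_reserved = 0;
            do {
                new_reserved = old_reserved + bytes;
                // Reject the reservation only once projected usage reaches the *hard* limit.
                if (rss_sub_allocator_cache + interval_growth + new_reserved >= hard_mem_limit) {
                    return false;
                }
            } while (!process_reserved_memory.compare_exchange_weak(old_reserved, new_reserved,
                                                                    std::memory_order_relaxed));
            return true;
        }
    };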
1 change: 1 addition & 0 deletions be/src/runtime/memory/global_memory_arbitrator.h
@@ -182,6 +182,7 @@ class GlobalMemoryArbitrator {
static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted;
// The value that takes effect
static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
+    static std::atomic<bool> any_workload_group_exceed_limit;

static void notify_cache_adjust_capacity() {
cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
1 change: 1 addition & 0 deletions be/src/runtime/query_context.h
@@ -300,6 +300,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
bool is_low_wartermark = false;
bool is_high_wartermark = false;
_workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+        // If the wg does not enable the hard limit, this also takes effect to lower memory usage.
if (is_high_wartermark) {
LOG(INFO)
<< "Query " << print_id(_query_id)
21 changes: 14 additions & 7 deletions be/src/runtime/workload_group/workload_group.h
@@ -78,8 +78,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit;
};

-    int64_t weighted_memory_limit() const { return _weighted_memory_limit; };

void set_weighted_memory_limit(int64_t weighted_memory_limit) {
_weighted_memory_limit = weighted_memory_limit;
}
@@ -108,13 +106,17 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
// It is always true, and it will only fail when process memory is not
// enough.
if (_enable_memory_overcommit) {
-            return true;
+            if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) {
+                return false;
+            } else {
+                return true;
+            }
}
auto realtime_total_mem_used =
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
-             ((double)_weighted_memory_limit *
-              _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
+             ((double)_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) /
+              100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
@@ -129,10 +131,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
-                              ((double)_weighted_memory_limit *
+                              ((double)_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
-                               ((double)_weighted_memory_limit *
+                               ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
}

@@ -166,6 +168,11 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit > 0;
}

+    bool exceed_limit() {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
+    }

Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext> query_ctx) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
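Taken together, the workload_group.h changes alter two admission paths: overcommit groups, which previously always accepted a reservation, now refuse it when the process soft limit would be exceeded, and hard-limit groups are compared against their configured _memory_limit rather than the weighted (elastic) limit. A simplified sketch of that decision, using invented stand-in names and a boolean parameter in place of the GlobalMemoryArbitrator call:

    #include <atomic>
    #include <cstdint>

    // Simplified model of the per-workload-group admission check after this commit
    // (not the real WorkloadGroup class); `process_would_exceed_soft_limit` stands in
    // for GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size).
    struct WgReserveModel {
        bool enable_memory_overcommit = false;
        int64_t memory_limit = 0;            // the group's hard limit (_memory_limit)
        int64_t total_mem_used = 0;
        std::atomic<int64_t> refresh_interval_memory_growth{0};
        int64_t spill_high_watermark = 80;   // percent

        bool add_wg_refresh_interval_memory_growth(int64_t size,
                                                   bool process_would_exceed_soft_limit) {
            if (enable_memory_overcommit) {
                // Overcommit groups used to succeed unconditionally; now they are
                // throttled by the process-level soft limit.
                return !process_would_exceed_soft_limit;
            }
            int64_t realtime = total_mem_used + refresh_interval_memory_growth.load() + size;
            // Hard-limit groups are now checked against the configured _memory_limit
            // instead of the weighted (elastic) limit.
            if (realtime > static_cast<double>(memory_limit) * spill_high_watermark / 100) {
                return false;
            }
            refresh_interval_memory_growth.fetch_add(size);
            return true;
        }
    };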
25 changes: 22 additions & 3 deletions be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,9 +34,11 @@

namespace doris {

-PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio)
+PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio,
+                         bool any_wg_exceed_limit)
        : query_ctx_(query_ctx),
          cache_ratio_(cache_ratio),
+          any_wg_exceed_limit_(any_wg_exceed_limit),
query_id_(print_id(query_ctx->query_id())) {
enqueue_at = std::chrono::system_clock::now();
}
@@ -155,11 +157,16 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
// and calculate total memory used of all queries.
int64_t all_workload_groups_mem_usage = 0;
std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
+    bool has_wg_exceed_limit = false;
for (auto& [wg_id, wg] : _workload_groups) {
wgs_mem_info[wg_id].total_mem_used =
wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
+        if (wg->exceed_limit()) {
+            has_wg_exceed_limit = true;
+        }
}
+    doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit = has_wg_exceed_limit;
if (all_workload_groups_mem_usage <= 0) {
return;
}
@@ -333,21 +340,27 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
DCHECK(query_ctx != nullptr);
auto wg = query_ctx->workload_group();
auto&& [it, inserted] = _paused_queries_list[wg].emplace(
-            query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted);
+            query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit);
if (inserted) {
LOG(INFO) << "workload group " << wg->debug_string()
<< " insert one new paused query: " << it->query_id();
}
}

+void WorkloadGroupMgr::handle_paused_queries() {
+    handle_non_overcommit_wg_paused_queries();
+    handle_overcommit_wg_paused_queries();
+}

/**
 * Strategy 1: A revocable query should not have any running task (PipelineTask).
 * Strategy 2: If any task in the workload group exceeds the workload group mem limit, then set every query ctx's mem limit.
 * Strategy 3: If any query exceeds the process mem limit, then all caches should be cleared.
 * Strategy 4: If any query exceeds its query mem limit, then spill to disk or cancel it.
 * Strategy 5: If any query exceeds the process mem limit and the cache is zero, then spill to disk or cancel it.
 */
-void WorkloadGroupMgr::handle_paused_queries() {
+void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
std::unique_lock<std::mutex> lock(_paused_queries_lock);
for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
auto& queries_list = it->second;
@@ -410,6 +423,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
continue;
} else {
+            // The process reserve logic uses the hard limit; if we reach here, we should try to spill or cancel.
+            // GC logic also works at the hard limit, so GC may cancel some queries and spilling may not be possible here.
            // If the wg's mem limit is not exceeded but process memory is, it means the cache or other metadata
            // used too much memory. All caches should be cleaned here.
            // 1. Check cache usage; if the cache is larger than 0, just return and wait for it to drop to 0 to release some memory.
@@ -494,6 +509,10 @@ bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
return true;
}

+bool WorkloadGroupMgr::cancel_largest_query() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+}

void WorkloadGroupMgr::stop() {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
iter->second->try_stop_schedulers();
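The manager changes mostly wire a new flag through: the periodic refresh pass publishes whether any group is over its hard limit into GlobalMemoryArbitrator::any_workload_group_exceed_limit, and add_paused_query snapshots that value into each PausedQuery. A compact model of that publish/snapshot flow, using stub types (not the real Doris classes), where the snapshot presumably lets the later handling pass know whether some group was over its hard limit at pause time:

    #include <atomic>
    #include <vector>

    // Stand-in for a workload group; exceed_limit() mirrors the new method in workload_group.h.
    struct WorkloadGroupStub {
        long memory_limit = 0;
        long total_mem_used = 0;
        bool exceed_limit() const { return memory_limit > 0 && total_mem_used > memory_limit; }
    };

    std::atomic<bool> any_workload_group_exceed_limit{false};

    // Publisher: the periodic refresh pass recomputes the flag from all groups.
    void refresh(const std::vector<WorkloadGroupStub>& groups) {
        bool any = false;
        for (const auto& wg : groups) {
            any = any || wg.exceed_limit();
        }
        any_workload_group_exceed_limit.store(any, std::memory_order_relaxed);
    }

    // Consumer: when a query is paused, the current value is snapshotted into the
    // PausedQuery record (any_wg_exceed_limit_).
    struct PausedQueryStub {
        bool any_wg_exceed_limit_ = false;
    };

    PausedQueryStub pause_query() {
        return PausedQueryStub{any_workload_group_exceed_limit.load(std::memory_order_relaxed)};
    }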
3 changes: 3 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.h
@@ -45,6 +45,7 @@ class PausedQuery {
std::chrono::system_clock::time_point enqueue_at;
size_t last_mem_usage {0};
double cache_ratio_ {0.0};
+    bool any_wg_exceed_limit_ {false};

PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio);

@@ -104,6 +105,8 @@ class WorkloadGroupMgr {

private:
bool spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx, Status paused_reason);
+    void handle_non_overcommit_wg_paused_queries();
+    void handle_overcommit_wg_paused_queries();

private:
std::shared_mutex _group_mutex;
