support stream load
Doris-Extras committed Sep 26, 2024
1 parent 41bb04b commit 5d4e213
Showing 9 changed files with 158 additions and 57 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -96,7 +96,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
-DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
+DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
5 changes: 3 additions & 2 deletions be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -44,12 +44,13 @@ std::atomic<double> GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted
std::atomic<double> GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1};
// The value that take affect
std::atomic<double> GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
+std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit {false};
std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify {false};

bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
-    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) {
+    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
return false;
}
int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
@@ -59,7 +60,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (UNLIKELY(vm_rss_sub_allocator_cache() +
refresh_interval_memory_growth.load(std::memory_order_relaxed) +
new_reserved_mem >=
-                 MemInfo::soft_mem_limit())) {
+                 MemInfo::mem_limit())) {
return false;
}
} while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,
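For context on the change above: try_reserve_process_memory() now refuses a reservation when it would push available system memory below the low water mark or push usage plus outstanding reservations past the full process limit, and it publishes the reservation through a compare-and-exchange retry loop. A minimal standalone sketch of that reservation pattern, using made-up limits and names instead of the Doris MemInfo API:

#include <atomic>
#include <cstdint>

// Illustrative stand-ins; the real values come from MemInfo and sys_mem_available().
static constexpr int64_t kProcessMemLimit = 8LL << 30;   // assumed 8 GiB process limit
static std::atomic<int64_t> g_process_usage{0};          // current process memory usage
static std::atomic<int64_t> g_reserved_memory{0};        // outstanding reservations

// Reserve `bytes` ahead of an allocation; false means the caller must back off.
bool try_reserve(int64_t bytes) {
    int64_t old_reserved = g_reserved_memory.load(std::memory_order_relaxed);
    int64_t new_reserved = 0;
    do {
        new_reserved = old_reserved + bytes;
        // Reject if usage plus all reservations would cross the process limit
        // (the real check also guards the sys-available low water mark).
        if (g_process_usage.load(std::memory_order_relaxed) + new_reserved > kProcessMemLimit) {
            return false;
        }
        // On failure compare_exchange_weak reloads old_reserved, so the next
        // iteration recomputes new_reserved against the latest value.
    } while (!g_reserved_memory.compare_exchange_weak(old_reserved, new_reserved,
                                                      std::memory_order_relaxed));
    return true;
}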
1 change: 1 addition & 0 deletions be/src/runtime/memory/global_memory_arbitrator.h
@@ -182,6 +182,7 @@ class GlobalMemoryArbitrator {
static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted;
// The value that take affect
static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
+    static std::atomic<bool> any_workload_group_exceed_limit;

static void notify_cache_adjust_capacity() {
cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
5 changes: 5 additions & 0 deletions be/src/runtime/query_context.cpp
@@ -129,6 +129,11 @@ void QueryContext::_init_query_mem_tracker() {
<< " OR is -1. Using process memory limit instead.";
bytes_limit = MemInfo::mem_limit();
}
+    // If the query is a pure load task (stream load, routine load, group commit), it should
+    // not use the per-query mem limit to cap its memory usage.
+    if (is_pure_load_task()) {
+        bytes_limit = MemInfo::mem_limit();
+    }
if (_query_options.query_type == TQueryType::SELECT) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
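The added branch means a pure load task (stream load, routine load, group commit) is never capped by its per-query exec_mem_limit; its tracker limit becomes the whole process limit. A condensed, hypothetical sketch of the selection logic (the real code reads these values from _query_options and MemInfo):

#include <cstdint>

// Hypothetical condensation of the limit selection in _init_query_mem_tracker().
int64_t choose_query_mem_limit(int64_t exec_mem_limit, bool is_pure_load_task,
                               int64_t process_mem_limit) {
    int64_t bytes_limit = exec_mem_limit;
    if (bytes_limit <= 0) {
        // Invalid or "unlimited" (-1): fall back to the process limit.
        bytes_limit = process_mem_limit;
    }
    if (is_pure_load_task) {
        // Stream load / routine load / group commit: no per-query cap.
        bytes_limit = process_mem_limit;
    }
    return bytes_limit;
}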
7 changes: 7 additions & 0 deletions be/src/runtime/query_context.h
@@ -303,6 +303,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
bool is_low_wartermark = false;
bool is_high_wartermark = false;
_workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+        // Even if the wg does not enable the hard limit, this also takes effect to lower the memory usage.
if (is_high_wartermark) {
LOG(INFO)
<< "Query " << print_id(_query_id)
@@ -349,6 +350,12 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
return _paused_reason;
}

+    bool is_pure_load_task() {
+        return _query_source == QuerySource::STREAM_LOAD ||
+               _query_source == QuerySource::ROUTINE_LOAD ||
+               _query_source == QuerySource::GROUP_COMMIT_LOAD;
+    }

private:
int _timeout_second;
TUniqueId _query_id;
22 changes: 22 additions & 0 deletions be/src/runtime/workload_group/workload_group.cpp
@@ -114,6 +114,28 @@ std::string WorkloadGroup::debug_string() const {
_remote_scan_bytes_per_second);
}

+bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
+    // If a group enables memory overcommit, its own limit is not checked here;
+    // the reservation only fails when process memory is short, i.e. when the
+    // process soft mem limit would be exceeded.
+    if (_enable_memory_overcommit) {
+        if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+    auto realtime_total_mem_used =
+            _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
+    if ((realtime_total_mem_used >
+         ((double)_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
+        return false;
+    } else {
+        _wg_refresh_interval_memory_growth.fetch_add(size);
+        return true;
+    }
+}

std::string WorkloadGroup::memory_debug_string() const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
auto mem_used_ratio = realtime_total_mem_used / ((double)_weighted_memory_limit + 1);
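To make the admission rule above easier to follow outside the diff, here is a self-contained sketch of the same decision, with illustrative names and a boolean standing in for the GlobalMemoryArbitrator soft-limit check; it is an assumption-laden illustration, not the repository's class:

#include <atomic>
#include <cstdint>

// Illustrative group state; the real fields live on WorkloadGroup.
struct GroupState {
    bool enable_memory_overcommit = false;
    int64_t memory_limit = 4LL << 30;              // assumed 4 GiB group limit
    int spill_high_watermark = 80;                 // percent of the limit
    std::atomic<int64_t> total_mem_used{0};
    std::atomic<int64_t> refresh_interval_growth{0};
};

// Mirrors the rule above: an overcommit group is bounded only by the process-level
// soft limit; otherwise pending growth must stay under the high watermark.
bool try_grow(GroupState& g, int64_t size, bool process_soft_limit_exceeded) {
    if (g.enable_memory_overcommit) {
        return !process_soft_limit_exceeded;
    }
    int64_t realtime_used =
            g.total_mem_used.load() + g.refresh_interval_growth.load() + size;
    if (realtime_used > (double)g.memory_limit * g.spill_high_watermark / 100) {
        return false;  // over the high watermark: caller should back off or spill
    }
    g.refresh_interval_growth.fetch_add(size);  // undone later via the matching sub_* call
    return true;
}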
30 changes: 8 additions & 22 deletions be/src/runtime/workload_group/workload_group.h
@@ -78,8 +78,6 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit;
};

-    int64_t weighted_memory_limit() const { return _weighted_memory_limit; };

void set_weighted_memory_limit(int64_t weighted_memory_limit) {
_weighted_memory_limit = weighted_memory_limit;
}
@@ -103,24 +101,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _total_query_slot_count.load(std::memory_order_relaxed);
}

-    bool add_wg_refresh_interval_memory_growth(int64_t size) {
-        // If a group is enable memory overcommit, then not need check the limit
-        // It is always true, and it will only fail when process memory is not
-        // enough.
-        if (_enable_memory_overcommit) {
-            return true;
-        }
-        auto realtime_total_mem_used =
-                _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
-        if ((realtime_total_mem_used >
-             ((double)_weighted_memory_limit *
-              _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
-            return false;
-        } else {
-            _wg_refresh_interval_memory_growth.fetch_add(size);
-            return true;
-        }
-    }
+    bool add_wg_refresh_interval_memory_growth(int64_t size);

void sub_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_sub(size);
@@ -129,10 +110,10 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
-                              ((double)_weighted_memory_limit *
+                              ((double)_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
-                              ((double)_weighted_memory_limit *
+                              ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
}

@@ -166,6 +147,11 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
return _memory_limit > 0;
}

+    bool exceed_limit() {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
+    }

Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext> query_ctx) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
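Since check_mem_used() and the new exceed_limit() now key off _memory_limit directly (rather than _weighted_memory_limit), the watermark thresholds are plain percentages of the configured group limit. A small worked example with made-up numbers:

#include <cstdint>
#include <cstdio>

int main() {
    // Assumed example: a 10 GiB group limit with 50% / 80% spill watermarks.
    const int64_t memory_limit = 10LL << 30;
    const int spill_low_watermark = 50;
    const int spill_high_watermark = 80;

    const double low_threshold = (double)memory_limit * spill_low_watermark / 100;   // 5 GiB
    const double high_threshold = (double)memory_limit * spill_high_watermark / 100; // 8 GiB

    const int64_t realtime_total_mem_used = 6LL << 30; // 6 GiB currently in use
    const bool is_low_watermark = realtime_total_mem_used > low_threshold;   // true
    const bool is_high_watermark = realtime_total_mem_used > high_threshold; // false
    const bool exceed_limit = realtime_total_mem_used > memory_limit;        // false

    std::printf("low=%d high=%d exceed=%d\n", is_low_watermark, is_high_watermark, exceed_limit);
    return 0;
}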