
Commit

add load buffer limit for load buffer management
Doris-Extras committed Sep 23, 2024
1 parent b2fa3e6 commit ce3edfe
Showing 5 changed files with 38 additions and 17 deletions.
2 changes: 2 additions & 0 deletions be/src/agent/workload_group_listener.cpp
@@ -32,6 +32,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
LOG(INFO) << "Received publish workload group info request: "
<< apache::thrift::ThriftDebugString(topic_info).c_str();
is_set_workload_group_info = true;

// 1. Parse topic info into group info
29 changes: 21 additions & 8 deletions be/src/runtime/workload_group/workload_group.cpp
@@ -67,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
_total_query_slot_count(tg_info.total_query_slot_count) {
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
@@ -82,21 +83,33 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
_load_buffer_limit = (int64_t)(_memory_limit * 0.2);
}

std::string WorkloadGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
auto mem_used_ratio = realtime_total_mem_used / ((double)_weighted_memory_limit + 1);
return fmt::format(
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
"WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
"total_query_slot_count={}, "
"memory_limit = {}, "
"enable_memory_overcommit = {}, weighted_memory_limit = {}, total_mem_used = {},"
"wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = "
"{}, spill_high_watermark = {},cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
"spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}, "
"is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_id, _name, _version, cpu_share(), _total_query_slot_count,
PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES),
mem_used_ratio, _spill_low_watermark, _spill_high_watermark, cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size(),
_scan_bytes_per_second, _remote_scan_bytes_per_second);
_is_shutdown, _query_ctxs.size(), _scan_bytes_per_second,
_remote_scan_bytes_per_second);
}

std::string WorkloadGroup::memory_debug_string() const {
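The constructor change above hard-codes the load buffer budget at 20% of the group's memory limit. A minimal, hypothetical sketch of that arithmetic (not the committed code; the 10 GiB limit is an assumed example value):

#include <cstdint>
#include <cstdio>

int main() {
    // Assumed example: a workload group with a 10 GiB memory limit.
    const int64_t memory_limit = 10LL * 1024 * 1024 * 1024;
    // Mirrors the factor used in this diff: load buffer limit = 20% of the group limit.
    const int64_t load_buffer_limit = static_cast<int64_t>(memory_limit * 0.2);
    std::printf("load_buffer_limit = %lld bytes (~2 GiB)\n", (long long)load_buffer_limit);
    return 0;
}
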
6 changes: 6 additions & 0 deletions be/src/runtime/workload_group/workload_group.h
@@ -233,12 +233,18 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
}
int64_t get_remote_scan_bytes_per_second();

int64_t load_buffer_limit() { return _load_buffer_limit; }

private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
std::string _name;
int64_t _version;
int64_t _memory_limit; // bytes
// Covers load buffers such as memtables and Parquet write buffers.
// If the workload group's memory reaches the high water mark, the load
// buffer usage will be restricted to this limit.
int64_t _load_buffer_limit;

// memory used by load memtable
int64_t _active_mem_usage = 0;
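For context on how the new accessor is meant to be consumed, here is a hedged sketch (all names are hypothetical stand-ins, not the real Doris classes) of a writer-side check against the per-group load buffer budget:

#include <cstdint>

// Hypothetical stand-in for the real WorkloadGroup class.
struct WorkloadGroupStub {
    int64_t load_buffer_limit() const { return _load_buffer_limit; }
    int64_t _load_buffer_limit = 0;
};

// Returns true when memtable / Parquet write buffers exceed the group's load buffer budget,
// i.e. when load buffer usage should be throttled or flushed.
bool load_buffer_over_limit(const WorkloadGroupStub& wg, int64_t load_buffer_bytes) {
    return load_buffer_bytes > wg.load_buffer_limit();
}
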
13 changes: 6 additions & 7 deletions be/src/runtime/workload_group/workload_group_manager.cpp
@@ -185,6 +185,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
if (all_workload_groups_mem_usage < process_memory_usage) {
int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage;
weighted_memory_limit_ratio = 1 - (double)public_memory / (double)process_memory_limit;
// Round the ratio down to a 1% granularity (two decimal places).
weighted_memory_limit_ratio = std::floor(weighted_memory_limit_ratio * 100) / 100;
}

std::string debug_msg = fmt::format(
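The flooring added above keeps weighted_memory_limit_ratio at a 1% granularity. A small numeric sketch with assumed values (not taken from the diff):

#include <cmath>
#include <cstdio>

int main() {
    // Assumed numbers: 100 GB process limit, 90 GB process usage,
    // 71.5 GB attributed to workload groups -> 18.5 GB of "public" memory.
    const double process_memory_limit = 100.0;
    const double public_memory = 90.0 - 71.5;                 // 18.5
    double ratio = 1 - public_memory / process_memory_limit;  // 0.815
    ratio = std::floor(ratio * 100) / 100;                    // floored to 0.81
    std::printf("weighted_memory_limit_ratio = %.2f\n", ratio);
    return 0;
}
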
@@ -407,8 +409,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
MemTableMemoryLimiter* memtable_limiter =
doris::ExecEnv::GetInstance()->memtable_memory_limiter();
// Do not use the memory limit here; use the high water mark instead.
int64_t wg_high_water_mark_limit =
wg->memory_limit() * wg->spill_threshold_high_water_mark() / 100;
int64_t memtable_active_bytes = 0;
int64_t memtable_write_bytes = 0;
int64_t memtable_flush_bytes = 0;
@@ -418,21 +418,20 @@
// For example, stream load does not reserve much memory, but it occupies a lot of memtable memory.
// TODO: 0.2 should be a workload group property. For example, if the group is optimized for load, the value
// should be larger; if the group is optimized for query, the value should be smaller.
int64_t max_wg_memtable_bytes =
std::max<int64_t>(100 * 1024 * 1024, (int64_t)(wg_high_water_mark_limit * 0.2));
int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
if (memtable_active_bytes + memtable_write_bytes + memtable_flush_bytes >
max_wg_memtable_bytes) {
// There are many memtables in the flush queue; just wait for their flush to finish.
if (memtable_write_bytes + memtable_flush_bytes > max_wg_memtable_bytes / 2) {
if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.8)) {
LOG_EVERY_T(INFO, 60)
<< wg->name() << " load memtable size is: " << memtable_active_bytes << ", "
<< memtable_write_bytes << ", " << memtable_flush_bytes
<< ", wait for flush finished to release more memory";
continue;
} else {
// Flush 50% of the active bytes, i.e. move some memtables (currently being written) to the flush queue.
// Flush some memtables (currently being written) to the flush queue.
memtable_limiter->flush_workload_group_memtables(
wg->id(), (int64_t)(memtable_active_bytes * 0.5));
wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.8));
LOG_EVERY_T(INFO, 60)
<< wg->name() << " load memtable size is: " << memtable_active_bytes << ", "
<< memtable_write_bytes << ", " << memtable_flush_bytes
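The reworked branch above now flushes down to 80% of the load buffer limit instead of flushing a flat 50% of the active bytes. A hedged, self-contained sketch of that decision flow (simplified names; the tail of the function cut off by the diff view is not modeled):

#include <cstdint>

enum class LoadBufferAction { kNone, kWaitForFlush, kFlushToTarget };

// Mirrors the logic in this hunk: 'limit' corresponds to wg->load_buffer_limit(),
// and 'flush_bytes' is how much to hand to the memtable limiter when flushing.
LoadBufferAction decide_load_buffer_action(int64_t active_bytes, int64_t write_bytes,
                                           int64_t flush_bytes_queued, int64_t limit,
                                           int64_t* flush_bytes) {
    *flush_bytes = 0;
    if (active_bytes + write_bytes + flush_bytes_queued <= limit) {
        return LoadBufferAction::kNone;  // under the load buffer limit, nothing to do
    }
    if (write_bytes + flush_bytes_queued > limit / 2) {
        if (active_bytes < static_cast<int64_t>(limit * 0.8)) {
            // Plenty of memtables are already queued for flush; just wait for them.
            return LoadBufferAction::kWaitForFlush;
        }
        // Flush currently written memtables down to 80% of the limit.
        *flush_bytes = active_bytes - static_cast<int64_t>(limit * 0.8);
        return LoadBufferAction::kFlushToTarget;
    }
    // Remaining branches are not shown in the diff and are not modeled here.
    return LoadBufferAction::kNone;
}
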
5 changes: 3 additions & 2 deletions be/src/util/mem_info.cpp
@@ -208,8 +208,9 @@ void MemInfo::refresh_proc_meminfo() {
_s_cgroup_mem_usage = cgroup_mem_usage;
// wait 10s (100 * 100ms) to avoid refreshing too frequently.
_s_cgroup_mem_refresh_wait_times = -100;
LOG(INFO) << "Refresh cgroup memory win, refresh again after 10s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage;
LOG(INFO)
<< "Refresh cgroup memory success, refresh again after 10s, cgroup mem limit: "
<< _s_cgroup_mem_limit << ", cgroup mem usage: " << _s_cgroup_mem_usage;
} else {
// finding the cgroup failed; wait 300s (3000 * 100ms).
_s_cgroup_mem_refresh_wait_times = -3000;
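The negative wait counters above appear to implement a simple tick-based back-off; assuming each tick is one 100 ms refresh, the implied timings work out as follows (a sketch under that assumption, not the committed code):

#include <cstdio>

int main() {
    const int refresh_tick_ms = 100;      // assumed refresh interval per the comments
    const int success_wait_ticks = 100;   // counter reset to -100 on success
    const int failure_wait_ticks = 3000;  // counter reset to -3000 when the cgroup is not found
    std::printf("back-off after success: %d ms (~10 s)\n", success_wait_ticks * refresh_tick_ms);
    std::printf("back-off after failure: %d ms (~300 s)\n", failure_wait_ticks * refresh_tick_ms);
    return 0;
}
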
