Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Sep 23, 2024
1 parent 2c6766d commit 81f3e59
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 10 deletions.
3 changes: 3 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ namespace ErrorCode {
E(BAD_CAST, -254, true); \
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
E(QUERY_MEMORY_EXCEED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEED, -257, false); \
E(PROCESS_MEMORY_EXCEED, -257, false); \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
Expand Down
15 changes: 8 additions & 7 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,14 @@ Status PipelineTask::execute(bool* eos) {
if (workload_group && reserve_size > 0) {
auto st = thread_context()->try_reserve_memory(reserve_size);
if (!st.ok()) {
VLOG_DEBUG << "query: " << print_id(query_id)
<< ", try to reserve: " << reserve_size << "(sink reserve size:("
<< sink_reserve_size << " )"
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << " failed: " << st.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
LOG(INFO) << "query: " << print_id(query_id)
<< ", try to reserve: " << reserve_size << "(sink reserve size:("
<< sink_reserve_size << " )"
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << " failed: " << st.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();

_state->get_query_ctx()->update_paused_state(st);
_state->get_query_ctx()->set_low_memory_mode();
bool is_high_wartermark = false;
bool is_low_wartermark = false;
Expand All @@ -410,7 +412,6 @@ Status PipelineTask::execute(bool* eos) {
} else {
need_to_pause = is_low_wartermark || is_high_wartermark;
}

if (need_to_pause) {
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
"reserve memory failed, size: {}, because memory tracker consumption: {}, limit: "
"{}",
size, _limiter_tracker->consumption(), _limiter_tracker->limit());
return doris::Status::MemoryLimitExceeded(err_msg);
return doris::Status::Error<QUERY_MEMORY_EXCEED>(err_msg);
}
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
Expand All @@ -299,7 +299,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
return doris::Status::MemoryLimitExceeded(err_msg);
return doris::Status::Error<WORKLOAD_GROUP_MEMORY_EXCEED>(err_msg);
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
Expand All @@ -310,7 +310,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
}
return doris::Status::MemoryLimitExceeded(err_msg);
return doris::Status::Error<PROCESS_MEMORY_EXCEED>(err_msg);
}
_reserved_mem += size;
return doris::Status::OK();
Expand Down
28 changes: 28 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,27 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

bool low_memory_mode() { return _low_memory_mode; }

void update_paused_state(const Status& st) {
std::lock_guard l(_paused_mutex);
if (_paused_error_status.is<ErrorCode::QUERY_MEMORY_EXCEED>) {
return;
} else {
_paused_error_status = st;
}
}

Status paused_status() const { return _paused_error_status; }

void clear_paused_state() {
std::lock_guard l(_paused_mutex);
_paused_error_status = Status::OK();
}

int32_t paused_times() const {
std::lock_guard l(_paused_mutex);
return _paused_times;
}

private:
int _timeout_second;
TUniqueId _query_id;
Expand Down Expand Up @@ -377,6 +398,13 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
// help us manage the query.
QuerySource _query_source;

// If the query is in paused list, then set it to true.
bool _paused = false;
// Record the time that the query is added to paused list.
int32_t _paused_times = 0;
Status _paused_error_status;
std::mutex _paused_mutex;

// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
// flatten profile of one fragment:
// Pipeline 0
Expand Down
43 changes: 43 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
std::lock_guard<std::mutex> lock(_paused_queries_lock);
DCHECK(query_ctx != nullptr);
auto wg = query_ctx->workload_group();
query_ctx->update_paused_state(true);
auto&& [it, inserted] = _paused_queries_list[wg].emplace(query_ctx);
if (inserted) {
LOG(INFO) << "here insert one new paused query: " << it->query_id()
Expand All @@ -343,6 +344,7 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
*/
void WorkloadGroupMgr::handle_paused_queries() {
std::unique_lock<std::mutex> lock(_paused_queries_lock);
// TODO: should also check if there is spill disk query.
if (_paused_queries_list.empty()) {
if (doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted != 1) {
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 1;
Expand All @@ -366,6 +368,47 @@ void WorkloadGroupMgr::handle_paused_queries() {

wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);

// If the query is paused because its limit exceed the query itself's memlimit, then just spill disk.
// The query's memlimit is set using slot mechanism and its value is set using the user settings, not
// by weighted value. So if reserve failed, then it is actually exceed limit.
for (auto query_it = queries_list.begin(); query_it != queries_list.end();) {
auto query_ctx = query_it->query_ctx_.lock();
// The query is finished during in paused list.
if (query_ctx == nullptr) {
query_it = queries_list.erase(query_it);
continue;
}
if (query_ctx->paused_status().is<ErrorCode::QUERY_MEMORY_EXCEED>) {
// TODO
// If the query is an insert into select query, should consider memtable as revoke memory.
auto revocable_tasks = max_revocable_query->get_revocable_tasks();
if (revocable_tasks.empty()) {
query_ctx->cancel(doris::Status::Error<QUERY_MEMORY_EXCEED>(
"query reserve memory failed, but could not find memory that could "
"release or spill to disk"));
} else
LOG(INFO) << "query: " << print_id(max_revocable_query->query_id()) << ", has "
<< revocable_tasks.size()
<< " tasks to revoke memory, max revocable size: "
<< max_revocable_size;
SCOPED_ATTACH_TASK(max_revocable_query.get());
// TODO, should spill the task that has max memory, not all
for (auto* task : revocable_tasks) {
auto st = task->revoke_memory();
if (!st.ok()) {
query_ctx->cancel(st);
break;
}
}
}
query_it = queries_list.erase(query_it);
continue;
}

// If the query is added into paused list, too many times, then just spill it.
// If the query already used 50% Of the limit. For join, agg, it will at most double
// the memory of current used.

if (!is_low_wartermark && !is_high_wartermark) {
// TODO: should check if there is a large reserve size in the query's operators
// If it exist, then should find the query and spill it.
Expand Down

0 comments on commit 81f3e59

Please sign in to comment.