Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Sep 24, 2024
1 parent fafee7c commit 56d00b5
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 49 deletions.
114 changes: 65 additions & 49 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,53 +376,21 @@ void WorkloadGroupMgr::handle_paused_queries() {
query_it = queries_list.erase(query_it);
continue;
}
if (query_ctx->is_cancelled()) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< "was canceled, remove from paused list";
query_it = queries_list.erase(query_it);
continue;
}
if (query_ctx->paused_status().is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
// TODO
// If the query is an insert into select query, should consider memtable as revoke memory.
if (query_ctx->is_cancelled()) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< "was canceled, remove from paused list";
query_it = queries_list.erase(query_it);
continue;
}

size_t revocable_size = 0;
size_t memory_usage = 0;
bool has_running_task = false;
query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);
if (has_running_task) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< "is paused, but still has running task, skip it.";
bool spill_res = spill_or_cancel_query(query_ctx, query_ctx->paused_status());
if (!spill_res) {
++query_it;
continue;
}

auto revocable_tasks = query_ctx->get_revocable_tasks();
if (revocable_tasks.empty()) {
// Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
"query reserve memory failed, but could not find memory that "
"could "
"release or spill to disk"));
} else {
SCOPED_ATTACH_TASK(query_ctx.get());
// TODO, should spill the task that has max memory, not all
for (auto* task : revocable_tasks) {
auto st = task->revoke_memory();
if (!st.ok()) {
query_ctx->cancel(st);
break;
}
}
// The query is removed from paused queue and revoked memory, should clear the state
// to make sure it will be updated next time.
query_ctx->clear_paused_state();
LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
<< revocable_tasks.size()
<< " tasks to revoke memory, revocable size: " << revocable_size;
query_it = queries_list.erase(query_it);
continue;
}
query_it = queries_list.erase(query_it);
continue;
} else if (query_ctx->paused_status().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>()) {
if (wg->memory_sufficent()) {
wg->update_memory_sufficent(false);
Expand Down Expand Up @@ -459,13 +427,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
"to 0 now";
}
if (query_it->cache_ratio_ < 0.001) {
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
"The query reserved memory failed because process limit exceeded, and "
"there is no cache now. Maybe you should set the workload group's "
"limit to a lower value."));

++query_it;
continue;
bool spill_res = spill_or_cancel_query(query_ctx, query_ctx->paused_status());
if (!spill_res) {
++query_it;
continue;
} else {
query_it = queries_list.erase(query_it);
continue;
}
}
if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
0.001 &&
Expand All @@ -481,6 +450,53 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}

bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx,
Status paused_reason) {
// TODO: If the query is an insert into select query, should consider memtable as revoke memory.
size_t revocable_size = 0;
size_t memory_usage = 0;
bool has_running_task = false;
query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);
if (has_running_task) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< "is paused, but still has running task, skip it.";
return false;
}

auto revocable_tasks = query_ctx->get_revocable_tasks();
if (revocable_tasks.empty()) {
if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
// Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
"query reserve memory failed, but could not find memory that "
"could "
"release or spill to disk"));
} else {
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
"The query reserved memory failed because process limit exceeded, and "
"there is no cache now. And could not find task to spill. Maybe you should set "
"the workload group's limit to a lower value."));
}
} else {
SCOPED_ATTACH_TASK(query_ctx.get());
// TODO, should spill the task that has max memory, not all
for (auto* task : revocable_tasks) {
auto st = task->revoke_memory();
if (!st.ok()) {
query_ctx->cancel(st);
break;
}
}
// The query is removed from paused queue and revoked memory, should clear the state
// to make sure it will be updated next time.
query_ctx->clear_paused_state();
LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
<< revocable_tasks.size()
<< " tasks to revoke memory, revocable size: " << revocable_size;
}
return true;
}

void WorkloadGroupMgr::stop() {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
iter->second->try_stop_schedulers();
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class WorkloadGroupMgr {

void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& wg_memtable_usages);

private:
bool spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx, Status paused_reason);

private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
Expand Down

0 comments on commit 56d00b5

Please sign in to comment.