Skip to content

Commit

Permalink
fix query limit reserve bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Sep 24, 2024
1 parent 8847eb6 commit 0dfc1db
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 36 deletions.
4 changes: 2 additions & 2 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ namespace ErrorCode {
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
E(QUERY_MEMORY_EXCEED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEED, -257, false); \
E(PROCESS_MEMORY_EXCEED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEED, -258, false); \
E(PROCESS_MEMORY_EXCEED, -259, false); \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
Expand Down
16 changes: 1 addition & 15 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,21 +398,7 @@ Status PipelineTask::execute(bool* eos) {
bool is_high_wartermark = false;
bool is_low_wartermark = false;
workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);

/// The larger reserved memory size is likely due to a larger available revocable size.
/// If the available memory for revoking is large enough, here trigger revoking proactively.
bool need_to_pause = false;
const auto revocable_mem_size = _sink->revocable_mem_size(_state);
if (revocable_mem_size > 100L * 1024 * 1024) {
LOG(INFO) << "query: " << print_id(query_id)
<< ", task id: " << _state->task_id()
<< " has big memory to revoke: " << revocable_mem_size;
RETURN_IF_ERROR(_sink->revoke_memory(_state));
need_to_pause = true;
} else {
need_to_pause = is_low_wartermark || is_high_wartermark;
}
if (need_to_pause) {
if (is_low_wartermark || is_high_wartermark) {
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this());
Expand Down
26 changes: 8 additions & 18 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,6 @@ class MemTrackerLimiter final {

void release(int64_t bytes) { _mem_counter.sub(bytes); }

bool try_consume(int64_t bytes) {
if (UNLIKELY(bytes == 0)) {
return true;
}
bool rt = true;
if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
rt = _mem_counter.try_add(bytes, _limit);
} else {
_mem_counter.add(bytes);
}
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
return rt;
}

void set_consumption(int64_t bytes) { _mem_counter.set(bytes); }

// Transfer 'bytes' of consumption from this tracker to 'dst'.
Expand All @@ -189,7 +172,14 @@ class MemTrackerLimiter final {
int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); }

bool try_reserve(int64_t bytes) {
bool rt = try_consume(bytes);
if (UNLIKELY(bytes == 0)) {
return true;
}
bool rt = _mem_counter.try_add(bytes, _limit);
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
if (rt) {
_reserved_counter.add(bytes);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);
if (has_running_task) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< "is paused, but still has running task, skip it.";
<< " is paused, but still has running task, skip it.";
return false;
}

Expand Down

0 comments on commit 0dfc1db

Please sign in to comment.