From fc3bcc673e517d00077ad99dcd5b8d6aee7c648d Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 2 Oct 2024 09:31:50 +0800 Subject: [PATCH] only spill largest task --- .../partitioned_hash_join_sink_operator.cpp | 5 ++-- be/src/pipeline/pipeline_task.cpp | 3 ++- be/src/runtime/query_context.cpp | 24 ++++++++++--------- .../workload_group/workload_group_manager.cpp | 4 ---- .../resource/workloadgroup/QueryQueue.java | 3 ++- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 136466aa6b2199..f4f03cdb52f978 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -327,7 +327,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( st = Status::Error( "fault_inject partitioned_hash_join_sink revoke_memory submit_func failed"); }); - + // For every stream, the task counter is increased +1 + // so that when a stream finished, it should desc -1 state->get_query_ctx()->increase_revoking_tasks_count(); auto spill_runnable = std::make_shared( state, _shared_state->shared_from_this(), @@ -349,6 +350,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( return Status::OK(); }(); + _state->get_query_ctx()->decrease_revoking_tasks_count(); if (!status.ok()) { std::unique_lock lock(_spill_lock); _spill_dependency->set_ready(); @@ -458,7 +460,6 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( if (num == 1) { std::unique_lock lock(_spill_lock); - _state->get_query_ctx()->decrease_revoking_tasks_count(); _spill_dependency->set_ready(); if (_child_eos) { VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink " diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4d7fcc4b53bae9..400903add18794 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -443,7 +443,8 @@ Status PipelineTask::execute(bool* eos) { COUNTER_UPDATE(_yield_counts, 1); LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this - << ", insufficient memory. reserve_size: " << reserve_size; + << ", insufficient memory. reserve_size: " + << PrettyPrinter::print(reserve_size, TUnit::BYTES); _memory_sufficient_dependency->block(); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this(), reserve_size); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 5bba3d966420c0..3a40020677ce8d 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -470,7 +470,7 @@ Status QueryContext::revoke_memory() { // Do not use memlimit, use current memory usage. // For example, if current limit is 1.6G, but current used is 1G, if reserve failed // should free 200MB memory, not 300MB - const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2); + //const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2); size_t revoked_size = 0; std::vector chosen_tasks; @@ -478,9 +478,11 @@ Status QueryContext::revoke_memory() { chosen_tasks.emplace_back(task); revoked_size += revocable_size; - if (revoked_size >= target_revoking_size) { - break; - } + // Only revoke the largest task to ensure memory is used as much as possible + break; + //if (revoked_size >= target_revoking_size) { + // break; + //} } std::weak_ptr this_ctx = shared_from_this(); @@ -491,18 +493,18 @@ Status QueryContext::revoke_memory() { return; } - LOG(INFO) << "query: " << print_id(query_context->_query_id) - << ", context: " << ((void*)context) - << " all revoking tasks done, resumt it."; + LOG(INFO) << query_context->debug_string() << ", context: " << ((void*)context) + << " all spill tasks done, resume it."; query_context->set_memory_sufficient(true); }); - LOG(INFO) << "query: " << print_id(_query_id) << ", context: " << ((void*)spill_context.get()) - << " total revoked size: " << revoked_size << ", tasks count: " << chosen_tasks.size() - << "/" << tasks.size(); for (auto* task : chosen_tasks) { RETURN_IF_ERROR(task->revoke_memory(spill_context)); } + + LOG(INFO) << this->debug_string() << ", context: " << ((void*)spill_context.get()) + << " total revoked size: " << PrettyPrinter::print(revoked_size, TUnit::BYTES) + << ", tasks count: " << chosen_tasks.size() << "/" << tasks.size(); return Status::OK(); } @@ -526,7 +528,7 @@ std::vector QueryContext::get_revocable_tasks() const { std::string QueryContext::debug_string() { std::lock_guard l(_paused_mutex); return fmt::format( - "MemTracker Label={}, Used={}, Limit={}, Peak={}, running revoke task count {}, " + "Label={}, Used={}, Limit={}, Peak={}, running revoke task count {}, " "MemorySufficient={}, PausedReason={}", query_mem_tracker->label(), PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES), diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index bc87f342ac05ac..947476faeb1eae 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -285,8 +285,6 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { auto& queries_list = it->second; const auto& wg = it->first; if (queries_list.empty()) { - LOG(INFO) << "wg: " << wg->debug_string() - << " has no paused query, update it to memory sufficent"; it = _paused_queries_list.erase(it); continue; } @@ -639,8 +637,6 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har int64_t expected_query_weighted_mem_limit = 0; // If the query enable hard limit, then it should not use the soft limit if (query_ctx->enable_query_slot_hard_limit()) { - LOG(INFO) << "query info " << wg_high_water_mark_except_load << "," - << query_ctx->get_slot_count() << "," << total_slot_count; if (total_slot_count < 1) { LOG(WARNING) << "query " << print_id(query_ctx->query_id()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 0da702ad3f997d..82822c05a0d995 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -130,7 +130,8 @@ public QueueToken getToken(int querySlotCount) throws UserException { queueToken.complete(); return queueToken; } else if (waitingQueryQueue.size() >= maxQueueSize) { - throw new UserException("query waiting queue is full, queue length=" + maxQueueSize); + throw new UserException("query waiting queue is full, queue capacity=" + maxQueueSize + + ", waiting num=" + waitingQueryQueue.size()); } else { if (!hasFreeSlot) { queueToken.setQueueMsg("NO_FREE_SLOT");