only spill largest task
Doris-Extras committed Oct 2, 2024
1 parent d8f61b5 commit fc3bcc6
Showing 5 changed files with 20 additions and 19 deletions.
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -327,7 +327,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         st = Status::Error<INTERNAL_ERROR>(
                 "fault_inject partitioned_hash_join_sink revoke_memory submit_func failed");
     });
-
+    // For every stream, the task counter is increased +1
+    // so that when a stream finished, it should desc -1
     state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
             state, _shared_state->shared_from_this(),
@@ -349,6 +350,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
             return Status::OK();
         }();
 
+        _state->get_query_ctx()->decrease_revoking_tasks_count();
         if (!status.ok()) {
             std::unique_lock<std::mutex> lock(_spill_lock);
             _spill_dependency->set_ready();
@@ -458,7 +460,6 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (num == 1) {
         std::unique_lock<std::mutex> lock(_spill_lock);
-        _state->get_query_ctx()->decrease_revoking_tasks_count();
         _spill_dependency->set_ready();
         if (_child_eos) {
             VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
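
The subtle part of this file is the counter pairing: revoke_memory() increments the revoking-task counter once per spill stream, and the matching decrement now runs in each stream's own completion path (success or failure) instead of only in the num == 1 last-stream branch of _spill_to_disk(). A minimal sketch of that per-stream pairing, with a hypothetical RevokeCounter standing in for the QueryContext API:

#include <atomic>
#include <functional>
#include <vector>

// Hypothetical stand-in for QueryContext's revoking-task counter: each
// submitted spill stream increments before it is scheduled and decrements
// exactly once when its work (or its failure path) finishes.
class RevokeCounter {
public:
    void increase() { _running.fetch_add(1, std::memory_order_relaxed); }

    void decrease(const std::function<void()>& on_all_done) {
        // The stream that brings the counter to zero resumes the query.
        if (_running.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            on_all_done();
        }
    }

private:
    std::atomic<int> _running{0};
};

// Pairing the increment with a guaranteed decrement mirrors the fix above:
// the -1 moved out of the "last stream" branch and into each stream itself.
void submit_spill_stream(RevokeCounter& counter,
                         std::vector<std::function<void()>>& pool,
                         std::function<bool()> spill_body,
                         std::function<void()> resume_query) {
    counter.increase(); // +1 per stream, before the task can run
    pool.emplace_back([&counter, spill_body, resume_query] {
        bool ok = spill_body();
        (void)ok; // error handling elided; the decrement happens regardless
        counter.decrease(resume_query); // -1 per stream, success or failure
    });
}

The stream whose decrement drives the counter to zero is the one that resumes the query, which matches the "all spill tasks done, resume it" callback added in query_context.cpp below.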
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
@@ -443,7 +443,8 @@ Status PipelineTask::execute(bool* eos) {
     COUNTER_UPDATE(_yield_counts, 1);
 
     LOG(INFO) << "query: " << print_id(query_id) << ", task: " << (void*)this
-              << ", insufficient memory. reserve_size: " << reserve_size;
+              << ", insufficient memory. reserve_size: "
+              << PrettyPrinter::print(reserve_size, TUnit::BYTES);
     _memory_sufficient_dependency->block();
     ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
             _state->get_query_ctx()->shared_from_this(), reserve_size);
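
The change here is cosmetic but useful: the raw reserve_size integer in the pause log is replaced with PrettyPrinter::print(reserve_size, TUnit::BYTES). PrettyPrinter is Doris-internal; a rough stand-alone sketch of what such a byte formatter does:

#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical equivalent of PrettyPrinter::print(v, TUnit::BYTES): scale a
// raw byte count into the largest unit that keeps the value at or above 1.
std::string pretty_bytes(int64_t bytes) {
    static const char* units[] = {"B", "KB", "MB", "GB", "TB"};
    double v = static_cast<double>(bytes);
    int i = 0;
    while (v >= 1024.0 && i < 4) {
        v /= 1024.0;
        ++i;
    }
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%.2f %s", v, units[i]);
    return std::string(buf);
}

// e.g. pretty_bytes(377487360) yields "360.00 MB", which is far easier to
// scan in a paused-query log line than the raw integer.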
24 changes: 13 additions & 11 deletions be/src/runtime/query_context.cpp
@@ -470,17 +470,19 @@ Status QueryContext::revoke_memory() {
     // Do not use memlimit, use current memory usage.
     // For example, if current limit is 1.6G, but current used is 1G, if reserve failed
     // should free 200MB memory, not 300MB
-    const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2);
+    //const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2);
     size_t revoked_size = 0;
 
     std::vector<pipeline::PipelineTask*> chosen_tasks;
     for (auto&& [revocable_size, task] : tasks) {
         chosen_tasks.emplace_back(task);
 
         revoked_size += revocable_size;
-        if (revoked_size >= target_revoking_size) {
-            break;
-        }
+        // Only revoke the largest task to ensure memory is used as much as possible
+        break;
+        //if (revoked_size >= target_revoking_size) {
+        //    break;
+        //}
     }
 
     std::weak_ptr<QueryContext> this_ctx = shared_from_this();
@@ -491,18 +493,18 @@ Status QueryContext::revoke_memory() {
                     return;
                 }
 
-                LOG(INFO) << "query: " << print_id(query_context->_query_id)
-                          << ", context: " << ((void*)context)
-                          << " all revoking tasks done, resumt it.";
+                LOG(INFO) << query_context->debug_string() << ", context: " << ((void*)context)
+                          << " all spill tasks done, resume it.";
                 query_context->set_memory_sufficient(true);
             });
 
-    LOG(INFO) << "query: " << print_id(_query_id) << ", context: " << ((void*)spill_context.get())
-              << " total revoked size: " << revoked_size << ", tasks count: " << chosen_tasks.size()
-              << "/" << tasks.size();
     for (auto* task : chosen_tasks) {
         RETURN_IF_ERROR(task->revoke_memory(spill_context));
     }
 
+    LOG(INFO) << this->debug_string() << ", context: " << ((void*)spill_context.get())
+              << " total revoked size: " << PrettyPrinter::print(revoked_size, TUnit::BYTES)
+              << ", tasks count: " << chosen_tasks.size() << "/" << tasks.size();
     return Status::OK();
 }

@@ -526,7 +528,7 @@ std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
 std::string QueryContext::debug_string() {
     std::lock_guard l(_paused_mutex);
     return fmt::format(
-            "MemTracker Label={}, Used={}, Limit={}, Peak={}, running revoke task count {}, "
+            "Label={}, Used={}, Limit={}, Peak={}, running revoke task count {}, "
             "MemorySufficient={}, PausedReason={}",
             query_mem_tracker->label(),
             PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
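
This file carries the commit's core behavior change. Previously revoke_memory() accumulated spill candidates, largest first, until roughly 20% of the query's current consumption was covered; now it spills only the single largest revocable task and leaves the rest running. A sketch of the new selection, assuming the candidates arrive as (revocable_size, task) pairs:

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

struct Task {}; // hypothetical stand-in for pipeline::PipelineTask

// Pick only the task with the largest revocable size: one big spill frees
// memory while every other task keeps its in-memory state (e.g. hash
// tables) resident, instead of several tasks spilling speculatively.
std::vector<Task*> choose_tasks_to_spill(
        std::vector<std::pair<int64_t, Task*>> candidates) {
    std::sort(candidates.begin(), candidates.end(),
              [](const auto& a, const auto& b) { return a.first > b.first; });
    std::vector<Task*> chosen;
    if (!candidates.empty()) {
        chosen.push_back(candidates.front().second); // largest only
    }
    return chosen;
}

If the largest task does not free enough memory, the next failed reservation presumably triggers another revoke_memory() round, so spilling stays incremental rather than pausing several tasks up front.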
4 changes: 0 additions & 4 deletions be/src/runtime/workload_group/workload_group_manager.cpp
@@ -285,8 +285,6 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
         auto& queries_list = it->second;
         const auto& wg = it->first;
         if (queries_list.empty()) {
-            LOG(INFO) << "wg: " << wg->debug_string()
-                      << " has no paused query, update it to memory sufficent";
             it = _paused_queries_list.erase(it);
             continue;
         }
@@ -639,8 +637,6 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft limit
         if (query_ctx->enable_query_slot_hard_limit()) {
-            LOG(INFO) << "query info " << wg_high_water_mark_except_load << ","
-                      << query_ctx->get_slot_count() << "," << total_slot_count;
             if (total_slot_count < 1) {
                 LOG(WARNING)
                         << "query " << print_id(query_ctx->query_id())
3 changes: 2 additions & 1 deletion
@@ -130,7 +130,8 @@ public QueueToken getToken(int querySlotCount) throws UserException {
             queueToken.complete();
             return queueToken;
         } else if (waitingQueryQueue.size() >= maxQueueSize) {
-            throw new UserException("query waiting queue is full, queue length=" + maxQueueSize);
+            throw new UserException("query waiting queue is full, queue capacity=" + maxQueueSize
+                    + ", waiting num=" + waitingQueryQueue.size());
         } else {
             if (!hasFreeSlot) {
                 queueToken.setQueueMsg("NO_FREE_SLOT");
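
On the FE side, the queue-full error now reports both the configured capacity and the current number of waiters. A sketch of that admission decision, written in C++ like the other examples here, with hypothetical types standing in for the Java ones (QueueToken, waitingQueryQueue):

#include <deque>
#include <stdexcept>
#include <string>

enum class Admission { RUN_NOW, WAIT };

Admission get_token(bool has_free_slot, std::deque<int>& waiting_queue,
                    size_t max_queue_size) {
    if (has_free_slot && waiting_queue.empty()) {
        return Admission::RUN_NOW; // a free slot and nobody ahead: run at once
    }
    if (waiting_queue.size() >= max_queue_size) {
        // The fix: report the configured capacity *and* the current
        // occupancy, so a "queue is full" error is diagnosable on its own.
        throw std::runtime_error(
                "query waiting queue is full, queue capacity=" +
                std::to_string(max_queue_size) +
                ", waiting num=" + std::to_string(waiting_queue.size()));
    }
    waiting_queue.push_back(0); // otherwise wait for a slot ("NO_FREE_SLOT")
    return Admission::WAIT;
}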
