Skip to content

Commit

Permalink
add writer in query ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Sep 30, 2024
1 parent 82761ed commit 1b9038e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 19 deletions.
9 changes: 9 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

// Returns the current value of the atomic _low_memory_mode flag.
bool low_memory_mode() {
    return _low_memory_mode.load();
}

// Records a memtable writer on this query context.
// Only a weak reference is stored, so registering a writer here does not keep
// it alive. Appends to _writers are serialized by _memtable_writer_lock.
void register_writer(std::weak_ptr<MemTableWriter> writer) {
    std::lock_guard l(_memtable_writer_lock);
    // `writer` is already a by-value copy owned by this call; move it into the
    // vector instead of paying for a second weak-count increment/decrement.
    _writers.push_back(std::move(writer));
}

// Placeholder for reporting load-side memory usage through three out-parameters.
// NOTE(review): the body is currently empty, so none of active_bytes, queue_bytes
// or flush_bytes is written; callers must initialize the values before the call.
// TODO: presumably meant to aggregate usage over the registered writers — confirm.
void get_load_mem_usage(int64_t* active_bytes, int64_t* queue_bytes, int64_t* flush_bytes) {}

void update_paused_reason(const Status& st) {
std::lock_guard l(_paused_mutex);
if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
Expand Down Expand Up @@ -418,6 +425,8 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
std::atomic<bool> _low_memory_mode = false;
int64_t _user_set_mem_limit = 0;
std::atomic<int64_t> _expected_mem_limit = 0;
// Held while appending to _writers (see register_writer).
std::mutex _memtable_writer_lock;
// Weak references to the memtable writers registered on this query context.
std::vector<std::weak_ptr<MemTableWriter>> _writers;

std::mutex _profile_mutex;
timespec _query_arrival_timestamp;
Expand Down
77 changes: 58 additions & 19 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,23 +430,24 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
"capacity "
"to 0 now";
}
// If the query is a pure load task: streamload, kafka load, group commit, if we
// meet process memory not sufficient, just clear the cache and set it to ready.
// because memlimiter has a simple logic to flush memory.
if (query_ctx->is_pure_load_task()) {
query_ctx->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
continue;
}
if (query_it->cache_ratio_ < 0.001) {
if (query_it->any_wg_exceed_limit_) {
if (wg->enable_memory_overcommit()) {
if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
resume_after_gc.push_back(query_ctx);
query_it = queries_list.erase(query_it);
continue;
} else {
++query_it;
continue;
}
} else {
// current workload group is hard limit, should not wait other wg with
// soft limit, just cancel
// TODO add a session variable in query ctx to wait in queue.
if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
resume_after_gc.push_back(query_ctx);
query_it = queries_list.erase(query_it);
continue;
} else {
++query_it;
continue;
}
} else {
// TODO: Find other exceed limit workload group and cancel query.
Expand Down Expand Up @@ -534,24 +535,62 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
// If the query is not ready to do these tasks, it means just wait.
bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_ctx,
size_t size_to_reserve, Status paused_reason) {
// TODO: If the query is an insert into select query, should consider memtable as revoke memory.
CHECK(paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>() ||
paused_reason.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>())
<< paused_reason.to_string();
CHECK(!query_ctx->is_pure_load_task()) << query_ctx.debug_string();
// This method may be called several times while the query is waiting in the queue.
// While it waits, none of its tasks are running, so some memory may be released; if the
// memory is now sufficient, the query should be resumed.
// The error code has to be query memory exceeded; if process memory is exceeded, the query
// cannot be resumed here, because in that case the query limit has not been reached.
if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
const auto limit = query_ctx->get_mem_limit();
if ((memory_usage + size_to_reserve) < limit * 0.95) {
LOG(INFO) << "query: " << query_id << ", usage(" << memory_usage << " + "
<< size_to_reserve << ") less than limit(" << limit << "), resume it.";
query_ctx->set_memory_sufficient(true);
return true;
}
}

size_t revocable_size = 0;
size_t memory_usage = 0;
bool has_running_task = false;
const auto query_id = print_id(query_ctx->query_id());
query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);

// If the query is an insert into select query, should consider memtable as revoke memory.
int64_t memtable_active_bytes = 0;
int64_t memtable_queue_bytes = 0;
int64_t memtable_flush_bytes = 0;
query_ctx->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes,
&memtable_flush_bytes);

// If it is an insert into select or a stream load and the memtable is very large,
// we should flush some memtables first.
// TODO workload group should set a percent, and refer it here.
if (memtable_active_bytes > query_ctx->get_mem_limit() * 0.2) {
// TODO should flush some memtable
// return true;
}
if (memtable_flush_bytes + memtable_queue_bytes > 100 * 1024 * 1024) {
LOG(INFO) << "query: " << query_ctx->debug_string()
<< ", still has memtable during flush memtable queue size: "
<< memtable_queue_bytes << ", memtable flush size: " << memtable_flush_bytes
<< ", waiting for memtable flush finished.";
return false;
}

if (has_running_task) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< " is paused, but still has running task, skip it.";
return false;
}

// During waiting all task not running, may release some memory and the memory is enough now
// should resume the query.
if (query_ctx->get_mem_tracker()->limit() >
query_ctx->get_mem_tracker()->consumption() + size_to_reserve) {
query_ctx->set_memory_sufficient(true);
return true;
// If it is a pure load task, then wait, not cancel it here.
if (paused_reason.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
return false;
}

auto revocable_tasks = query_ctx->get_revocable_tasks();
Expand Down

0 comments on commit 1b9038e

Please sign in to comment.