diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 6ce09b7dd0e014..ac05a0701537e7 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -337,6 +337,13 @@ class QueryContext : public std::enable_shared_from_this { bool low_memory_mode() { return _low_memory_mode; } + void register_writer(std::weak_ptr writer) { + std::lock_guard l(_memtable_writer_lock); + _writers.push_back(writer); + } + + void get_load_mem_usage(int64_t* active_bytes, int64_t* queue_bytes, int64_t* flush_bytes) {} + void update_paused_reason(const Status& st) { std::lock_guard l(_paused_mutex); if (_paused_reason.is()) { @@ -418,6 +425,8 @@ class QueryContext : public std::enable_shared_from_this { std::atomic _low_memory_mode = false; int64_t _user_set_mem_limit = 0; std::atomic _expected_mem_limit = 0; + std::mutex _memtable_writer_lock; + std::vector> _writers; std::mutex _profile_mutex; timespec _query_arrival_timestamp; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 95951b96bab390..5a91111d040158 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -430,23 +430,24 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { "capacity " "to 0 now"; } + // If the query is a pure load task (stream load, kafka load, group commit) and process + // memory is not sufficient, just clear the cache and set the query back to ready, + // because memlimiter has a simple logic to flush memory. 
+ if (query_ctx->is_pure_load_task()) { + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } if (query_it->cache_ratio_ < 0.001) { if (query_it->any_wg_exceed_limit_) { - if (wg->enable_memory_overcommit()) { - if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) { - resume_after_gc.push_back(query_ctx); - query_it = queries_list.erase(query_it); - continue; - } else { - ++query_it; - continue; - } - } else { - // current workload group is hard limit, should not wait other wg with - // soft limit, just cancel + // TODO add a session variable in query ctx to wait in queue. + if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) { resume_after_gc.push_back(query_ctx); query_it = queries_list.erase(query_it); continue; + } else { + ++query_it; + continue; } } else { // TODO: Find other exceed limit workload group and cancel query. @@ -534,24 +535,62 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() { // If the query is not ready to do these tasks, it means just wait. bool WorkloadGroupMgr::handle_single_query(std::shared_ptr query_ctx, size_t size_to_reserve, Status paused_reason) { - // TODO: If the query is an insert into select query, should consider memtable as revoke memory. + CHECK(paused_reason.is() || + paused_reason.is()) + << paused_reason.to_string(); + CHECK(!query_ctx->is_pure_load_task()) << query_ctx.debug_string(); + // This method may be called several times for the same query while it is waiting in the queue. + // While it waits, none of its tasks are running, so some memory may be released and become enough; + // in that case the query should be resumed. + // This only applies when the error code is query-memory-exceeded; if process memory is exceeded, the query cannot be resumed here, + // because when the process limit is exceeded the query limit itself was not reached. NOTE(review): (1) the check that follows reads memory_usage and query_id, which are declared only further down in this function - confirm the ordering compiles; (2) the CHECK above calls query_ctx.debug_string() with '.', while every other use in this function is query_ctx-> - confirm. 
+ if (paused_reason.is()) { + const auto limit = query_ctx->get_mem_limit(); + if ((memory_usage + size_to_reserve) < limit * 0.95) { + LOG(INFO) << "query: " << query_id << ", usage(" << memory_usage << " + " + << size_to_reserve << ") less than limit(" << limit << "), resume it."; + query_ctx->set_memory_sufficient(true); + return true; + } + } + size_t revocable_size = 0; size_t memory_usage = 0; bool has_running_task = false; const auto query_id = print_id(query_ctx->query_id()); query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); + + // If the query is an insert-into-select query, its memtable memory should be counted as revocable memory. NOTE(review): get_load_mem_usage() has an empty body in this patch's query_context.h hunk, so the three counters below stay 0 for now. + int64_t memtable_active_bytes = 0; + int64_t memtable_queue_bytes = 0; + int64_t memtable_flush_bytes = 0; + query_ctx->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes, + &memtable_flush_bytes); + + // If it is an insert-into-select or stream load and the memtable is very large, + // we should flush some memtable first. + // TODO: the workload group should define a percentage, and this check should refer to it. + if (memtable_active_bytes > query_ctx->get_mem_limit() * 0.2) { + // TODO: should flush some memtable + // return true; + } + if (memtable_flush_bytes + memtable_queue_bytes > 100 * 1024 * 1024) { + LOG(INFO) << "query: " << query_ctx->debug_string() + << ", still has memtable during flush memtable queue size: " + << memtable_queue_bytes << ", memtable flush size: " << memtable_flush_bytes + << ", waiting for memtable flush finished."; + return false; + } + if (has_running_task) { LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " is paused, but still has running task, skip it."; return false; } - // During waiting all task not running, may release some memory and the memory is enough now - // should resume the query. 
- if (query_ctx->get_mem_tracker()->limit() > - query_ctx->get_mem_tracker()->consumption() + size_to_reserve) { - query_ctx->set_memory_sufficient(true); - return true; + // If it is a pure load task, then wait; do not cancel it here. NOTE(review): the condition below tests paused_reason rather than is_pure_load_task(), and pure load tasks are rejected by the CHECK at the top of this function - confirm the comment matches the intent. + if (paused_reason.is()) { + return false; } auto revocable_tasks = query_ctx->get_revocable_tasks();