Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Aug 21, 2023
1 parent 1610d1d commit e292773
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
20 changes: 10 additions & 10 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> exec_state,
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(exec_state->fragment_instance_id());
if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id);
_query_ctx_map.erase(query_ctx->query_id());
}
}

Expand Down Expand Up @@ -482,7 +482,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->file_scan_range_params_map = params.file_scan_params;
}

LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo)
<< " coord_addr " << query_ctx->coord_addr
<< " total fragment num on current host: " << params.fragment_num_on_host;
query_ctx->query_globals = params.query_globals;
Expand Down Expand Up @@ -510,15 +510,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
if (params.query_options.query_type == TQueryType::SELECT) {
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY,
fmt::format("Query#Id={}", print_id(query_ctx->query_id)), bytes_limit);
fmt::format("Query#Id={}", print_id(query_ctx->query_id())), bytes_limit);
} else if (params.query_options.query_type == TQueryType::LOAD) {
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("Load#Id={}", print_id(query_ctx->query_id)), bytes_limit);
fmt::format("Load#Id={}", print_id(query_ctx->query_id())), bytes_limit);
} else { // EXTERNAL
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("External#Id={}", print_id(query_ctx->query_id)), bytes_limit);
fmt::format("External#Id={}", print_id(query_ctx->query_id())), bytes_limit);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
Expand All @@ -535,11 +535,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
task_group_info);
tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
query_ctx->set_task_group(tg);
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< " use task group: " << tg->debug_string();
}
} else {
VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id())
<< " does not use task group.";
}
}
Expand All @@ -550,9 +550,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
_query_ctx_map.insert(std::make_pair(query_ctx->query_id, query_ctx));
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id)
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
} else {
// Already has a query fragments context, use it
Expand Down Expand Up @@ -681,7 +681,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
params, local_params, exec_state->executor()->runtime_state(), query_ctx.get());
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
query_ctx->query_id, fragment_instance_id, params.fragment_id,
query_ctx->query_id(), fragment_instance_id, params.fragment_id,
local_params.backend_num, query_ctx, _exec_env, cb,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1));
Expand Down
5 changes: 2 additions & 3 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
: _exec_env(exec_env),
_plan(nullptr),
_query_ctx(query_ctx),
_query_id(query_ctx->query_id()),
_query_id(query_ctx->query_id()()),
_fragment_instance_id(instance_id),
_backend_num(backend_num),
_coord_addr(query_ctx->coord_addr),
Expand Down Expand Up @@ -539,9 +539,8 @@ void PlanFragmentExecutor::coordinator_callback(const Status& status, RuntimePro
FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
if (!coord_status.ok()) {
std::stringstream ss;
UniqueId uid(req.query_id.hi, req.query_id.lo);
ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status;
LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str();
update_status(Status::InternalError(ss.str()));
return;
}
Expand Down

0 comments on commit e292773

Please sign in to comment.