Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
Doris-Extras committed Aug 21, 2023
1 parent e292773 commit 37e8b7d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 150 deletions.
5 changes: 4 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
query_ctx->fragment_ids.push_back(fragment_instance_id);

auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
_exec_env, query_ctx, params.params.fragment_instance_id, params.backend_num);
_exec_env, query_ctx, params.params.fragment_instance_id, params.fragment_id,
params.backend_num,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// set need_wait_execution_trigger means this instance will not actually being executed
// until the execPlanFragmentStart RPC trigger to start it.
Expand Down
162 changes: 14 additions & 148 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,18 @@ using namespace ErrorCode;

PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
std::shared_ptr<QueryContext> query_ctx,
const TUniqueId& instance_id, int backend_num)
const TUniqueId& instance_id, int fragment_id,
int backend_num,
const report_status_callback& report_status_cb)
: _exec_env(exec_env),
_plan(nullptr),
_query_ctx(query_ctx),
_query_id(query_ctx->query_id()()),
_query_id(query_ctx->query_id()),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
_backend_num(backend_num),
_coord_addr(query_ctx->coord_addr),
_report_status_cb(report_status_cb),
_report_thread_active(false),
_done(false),
_prepared(false),
Expand Down Expand Up @@ -507,7 +511,14 @@ void PlanFragmentExecutor::send_report(bool done) {
if (!_is_report_success && !_is_report_on_cancel) {
return;
}

_report_status_cb({exec_status,
_is_report_success ? _runtime_state->runtime_profile() : nullptr,
_is_report_success ? _runtime_state->load_channel_profile() : nullptr,
done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id,
_fragment_instance_id, _backend_num, _runtime_state.get(),
std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1,
std::placeholders::_2)});
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
Expand All @@ -528,151 +539,6 @@ Status PlanFragmentExecutor::update_status(const Status& status) {
return _status;
}

// There can only be one of these callbacks in-flight at any moment, because
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
void PlanFragmentExecutor::coordinator_callback(const Status& status, RuntimeProfile* query_profile,
RuntimeProfile* load_profile, bool done) {
Status exec_status = _status;
Status coord_status;
FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
if (!coord_status.ok()) {
std::stringstream ss;
ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status;
LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str();
update_status(Status::InternalError(ss.str()));
return;
}

TReportExecStatusParams params;
params.protocol_version = FrontendServiceVersion::V1;
params.__set_query_id(_query_id);
params.__set_backend_num(_backend_num);
params.__set_fragment_instance_id(_fragment_instance_id);
params.__set_fragment_id(req.fragment_id);
exec_status.set_t_status(&params);
params.__set_done(done);
params.__set_query_type(_runtime_state->query_type());
params.__set_finished_scan_ranges(_runtime_state->num_finished_range());

if (_runtime_state->query_type() == TQueryType::LOAD && !done && status.ok()) {
// this is a load plan, and load is not finished, just make a brief report
params.__set_loaded_rows(_runtime_state->num_rows_load_total());
params.__set_loaded_bytes(_runtime_state->num_bytes_load_total());
} else {
if (_runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(_runtime_state->num_rows_load_total());
params.__set_loaded_bytes(_runtime_state->num_bytes_load_total());
}
if (query_profile == nullptr) {
params.__isset.profile = false;
} else {
query_profile->to_thrift(&params.profile);
if (load_profile) {
load_profile->to_thrift(&params.loadChannelProfile);
}
params.__isset.profile = true;
params.__isset.loadChannelProfile = true;
}

if (!_runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : _runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
if (_runtime_state->num_rows_load_total() > 0 ||
_runtime_state->num_rows_load_filtered() > 0) {
params.__isset.load_counters = true;

static std::string s_dpp_normal_all = "dpp.norm.ALL";
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
static std::string s_unselected_rows = "unselected.rows";

params.load_counters.emplace(s_dpp_normal_all,
std::to_string(_runtime_state->num_rows_load_success()));
params.load_counters.emplace(s_dpp_abnormal_all,
std::to_string(_runtime_state->num_rows_load_filtered()));
params.load_counters.emplace(
s_unselected_rows, std::to_string(_runtime_state->num_rows_load_unselected()));
}
if (!_runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(_runtime_state->get_error_log_file_path()));
}
if (!_runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files = _runtime_state->export_output_files();
}
if (!_runtime_state->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
params.commitInfos.reserve(_runtime_state->tablet_commit_infos().size());
for (auto& info : _runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
}
if (!_runtime_state->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.reserve(_runtime_state->error_tablet_infos().size());
for (auto& info : _runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
}

// Send new errors to coordinator
_runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (params.error_log.size() > 0);
}

if (_exec_env->master_info()->__isset.backend_id) {
params.__set_backend_id(_exec_env->master_info()->backend_id);
}

TReportExecStatusResult res;
Status rpc_status;

VLOG_DEBUG << "reportExecStatus params is "
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.to_string()
<< " to coordinator: " << _coord_addr
<< ", query id: " << print_id(req.query_id)
<< ", instance id: " << print_id(req.fragment_instance_id);
}
try {
try {
coord->reportExecStatus(res, params);
} catch (TTransportException& e) {
LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
<< ", instance id: " << print_id(req.fragment_instance_id) << " to "
<< _coord_addr << ", err: " << e.what();
rpc_status = coord.reopen();

if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
update_status(rpc_status);
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail");
return;
}
coord->reportExecStatus(res, params);
}

rpc_status = Status(Status::create(res.status));
} catch (TException& e) {
std::stringstream msg;
msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
}

if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
update_status(rpc_status);
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2");
}
}

void PlanFragmentExecutor::stop_report_thread() {
if (!_report_thread_active) {
return;
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ class Block;
// thread-safe.
class PlanFragmentExecutor {
public:
using report_status_callback = std::function<void(const ReportStatusRequest)>;
// report_status_cb, if !empty(), is used to report the accumulated profile
// information periodically during execution (open() or get_next()).
PlanFragmentExecutor(ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
const TUniqueId& instance_id, int backend_num);
const TUniqueId& instance_id, int fragment_id, int backend_num,
const report_status_callback& report_status_cb);

// Closes the underlying plan fragment and frees up all resources allocated
// in open()/get_next().
Expand Down Expand Up @@ -158,11 +160,13 @@ class PlanFragmentExecutor {
TUniqueId _query_id;
// Id of this instance
TUniqueId _fragment_instance_id;
int _fragment_id;
// Used to report to coordinator which backend is over
int _backend_num;
TNetworkAddress _coord_addr;

// profile reporting-related
report_status_callback _report_status_cb;
std::promise<bool> _report_thread_promise;
std::future<bool> _report_thread_future;
std::mutex _report_thread_lock;
Expand Down

0 comments on commit 37e8b7d

Please sign in to comment.