Skip to content

Commit

Permalink
[pick]support cgroup v2 (#42465)
Browse files Browse the repository at this point in the history
## Proposed changes

pick #39991   #39374  #36663
  • Loading branch information
wangbo authored Oct 25, 2024
1 parent 4a62d9e commit 9eef393
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 166 deletions.
375 changes: 277 additions & 98 deletions be/src/agent/cgroup_cpu_ctl.cpp

Large diffs are not rendered by default.

95 changes: 83 additions & 12 deletions be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ namespace doris {

// cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static std::string CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE = "max 100000";

class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }

virtual Status init();
virtual Status init() = 0;

virtual Status add_thread_to_cgroup() = 0;

Expand All @@ -48,18 +48,44 @@ class CgroupCpuCtl {
// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);

virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;
static void init_doris_cgroup_path();

protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);

static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);

static bool is_a_valid_cgroup_path(std::string cg_path);

static uint64_t cpu_soft_limit_default_value();

protected:
virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;

virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;

std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
Status add_thread_to_cgroup(std::string task_file);

static Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append);

static Status init_cgroup_v2_query_path_public_file(std::string home_path,
std::string query_path);

protected:
inline static uint64_t _cpu_core_num;
const static uint64_t _cpu_cfs_period_us = 100000;
inline static std::string _doris_cgroup_cpu_path = "";
inline static std::string _doris_cgroup_cpu_query_path = "";
inline static bool _is_enable_cgroup_v1_in_env = false;
inline static bool _is_enable_cgroup_v2_in_env = false;
inline static bool _is_cgroup_query_path_valid = false;

// cgroup v2 public file
inline static std::string _doris_cgroup_cpu_path_subtree_ctl_file = "";
inline static std::string _cgroup_v2_query_path_subtree_ctl_file = "";
inline static std::string _doris_cg_v2_procs_file = "";

protected:
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
Expand Down Expand Up @@ -96,20 +122,65 @@ class CgroupCpuCtl {
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
CgroupV1CpuCtl() = default;
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;

Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;

private:
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
std::string _cgroup_v1_cpu_tg_quota_file;
std::string _cgroup_v1_cpu_tg_shares_file;
std::string _cgroup_v1_cpu_tg_task_file;
};

/*
NOTE: cgroup v2 directory structure
1 root path:
/sys/fs/cgroup
2 doris home path:
/sys/fs/cgroup/{doris_home}/
3 doris home subtree_control file:
/sys/fs/cgroup/{doris_home}/cgroup.subtree_control
4 query path:
/sys/fs/cgroup/{doris_home}/query/
5 query path subtree_control file:
/sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control
6 query path procs file:
/sys/fs/cgroup/{doris_home}/query/cgroup.procs
7 workload group path:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}
8 workload group cpu.max file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max
9 workload group cpu.weight file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight
10 workload group cgroup type file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type
*/
// cgroup v2 flavour of CgroupCpuCtl for a single workload group.
// It overrides the init / limit / thread-attach hooks that the base class
// declares as pure virtual; the implementations live outside this header.
class CgroupV2CpuCtl : public CgroupCpuCtl {
public:
// Forwards tg_id to the base-class constructor, which stores it as the workload group id.
CgroupV2CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
// Prepares this controller; callers in this change only install the controller when the returned Status is ok.
Status init() override;
// Applies a CPU hard limit.
// NOTE(review): the "_no_lock" suffix suggests the caller is expected to hold the lock — confirm in the .cpp.
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
// Applies a CPU soft limit (shares/weight).
// NOTE(review): same locking assumption as the hard-limit variant — confirm in the .cpp.
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
// Adds a thread to this workload group's cgroup.
// NOTE(review): presumably the calling thread — confirm in the .cpp.
Status add_thread_to_cgroup() override;

private:
// Per-workload-group paths. Judging by the names they correspond to the
// workload group directory and its cpu.max / cpu.weight / cgroup.type files
// listed in the directory-structure note in this header; the "thread" file is
// not listed there (presumably cgroup.threads). TODO confirm against init().
std::string _cgroup_v2_query_wg_path;
std::string _cgroup_v2_query_wg_cpu_max_file;
std::string _cgroup_v2_query_wg_cpu_weight_file;
std::string _cgroup_v2_query_wg_thread_file;
std::string _cgroup_v2_query_wg_type_file;
};

} // namespace doris
6 changes: 3 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MemTracker;
class StorageEngine;
class ResultBufferMgr;
class ResultQueueMgr;
class RuntimeQueryStatiticsMgr;
class RuntimeQueryStatisticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
Expand Down Expand Up @@ -162,7 +162,7 @@ class ExecEnv {
pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; }
WorkloadGroupMgr* workload_group_mgr() { return _workload_group_manager; }
WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; }
RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
RuntimeQueryStatisticsMgr* runtime_query_statistics_mgr() {
return _runtime_query_statistics_mgr;
}

Expand Down Expand Up @@ -458,7 +458,7 @@ class ExecEnv {

WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;

RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
RuntimeQueryStatisticsMgr* _runtime_query_statistics_mgr = nullptr;

std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,

// NOTE: runtime query statistics mgr could be visited by query and daemon thread
// so it should be created before any query begins and deleted after all queries and daemon threads have stopped
_runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
_runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr();
CgroupCpuCtl::init_doris_cgroup_path();

std::vector<doris::CachePath> cache_paths;
init_file_cache_factory(cache_paths);
Expand Down
20 changes: 10 additions & 10 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
tq_s->__set_workload_group_id(_wg_id);
}

void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr,
TQueryType::type query_type) {
void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr,
TQueryType::type query_type) {
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) {
_query_statistics_ctx_map[query_id] =
Expand All @@ -47,7 +47,7 @@ void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
_query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr);
}

void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
// 1 get query statistics map
std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
Expand Down Expand Up @@ -166,7 +166,7 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
}
}

void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
void RuntimeQueryStatisticsMgr::set_query_finished(std::string query_id) {
// NOTE: here must be a write lock
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
// when a query get query_ctx succ, but failed before create node/operator,
Expand All @@ -178,7 +178,7 @@ void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
}
}

std::shared_ptr<QueryStatistics> RuntimeQueryStatiticsMgr::get_runtime_query_statistics(
std::shared_ptr<QueryStatistics> RuntimeQueryStatisticsMgr::get_runtime_query_statistics(
std::string query_id) {
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) {
Expand All @@ -191,7 +191,7 @@ std::shared_ptr<QueryStatistics> RuntimeQueryStatiticsMgr::get_runtime_query_sta
return qs_ptr;
}

void RuntimeQueryStatiticsMgr::get_metric_map(
void RuntimeQueryStatisticsMgr::get_metric_map(
std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map) {
QueryStatistics ret_qs;
int64_t query_time_ms = 0;
Expand All @@ -212,15 +212,15 @@ void RuntimeQueryStatiticsMgr::get_metric_map(
std::to_string(ret_qs.get_current_used_memory_bytes()));
}

void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) {
void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) {
// wg id just need eventual consistency, read lock is ok
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) {
_query_statistics_ctx_map.at(query_id)->_wg_id = wg_id;
}
}

void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* block) {
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) {
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/runtime_query_statistics_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ class QueryStatisticsCtx {
int64_t _query_start_time;
};

class RuntimeQueryStatiticsMgr {
class RuntimeQueryStatisticsMgr {
public:
RuntimeQueryStatiticsMgr() = default;
~RuntimeQueryStatiticsMgr() = default;
RuntimeQueryStatisticsMgr() = default;
~RuntimeQueryStatisticsMgr() = default;

void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr, TQueryType::type query_type);
Expand Down
25 changes: 14 additions & 11 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@

namespace doris {

const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;

Expand Down Expand Up @@ -329,7 +327,7 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
workload_group_info->version = version;

// 4 cpu_share
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
uint64_t cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value();
if (tworkload_group_info.__isset.cpu_share) {
cpu_share = tworkload_group_info.cpu_share;
}
Expand Down Expand Up @@ -433,14 +431,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e

std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
if (cgroup_cpu_ctl) {
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id
<< ", reason=" << ret.to_string();
}
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
<< ", reason=" << ret.to_string();
LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed";
}
}

Expand Down Expand Up @@ -533,7 +535,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
_cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
_cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;
Expand Down
27 changes: 7 additions & 20 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
}
// wg is shut down and running num = 0, so its resources can be released in BE
if (workload_group_ptr->can_be_dropped()) {
LOG(INFO) << "[topic_publish_wg]There is no query in wg" << wg_id << ", delete it.";
LOG(INFO) << "[topic_publish_wg]There is no query in wg " << wg_id
<< ", delete it.";
deleted_task_groups.push_back(workload_group_ptr);
}
}
Expand Down Expand Up @@ -121,30 +122,16 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
// Using cgdelete has no such issue.
{
if (config::doris_cgroup_cpu_path != "") {
std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
if (!_cg_cpu_ctl) {
_cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
}
if (!_is_init_succ) {
Status ret = _cg_cpu_ctl->init();
if (ret.ok()) {
_is_init_succ = true;
} else {
LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, "
<< ret.to_string();
}
}
if (_is_init_succ) {
Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
}
std::lock_guard<std::shared_mutex> write_lock(_clear_cgroup_lock);
Status ret = CgroupCpuCtl::delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
}
}
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms
<< "ms, deleted group size:" << deleted_task_groups.size()
<< " ms, deleted group size:" << deleted_task_groups.size()
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class WorkloadGroupMgr {
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;

std::shared_mutex _init_cg_ctl_lock;
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
bool _is_init_succ = false;
std::shared_mutex _clear_cgroup_lock;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
row.add(val + "%");
}
} else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) {
row.add("1024");
row.add("-1");
} else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) {
row.add("0%");
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
Expand Down
Loading

0 comments on commit 9eef393

Please sign in to comment.