diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index b141676545e697..e68535a708c49b 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -19,33 +19,154 @@ #include #include +#include #include +#include "util/cgroup_util.h" #include "util/defer_op.h" namespace doris { -Status CgroupCpuCtl::init() { - _doris_cgroup_cpu_path = config::doris_cgroup_cpu_path; - if (_doris_cgroup_cpu_path.empty()) { - LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path; - return Status::InternalError("doris cgroup cpu path {} is not specify.", - _doris_cgroup_cpu_path); +bool CgroupCpuCtl::is_a_valid_cgroup_path(std::string cg_path) { + if (!cg_path.empty()) { + if (cg_path.back() != '/') { + cg_path = cg_path + "/"; + } + if (_is_enable_cgroup_v2_in_env) { + std::string query_path_cg_type = cg_path + "cgroup.type"; + std::string query_path_ctl = cg_path + "cgroup.subtree_control"; + std::string query_path_procs = cg_path + "cgroup.procs"; + if (access(query_path_cg_type.c_str(), F_OK) != 0 || + access(query_path_ctl.c_str(), F_OK) != 0 || + access(query_path_procs.c_str(), F_OK) != 0) { + LOG(WARNING) << "[cgroup_init_path]invalid cgroup v2 path, access neccessary file " + "failed"; + } else { + return true; + } + } else if (_is_enable_cgroup_v1_in_env) { + std::string query_path_tasks = cg_path + "tasks"; + std::string query_path_cpu_shares = cg_path + "cpu.shares"; + std::string query_path_quota = cg_path + "cpu.cfs_quota_us"; + if (access(query_path_tasks.c_str(), F_OK) != 0 || + access(query_path_cpu_shares.c_str(), F_OK) != 0 || + access(query_path_quota.c_str(), F_OK) != 0) { + LOG(WARNING) << "[cgroup_init_path]invalid cgroup v1 path, access neccessary file " + "failed"; + } else { + return true; + } + } + } + return false; +} + +void CgroupCpuCtl::init_doris_cgroup_path() { + std::string conf_path = config::doris_cgroup_cpu_path; + if (conf_path.empty()) { + LOG(INFO) << 
"[cgroup_init_path]doris cgroup home path is not specify, if you not use " + "workload group, you can ignore this log."; + return; + } + + if (access(conf_path.c_str(), F_OK) != 0) { + LOG(INFO) << "[cgroup_init_path]doris cgroup home path not exists, path=" << conf_path; + return; + } + + if (conf_path.back() != '/') { + conf_path = conf_path + "/"; + } + + // check whether current user specified path is a valid cgroup path + std::string cg_msg = "not set cgroup in env"; + if (CGroupUtil::cgroupsv2_enable()) { + _is_enable_cgroup_v2_in_env = true; + cg_msg = "cgroup v2 is enabled in env"; + } else if (CGroupUtil::cgroupsv1_enable()) { + _is_enable_cgroup_v1_in_env = true; + cg_msg = "cgroup v1 is enabled in env"; + } + bool is_cgroup_path_valid = CgroupCpuCtl::is_a_valid_cgroup_path(conf_path); + + std::string tmp_query_path = conf_path + "query"; + if (is_cgroup_path_valid) { + if (access(tmp_query_path.c_str(), F_OK) != 0) { + int ret = mkdir(tmp_query_path.c_str(), S_IRWXU); + if (ret != 0) { + LOG(ERROR) << "[cgroup_init_path]cgroup mkdir query failed, path=" + << tmp_query_path; + } + } + _is_cgroup_query_path_valid = CgroupCpuCtl::is_a_valid_cgroup_path(tmp_query_path); + } + + _doris_cgroup_cpu_path = conf_path; + _doris_cgroup_cpu_query_path = tmp_query_path; + std::string query_path_msg = _is_cgroup_query_path_valid ? "cgroup query path is valid" + : "cgroup query path is not valid"; + _cpu_core_num = CpuInfo::num_cores(); + + std::string init_cg_v2_msg = ""; + if (_is_enable_cgroup_v2_in_env && _is_cgroup_query_path_valid) { + Status ret = init_cgroup_v2_query_path_public_file(_doris_cgroup_cpu_path, + _doris_cgroup_cpu_query_path); + if (!ret.ok()) { + init_cg_v2_msg = " write cgroup v2 file failed, err=" + ret.to_string_no_stack() + ". 
"; + } else { + init_cg_v2_msg = "write cgroup v2 public file succ."; + } + } + + LOG(INFO) << "[cgroup_init_path]init cgroup home path finish, home path=" + << _doris_cgroup_cpu_path << ", query path=" << _doris_cgroup_cpu_query_path << ", " + << cg_msg << ", " << query_path_msg << ", core_num=" << _cpu_core_num << ". " + << init_cg_v2_msg; +} + +Status CgroupCpuCtl::init_cgroup_v2_query_path_public_file(std::string home_path, + std::string query_path) { + // 1 enable cpu controller for home path's child + _doris_cgroup_cpu_path_subtree_ctl_file = home_path + "cgroup.subtree_control"; + if (access(_doris_cgroup_cpu_path_subtree_ctl_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 doris home's subtree control file"); } + RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_doris_cgroup_cpu_path_subtree_ctl_file, "+cpu", + "set cpu controller", false)); - if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) { - LOG(INFO) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path; - return Status::InternalError("doris cgroup cpu path {} not exists.", - _doris_cgroup_cpu_path); + // 2 enable cpu controller for query path's child + _cgroup_v2_query_path_subtree_ctl_file = query_path + "/cgroup.subtree_control"; + if (access(_cgroup_v2_query_path_subtree_ctl_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 query path's subtree control file"); } + RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_path_subtree_ctl_file, "+cpu", + "set cpu controller", false)); - if (_doris_cgroup_cpu_path.back() != '/') { - _doris_cgroup_cpu_path = _doris_cgroup_cpu_path + "/"; + // 3 write cgroup.procs + _doris_cg_v2_procs_file = query_path + "/cgroup.procs"; + if (access(_doris_cg_v2_procs_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 cgroup.procs file"); } + RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_doris_cg_v2_procs_file, + std::to_string(getpid()), + "set pid 
to cg v2 procs file", false)); return Status::OK(); } +uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() { + return _is_enable_cgroup_v2_in_env ? 100 : 1024; +} + +std::unique_ptr CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) { + if (_is_enable_cgroup_v2_in_env) { + return std::make_unique(wg_id); + } else if (_is_enable_cgroup_v1_in_env) { + return std::make_unique(wg_id); + } + return nullptr; +} + void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit) { std::lock_guard w_lock(_lock_mutex); *cpu_shares = this->_cpu_shares; @@ -78,7 +199,7 @@ void CgroupCpuCtl::update_cpu_soft_limit(int cpu_shares) { } } -Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg, +Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, std::string value, std::string msg, bool is_append) { int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR); if (fd == -1) { @@ -102,82 +223,7 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::st return Status::OK(); } -Status CgroupV1CpuCtl::init() { - RETURN_IF_ERROR(CgroupCpuCtl::init()); - - // query path - _cgroup_v1_cpu_query_path = _doris_cgroup_cpu_path + "query"; - if (access(_cgroup_v1_cpu_query_path.c_str(), F_OK) != 0) { - int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU); - if (ret != 0) { - LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path; - return Status::InternalError("cgroup v1 mkdir query failed, path={}", - _cgroup_v1_cpu_query_path); - } - } - - // check whether current user specified path is a valid cgroup path - std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks"; - std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + "/cpu.shares"; - std::string query_path_quota = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us"; - if (access(query_path_tasks.c_str(), F_OK) != 0) { - return Status::InternalError("invalid cgroup path, not find task file"); 
- } - if (access(query_path_cpu_shares.c_str(), F_OK) != 0) { - return Status::InternalError("invalid cgroup path, not find cpu share file"); - } - if (access(query_path_quota.c_str(), F_OK) != 0) { - return Status::InternalError("invalid cgroup path, not find cpu quota file"); - } - - if (_wg_id == -1) { - // means current cgroup cpu ctl is just used to clear dir, - // it does not contains workload group. - // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl - _init_succ = true; - LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path; - return Status::OK(); - } - - // workload group path - _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id); - if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) { - int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU); - if (ret != 0) { - LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path; - return Status::InternalError("cgroup v1 mkdir workload group failed, path=", - _cgroup_v1_cpu_tg_path); - } - } - - // quota file - _cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us"; - // cpu.shares file - _cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares"; - // task file - _cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks"; - LOG(INFO) << "cgroup v1 cpu path init success" - << ", query tg path=" << _cgroup_v1_cpu_tg_path - << ", query tg quota file path=" << _cgroup_v1_cpu_tg_quota_file - << ", query tg tasks file path=" << _cgroup_v1_cpu_tg_task_file - << ", core num=" << _cpu_core_num; - _init_succ = true; - return Status::OK(); -} - -Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) { - std::string msg = "modify cpu shares to " + std::to_string(cpu_shares); - return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_shares, msg, false); -} - -Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) { - int val = 
cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100) - : CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE; - std::string msg = "modify cpu quota value to " + std::to_string(val); - return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false); -} - -Status CgroupV1CpuCtl::add_thread_to_cgroup() { +Status CgroupCpuCtl::add_thread_to_cgroup(std::string task_path) { if (!_init_succ) { return Status::OK(); } @@ -189,18 +235,17 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() { std::string msg = "add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id); std::lock_guard w_lock(_lock_mutex); - return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true); + return CgroupCpuCtl::write_cg_sys_file(task_path, std::to_string(tid), msg, true); #endif } -Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set& used_wg_ids) { - if (!_init_succ) { - return Status::InternalError( - "cgroup cpu ctl init failed, delete can not be executed"); +Status CgroupCpuCtl::delete_unused_cgroup_path(std::set& used_wg_ids) { + if (!_is_cgroup_query_path_valid) { + return Status::InternalError("not find a valid cgroup query path"); } // 1 get unused wg id std::set unused_wg_ids; - for (const auto& entry : std::filesystem::directory_iterator(_cgroup_v1_cpu_query_path)) { + for (const auto& entry : std::filesystem::directory_iterator(_doris_cgroup_cpu_query_path)) { const std::string dir_name = entry.path().string(); struct stat st; // == 0 means exists @@ -222,9 +267,9 @@ Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set& used_wg_ids // 2 delete unused cgroup path int failed_count = 0; - std::string query_path = _cgroup_v1_cpu_query_path.back() != '/' - ? _cgroup_v1_cpu_query_path + "/" - : _cgroup_v1_cpu_query_path; + std::string query_path = _doris_cgroup_cpu_query_path.back() != '/' + ? 
_doris_cgroup_cpu_query_path + "/" + : _doris_cgroup_cpu_query_path; for (const std::string& unused_wg_id : unused_wg_ids) { std::string wg_path = query_path + unused_wg_id; int ret = rmdir(wg_path.c_str()); @@ -240,4 +285,151 @@ Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set& used_wg_ids return Status::OK(); } +// Prepare the cgroup v1 directory of this workload group under the shared query path and +// resolve its cpu.cfs_quota_us, cpu.shares and tasks files; _init_succ is set only on success. +Status CgroupV1CpuCtl::init() { + if (!_is_cgroup_query_path_valid) { + return Status::InternalError("cgroup query path is not valid"); + } + + if (_wg_id <= 0) { + return Status::InternalError("find an invalid wg_id {}", _wg_id); + } + + // workload group path + _cgroup_v1_cpu_tg_path = _doris_cgroup_cpu_query_path + "/" + std::to_string(_wg_id); + if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) { + int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU); + if (ret != 0) { + LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path; + return Status::InternalError("cgroup v1 mkdir workload group failed, path={}", + _cgroup_v1_cpu_tg_path); + } + } + + _cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us"; + if (access(_cgroup_v1_cpu_tg_quota_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v1 cpu.cfs_quota_us file"); + } + _cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares"; + if (access(_cgroup_v1_cpu_tg_shares_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v1 cpu.shares file"); + } + _cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks"; + if (access(_cgroup_v1_cpu_tg_task_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v1 tasks file"); + } + LOG(INFO) << "cgroup v1 cpu path init success" + << ", query tg path=" << _cgroup_v1_cpu_tg_path + << ", query wg quota file path=" << _cgroup_v1_cpu_tg_quota_file + << ", query wg share file path=" << _cgroup_v1_cpu_tg_shares_file + << ", query wg tasks file path=" << _cgroup_v1_cpu_tg_task_file + << ", core num=" << _cpu_core_num; 
+ _init_succ = true; + return Status::OK(); +} + +// Write cpu_shares to this workload group's cpu.shares file. +Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) { + std::string cpu_share_str = std::to_string(cpu_shares); + std::string msg = "modify cpu shares to " + cpu_share_str; + return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_share_str, msg, + false); +} + +// Write the CFS quota for cpu_hard_limit percent of all cores to cpu.cfs_quota_us; +// a non-positive limit writes CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE. +Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) { + int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100) + : CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE; + std::string str_val = std::to_string(val); + std::string msg = "modify cpu quota value to " + str_val; + return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, str_val, msg, false); +} + +// Delegate to the base helper with this workload group's tasks file. +Status CgroupV1CpuCtl::add_thread_to_cgroup() { + return CgroupCpuCtl::add_thread_to_cgroup(_cgroup_v1_cpu_tg_task_file); +} + +// Prepare the cgroup v2 directory of this workload group under the shared query path, resolve +// its cpu.max, cpu.weight, cgroup.threads and cgroup.type files, and write "threaded" to +// cgroup.type; _init_succ is set only on success. +Status CgroupV2CpuCtl::init() { + if (!_is_cgroup_query_path_valid) { + return Status::InternalError("cgroup query path is not valid"); + } + + if (_wg_id <= 0) { + return Status::InternalError("find an invalid wg_id {}", _wg_id); + } + + // wg path + _cgroup_v2_query_wg_path = _doris_cgroup_cpu_query_path + "/" + std::to_string(_wg_id); + if (access(_cgroup_v2_query_wg_path.c_str(), F_OK) != 0) { + int ret = mkdir(_cgroup_v2_query_wg_path.c_str(), S_IRWXU); + if (ret != 0) { + return Status::InternalError("cgroup v2 mkdir wg failed, path={}", + _cgroup_v2_query_wg_path); + } + } + + _cgroup_v2_query_wg_cpu_max_file = _cgroup_v2_query_wg_path + "/cpu.max"; + if (access(_cgroup_v2_query_wg_cpu_max_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 wg cpu.max file"); + } + + _cgroup_v2_query_wg_cpu_weight_file = _cgroup_v2_query_wg_path + "/cpu.weight"; + if (access(_cgroup_v2_query_wg_cpu_weight_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 wg cpu.weight file"); + } + + _cgroup_v2_query_wg_thread_file = _cgroup_v2_query_wg_path + 
"/cgroup.threads"; + if (access(_cgroup_v2_query_wg_thread_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 wg cgroup.threads file"); + } + + _cgroup_v2_query_wg_type_file = _cgroup_v2_query_wg_path + "/cgroup.type"; + if (access(_cgroup_v2_query_wg_type_file.c_str(), F_OK) != 0) { + return Status::InternalError("not find cgroup v2 wg cgroup.type file"); + } + RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_wg_type_file, "threaded", + "set cgroup type", false)); + + LOG(INFO) << "cgroup v2 cpu path init success" + << ", query wg path=" << _cgroup_v2_query_wg_path + << ", cpu.max file = " << _cgroup_v2_query_wg_cpu_max_file + << ", cgroup.threads file = " << _cgroup_v2_query_wg_thread_file + << ", core num=" << _cpu_core_num; + _init_succ = true; + return Status::OK(); +} + +// Write "<quota> <period>" to cpu.max, both derived from _cpu_cfs_period_us; a non-positive +// limit writes CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE. +Status CgroupV2CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) { + std::string value = ""; + if (cpu_hard_limit > 0) { + uint64_t int_val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100; + value = std::to_string(int_val) + " " + std::to_string(_cpu_cfs_period_us); + } else { + value = CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE; + } + std::string msg = "modify cpu.max to [" + value + "]"; + return CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_wg_cpu_max_file, value, msg, false); +} + +// Write cpu_weight to this workload group's cpu.weight file. +Status CgroupV2CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_weight) { + std::string cpu_weight_str = std::to_string(cpu_weight); + std::string msg = "modify cpu.weight to " + cpu_weight_str; + return CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_wg_cpu_weight_file, cpu_weight_str, msg, + false); +} + +// Delegate to the base helper with this workload group's cgroup.threads file. +Status CgroupV2CpuCtl::add_thread_to_cgroup() { + return CgroupCpuCtl::add_thread_to_cgroup(_cgroup_v2_query_wg_thread_file); +} + } // namespace doris diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index b5f8d2d5d80e67..84e191159f15f1 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -30,14 +30,14 @@ namespace 
doris { // cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1; +const static std::string CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE = "max 100000"; class CgroupCpuCtl { public: virtual ~CgroupCpuCtl() = default; - CgroupCpuCtl() = default; CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; } - virtual Status init(); + virtual Status init() = 0; virtual Status add_thread_to_cgroup() = 0; @@ -48,18 +48,44 @@ class CgroupCpuCtl { // for log void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit); - virtual Status delete_unused_cgroup_path(std::set& used_wg_ids) = 0; + static void init_doris_cgroup_path(); -protected: - Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append); + static Status delete_unused_cgroup_path(std::set& used_wg_ids); + + static std::unique_ptr create_cgroup_cpu_ctl(uint64_t wg_id); + + static bool is_a_valid_cgroup_path(std::string cg_path); + static uint64_t cpu_soft_limit_default_value(); + +protected: virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0; virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0; - std::string _doris_cgroup_cpu_path; - uint64_t _cpu_core_num = CpuInfo::num_cores(); - uint64_t _cpu_cfs_period_us = 100000; + Status add_thread_to_cgroup(std::string task_file); + + static Status write_cg_sys_file(std::string file_path, std::string value, std::string msg, + bool is_append); + + static Status init_cgroup_v2_query_path_public_file(std::string home_path, + std::string query_path); + +protected: + inline static uint64_t _cpu_core_num; + const static uint64_t _cpu_cfs_period_us = 100000; + inline static std::string _doris_cgroup_cpu_path = ""; + inline static std::string _doris_cgroup_cpu_query_path = ""; + inline static bool _is_enable_cgroup_v1_in_env = false; + inline static bool _is_enable_cgroup_v2_in_env = false; + inline static bool _is_cgroup_query_path_valid = false; 
+ + // cgroup v2 public file + inline static std::string _doris_cgroup_cpu_path_subtree_ctl_file = ""; + inline static std::string _cgroup_v2_query_path_subtree_ctl_file = ""; + inline static std::string _doris_cg_v2_procs_file = ""; + +protected: int _cpu_hard_limit = 0; std::shared_mutex _lock_mutex; bool _init_succ = false; @@ -96,20 +122,65 @@ class CgroupCpuCtl { class CgroupV1CpuCtl : public CgroupCpuCtl { public: CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {} - CgroupV1CpuCtl() = default; Status init() override; Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override; Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override; Status add_thread_to_cgroup() override; - Status delete_unused_cgroup_path(std::set& used_wg_ids) override; - private: - std::string _cgroup_v1_cpu_query_path; std::string _cgroup_v1_cpu_tg_path; // workload group path std::string _cgroup_v1_cpu_tg_quota_file; std::string _cgroup_v1_cpu_tg_shares_file; std::string _cgroup_v1_cpu_tg_task_file; }; +/* + NOTE: cgroup v2 directory structure + 1 root path: + /sys/fs/cgroup + + 2 doris home path: + /sys/fs/cgroup/{doris_home}/ + + 3 doris home subtree_control file: + /sys/fs/cgroup/{doris_home}/cgroup.subtree_control + + 4 query path: + /sys/fs/cgroup/{doris_home}/query/ + + 5 query path subtree_control file: + /sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control + + 6 query path procs file: + /sys/fs/cgroup/{doris_home}/query/cgroup.procs + + 7 workload group path: + /sys/fs/cgroup/{doris_home}/query/{workload_group_id} + + 8 workload group cpu.max file: + /sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max + + 9 workload group cpu.weight file: + /sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight + + 10 workload group cgroup type file: + /sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type + +*/ +class CgroupV2CpuCtl : public CgroupCpuCtl { +public: + CgroupV2CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {} + Status init() 
override; + Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override; + Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override; + Status add_thread_to_cgroup() override; + +private: + std::string _cgroup_v2_query_wg_path; + std::string _cgroup_v2_query_wg_cpu_max_file; + std::string _cgroup_v2_query_wg_cpu_weight_file; + std::string _cgroup_v2_query_wg_thread_file; + std::string _cgroup_v2_query_wg_type_file; +}; + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 18061e04528c05..3d4a122bda2bc3 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -80,7 +80,7 @@ class MemTracker; class StorageEngine; class ResultBufferMgr; class ResultQueueMgr; -class RuntimeQueryStatiticsMgr; +class RuntimeQueryStatisticsMgr; class TMasterInfo; class LoadChannelMgr; class LoadStreamMgr; @@ -162,7 +162,7 @@ class ExecEnv { pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; } WorkloadGroupMgr* workload_group_mgr() { return _workload_group_manager; } WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; } - RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() { + RuntimeQueryStatisticsMgr* runtime_query_statistics_mgr() { return _runtime_query_statistics_mgr; } @@ -458,7 +458,7 @@ class ExecEnv { WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr; - RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr; + RuntimeQueryStatisticsMgr* _runtime_query_statistics_mgr = nullptr; std::unique_ptr _pipeline_tracer_ctx; std::unique_ptr _tmp_file_dirs; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 84d0684f17d8db..12d0b3bf9994af 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -211,7 +211,8 @@ Status ExecEnv::_init(const std::vector& store_paths, // NOTE: runtime query statistics mgr could be visited by query and daemon 
thread // so it should be created before all query begin and deleted after all query and daemon thread stoppped - _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr(); + _runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr(); + CgroupCpuCtl::init_doris_cgroup_path(); std::vector cache_paths; init_file_cache_factory(cache_paths); diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 95b6ee907e60ad..1d9bb34d09d7fe 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -35,10 +35,10 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) { tq_s->__set_workload_group_id(_wg_id); } -void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, - std::shared_ptr qs_ptr, - TNetworkAddress fe_addr, - TQueryType::type query_type) { +void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id, + std::shared_ptr qs_ptr, + TNetworkAddress fe_addr, + TQueryType::type query_type) { std::lock_guard write_lock(_qs_ctx_map_lock); if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { _query_statistics_ctx_map[query_id] = @@ -47,7 +47,7 @@ void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr); } -void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { +void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() { int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; // 1 get query statistics map std::map> fe_qs_map; @@ -166,7 +166,7 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { } } -void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) { +void RuntimeQueryStatisticsMgr::set_query_finished(std::string query_id) { // NOTE: here must be a write lock std::lock_guard write_lock(_qs_ctx_map_lock); // 
when a query get query_ctx succ, but failed before create node/operator, @@ -178,7 +178,7 @@ void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) { } } -std::shared_ptr RuntimeQueryStatiticsMgr::get_runtime_query_statistics( +std::shared_ptr RuntimeQueryStatisticsMgr::get_runtime_query_statistics( std::string query_id) { std::shared_lock read_lock(_qs_ctx_map_lock); if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { @@ -191,7 +191,7 @@ std::shared_ptr RuntimeQueryStatiticsMgr::get_runtime_query_sta return qs_ptr; } -void RuntimeQueryStatiticsMgr::get_metric_map( +void RuntimeQueryStatisticsMgr::get_metric_map( std::string query_id, std::map& metric_map) { QueryStatistics ret_qs; int64_t query_time_ms = 0; @@ -212,7 +212,7 @@ void RuntimeQueryStatiticsMgr::get_metric_map( std::to_string(ret_qs.get_current_used_memory_bytes())); } -void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { +void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { // wg id just need eventual consistency, read lock is ok std::shared_lock read_lock(_qs_ctx_map_lock); if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { @@ -220,7 +220,7 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64 } } -void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* block) { +void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) { std::shared_lock read_lock(_qs_ctx_map_lock); int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index f9a21abf1bb649..a0c187d3ef5f62 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -54,10 +54,10 @@ class QueryStatisticsCtx { int64_t _query_start_time; }; -class 
RuntimeQueryStatiticsMgr { +class RuntimeQueryStatisticsMgr { public: - RuntimeQueryStatiticsMgr() = default; - ~RuntimeQueryStatiticsMgr() = default; + RuntimeQueryStatisticsMgr() = default; + ~RuntimeQueryStatisticsMgr() = default; void register_query_statistics(std::string query_id, std::shared_ptr qs_ptr, TNetworkAddress fe_addr, TQueryType::type query_type); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 8b0985d4ecf380..971750eb097aa1 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -44,11 +44,9 @@ namespace doris { -const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024; const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%"; const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true; const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; -const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024; const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; @@ -329,7 +327,7 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g workload_group_info->version = version; // 4 cpu_share - uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE; + uint64_t cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value(); if (tworkload_group_info.__isset.cpu_share) { cpu_share = tworkload_group_info.cpu_share; } @@ -433,14 +431,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e std::lock_guard wlock(_task_sched_lock); if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) { - std::unique_ptr cgroup_cpu_ctl = std::make_unique(tg_id); - Status ret = cgroup_cpu_ctl->init(); - if (ret.ok()) { - _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); - LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id; + std::unique_ptr cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id); + if 
(cgroup_cpu_ctl) { + Status ret = cgroup_cpu_ctl->init(); + if (ret.ok()) { + _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); + LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id; + } else { + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id + << ", reason=" << ret.to_string(); + } } else { - LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id - << ", reason=" << ret.to_string(); + LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed"; } } @@ -533,7 +535,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e if (enable_cpu_hard_limit) { if (cpu_hard_limit > 0) { _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit); - _cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE); + _cgroup_cpu_ctl->update_cpu_soft_limit( + CgroupCpuCtl::cpu_soft_limit_default_value()); } else { LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: " << cpu_hard_limit << ", gid=" << tg_id; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 6df393e19e7136..145754dd357658 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -92,7 +92,8 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_i } // wg is shutdown and running rum = 0, its resource can be released in BE if (workload_group_ptr->can_be_dropped()) { - LOG(INFO) << "[topic_publish_wg]There is no query in wg" << wg_id << ", delete it."; + LOG(INFO) << "[topic_publish_wg]There is no query in wg " << wg_id + << ", delete it."; deleted_task_groups.push_back(workload_group_ptr); } } @@ -121,30 +122,16 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_i // Using cgdelete has no such issue. 
{ if (config::doris_cgroup_cpu_path != "") { - std::lock_guard write_lock(_init_cg_ctl_lock); - if (!_cg_cpu_ctl) { - _cg_cpu_ctl = std::make_unique(); - } - if (!_is_init_succ) { - Status ret = _cg_cpu_ctl->init(); - if (ret.ok()) { - _is_init_succ = true; - } else { - LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, " - << ret.to_string(); - } - } - if (_is_init_succ) { - Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id); - if (!ret.ok()) { - LOG(WARNING) << "[topic_publish_wg]" << ret.to_string(); - } + std::lock_guard write_lock(_clear_cgroup_lock); + Status ret = CgroupCpuCtl::delete_unused_cgroup_path(used_wg_id); + if (!ret.ok()) { + LOG(WARNING) << "[topic_publish_wg]" << ret.to_string(); } } } int64_t time_cost_ms = MonotonicMillis() - begin_time; LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms - << "ms, deleted group size:" << deleted_task_groups.size() + << " ms, deleted group size:" << deleted_task_groups.size() << ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size; } diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 15740d061adc94..d8547c3383e219 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -66,9 +66,7 @@ class WorkloadGroupMgr { std::shared_mutex _group_mutex; std::unordered_map _workload_groups; - std::shared_mutex _init_cg_ctl_lock; - std::unique_ptr _cg_cpu_ctl; - bool _is_init_succ = false; + std::shared_mutex _clear_cgroup_lock; }; } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index cab8045e98c208..aa9bed42d7d125 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -479,7 +479,7 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) { row.add(val + "%"); } } else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) { - row.add("1024"); + row.add("-1"); } else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) { row.add("0%"); } else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) { diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index 5d9629aa440367..0914cd53ea4d97 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -54,16 +54,16 @@ normal 20 50% true 2147483647 0 0 1% 16 test_group 10 11% false 100 0 0 20% -1 -- !show_spill_1 -- -spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10% +spill_group_test -1 0% true 2147483647 0 0 -1 -1 10% 10% -- !show_spill_1 -- -spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10% +spill_group_test -1 0% true 2147483647 0 0 -1 -1 -1 10% -- !show_spill_2 -- -spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10% +spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 10% -- !show_spill_3 -- -spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40% +spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 40% -- !show_wg_tag -- tag1_mem_wg1 50% -1 mem_tag1