diff --git a/src/client.cc b/src/client.cc index 890ec6164..3a3883f68 100644 --- a/src/client.cc +++ b/src/client.cc @@ -17,7 +17,11 @@ #include "base_cmd.h" #include "config.h" +#include "env.h" #include "pikiwidb.h" +#include "pstd_string.h" +#include "slow_log.h" +#include "store.h" namespace pikiwidb { @@ -353,7 +357,8 @@ int PClient::handlePacket(const char* start, int bytes) { // executeCommand(); // return static_cast(ptr - start); // } - + auto now = std::chrono::steady_clock::now(); + time_stat_->SetEnqueueTs(now); g_pikiwidb->SubmitFast(std::make_shared(shared_from_this())); // check transaction @@ -424,6 +429,7 @@ PClient* PClient::Current() { return s_current; } PClient::PClient() : parser_(params_) { auth_ = false; reset(); + time_stat_.reset(new TimeStat()); } void PClient::OnConnect() { @@ -665,4 +671,8 @@ void PClient::SetKey(std::vector& names) { keys_ = std::move(names); // use std::move clear copy expense } +std::unordered_map* PClient::GetCommandStatMap() { return &cmdstat_map_; } + +std::shared_ptr PClient::GetTimeStat() { return time_stat_; } + } // namespace pikiwidb diff --git a/src/client.h b/src/client.h index 6cbcb7c8d..73f97a760 100644 --- a/src/client.h +++ b/src/client.h @@ -7,6 +7,7 @@ #pragma once +#include #include #include #include @@ -21,6 +22,41 @@ namespace pikiwidb { +struct CommandStatistics { + CommandStatistics() = default; + CommandStatistics(const CommandStatistics& other) + : cmd_count_(other.cmd_count_.load()), cmd_time_consuming_(other.cmd_time_consuming_.load()) {} + + std::atomic cmd_count_ = 0; + std::atomic cmd_time_consuming_ = 0; +}; + +struct TimeStat { + using TimePoint = std::chrono::time_point; + + TimeStat() = default; + + void Reset() { + enqueue_ts_ = TimePoint::min(); + dequeue_ts_ = TimePoint::min(); + process_done_ts_ = TimePoint::min(); + } + + uint64_t GetTotalTime() const { + return (process_done_ts_ > enqueue_ts_) + ? std::chrono::duration_cast(process_done_ts_ - enqueue_ts_).count() + : 0; + } + + void SetEnqueueTs(TimePoint now_time) { enqueue_ts_ = now_time; } + void SetDequeueTs(TimePoint now_time) { dequeue_ts_ = now_time; } + void SetProcessDoneTs(TimePoint now_time) { process_done_ts_ = now_time; } + + TimePoint enqueue_ts_ = TimePoint::min(); + TimePoint dequeue_ts_ = TimePoint::min(); + TimePoint process_done_ts_ = TimePoint::min(); +}; + class CmdRes { public: enum CmdRet { @@ -236,6 +272,10 @@ class PClient : public std::enable_shared_from_this, public CmdRes { // e.g:["set","key","value"] std::span argv_; + // Info Commandstats used + std::unordered_map* GetCommandStatMap(); + std::shared_ptr GetTimeStat(); + // std::shared_ptr getTcpConnection() const { return tcp_connection_.lock(); } int handlePacket(const char*, int); @@ -291,5 +331,11 @@ class PClient : public std::enable_shared_from_this, public CmdRes { net::SocketAddr addr_; static thread_local PClient* s_current; + + /* + * Info Commandstats used + */ + std::unordered_map cmdstat_map_; + std::shared_ptr time_stat_; }; } // namespace pikiwidb diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 0964bd821..d933c580f 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -5,12 +5,19 @@ * of patent rights can be found in the PATENTS file in the same directory. */ -#include "cmd_admin.h" +#include +#include +#include +#include +#include +#include + #include #include #include #include #include +#include "cmd_admin.h" #include "db.h" #include "braft/raft.h" @@ -21,6 +28,8 @@ #include "praft/praft.h" #include "pstd/env.h" +#include "cmd_table_manager.h" +#include "slow_log.h" #include "store.h" namespace pikiwidb { @@ -144,24 +153,90 @@ bool PingCmd::DoInitial(PClient* client) { return true; } void PingCmd::DoCmd(PClient* client) { client->SetRes(CmdRes::kPong, "PONG"); } -InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} +const std::string InfoCmd::kInfoSection = "info"; +const std::string InfoCmd::kAllSection = "all"; +const std::string InfoCmd::kServerSection = "server"; +const std::string InfoCmd::kStatsSection = "stats"; +const std::string InfoCmd::kCPUSection = "cpu"; +const std::string InfoCmd::kDataSection = "data"; +const std::string InfoCmd::kCommandStatsSection = "commandstats"; +const std::string InfoCmd::kRaftSection = "raft"; -bool InfoCmd::DoInitial(PClient* client) { return true; } +InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} -// @todo The info raft command is only supported for the time being -void InfoCmd::DoCmd(PClient* client) { - if (client->argv_.size() <= 1) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); +bool InfoCmd::DoInitial(PClient* client) { + size_t argc = client->argv_.size(); + if (argc == 1) { + info_section_ = kInfo; + return true; } - auto cmd = client->argv_[1]; - if (!strcasecmp(cmd.c_str(), "RAFT")) { - InfoRaft(client); - } else if (!strcasecmp(cmd.c_str(), "data")) { - InfoData(client); + std::string argv_ = client->argv_[1].data(); + // convert section to lowercase + std::transform(argv_.begin(), argv_.end(), argv_.begin(), [](unsigned char c) { return std::tolower(c); }); + if (argc == 2) { + auto it = sectionMap.find(argv_); + if (it != sectionMap.end()) { + info_section_ = it->second; + } else { + client->SetRes(CmdRes::kErrOther, "the cmd is not supported"); + return false; + } } else { - client->SetRes(CmdRes::kErrOther, "the cmd is not supported"); + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + return true; +} + +void InfoCmd::DoCmd(PClient* client) { + std::string info; + switch (info_section_) { + case kInfo: + InfoServer(info); + info.append("\r\n"); + InfoData(info); + info.append("\r\n"); + InfoStats(info); + info.append("\r\n"); + InfoCPU(info); + info.append("\r\n"); + break; + case kInfoAll: + InfoServer(info); + info.append("\r\n"); + InfoData(info); + info.append("\r\n"); + InfoStats(info); + info.append("\r\n"); + InfoCommandStats(client, info); + info.append("\r\n"); + InfoCPU(info); + info.append("\r\n"); + break; + case kInfoServer: + InfoServer(info); + break; + case kInfoStats: + InfoStats(info); + break; + case kInfoCPU: + InfoCPU(info); + break; + case kInfoData: + InfoData(info); + break; + case kInfoCommandStats: + InfoCommandStats(client, info); + break; + case kInfoRaft: + InfoRaft(info); + break; + default: + break; } + + client->AppendString(info); } /* @@ -178,60 +253,154 @@ void InfoCmd::DoCmd(PClient* client) { raft_num_voting_nodes:2 raft_node1:id=1733428433,state=connected,voting=yes,addr=localhost,port=5001,last_conn_secs=5,conn_errors=0,conn_oks=1 */ -void InfoCmd::InfoRaft(PClient* client) { - if (client->argv_.size() != 2) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); - } - +void InfoCmd::InfoRaft(std::string& message) { if (!PRAFT.IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); + message += "-ERR Not a cluster member.\r\n"; + return; } auto node_status = PRAFT.GetNodeStatus(); if (node_status.state == braft::State::STATE_END) { - return client->SetRes(CmdRes::kErrOther, "Node is not initialized"); + message += "-ERR Node is not initialized.\r\n"; + return; } - std::string message; - message += "raft_group_id:" + PRAFT.GetGroupID() + "\r\n"; - message += "raft_node_id:" + PRAFT.GetNodeID() + "\r\n"; - message += "raft_peer_id:" + PRAFT.GetPeerID() + "\r\n"; + std::stringstream tmp_stream; + + tmp_stream << "raft_group_id:" << PRAFT.GetGroupID() << "\r\n"; + tmp_stream << "raft_node_id:" << PRAFT.GetNodeID() << "\r\n"; + tmp_stream << "raft_peer_id:" << PRAFT.GetPeerID() << "\r\n"; if (braft::is_active_state(node_status.state)) { - message += "raft_state:up\r\n"; + tmp_stream << "raft_state:up\r\n"; } else { - message += "raft_state:down\r\n"; + tmp_stream << "raft_state:down\r\n"; } - message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n"; - message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; - message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n"; + tmp_stream << "raft_role:" << std::string(braft::state2str(node_status.state)) << "\r\n"; + tmp_stream << "raft_leader_id:" << node_status.leader_id.to_string() << "\r\n"; + tmp_stream << "raft_current_term:" << std::to_string(node_status.term) << "\r\n"; if (PRAFT.IsLeader()) { std::vector peers; auto status = PRAFT.GetListPeers(&peers); if (!status.ok()) { - return client->SetRes(CmdRes::kErrOther, status.error_str()); + tmp_stream.str("-ERR "); + tmp_stream << status.error_str() << "\r\n"; + return; } for (int i = 0; i < peers.size(); i++) { - message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + - ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; + tmp_stream << "raft_node" << std::to_string(i) << ":addr=" << butil::ip2str(peers[i].addr.ip).c_str() + << ",port=" << std::to_string(peers[i].addr.port) << "\r\n"; } } - client->AppendString(message); + message.append(tmp_stream.str()); } -void InfoCmd::InfoData(PClient* client) { - if (client->argv_.size() != 2) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); +void InfoCmd::InfoServer(std::string& info) { + static struct utsname host_info; + static bool host_info_valid = false; + if (!host_info_valid) { + uname(&host_info); + host_info_valid = true; } - std::string message; + time_t current_time_s = time(nullptr); + std::stringstream tmp_stream; + char version[32]; + snprintf(version, sizeof(version), "%s", KPIKIWIDB_VERSION); + + tmp_stream << "# Server\r\n"; + tmp_stream << "PikiwiDB_version:" << version << "\r\n"; + tmp_stream << "PikiwiDB_build_git_sha:" << KPIKIWIDB_GIT_COMMIT_ID << "\r\n"; + tmp_stream << "Pikiwidb_build_compile_date: " << KPIKIWIDB_BUILD_DATE << "\r\n"; + tmp_stream << "os:" << host_info.sysname << " " << host_info.release << " " << host_info.machine << "\r\n"; + tmp_stream << "arch_bits:" << (reinterpret_cast(&host_info.machine) + strlen(host_info.machine) - 2) << "\r\n"; + tmp_stream << "process_id:" << getpid() << "\r\n"; + tmp_stream << "run_id:" << static_cast(g_config.run_id) << "\r\n"; + tmp_stream << "tcp_port:" << g_config.port << "\r\n"; + tmp_stream << "uptime_in_seconds:" << (current_time_s - g_pikiwidb->Start_time_s()) << "\r\n"; + tmp_stream << "uptime_in_days:" << (current_time_s / (24 * 3600) - g_pikiwidb->Start_time_s() / (24 * 3600) + 1) + << "\r\n"; + tmp_stream << "config_file:" << g_pikiwidb->GetConfigName() << "\r\n"; + + info.append(tmp_stream.str()); +} + +void InfoCmd::InfoStats(std::string& info) { + std::stringstream tmp_stream; + tmp_stream << "# Stats" + << "\r\n"; + + tmp_stream << "is_bgsaving:" << (PREPL.IsBgsaving() ? "Yes" : "No") << "\r\n"; + tmp_stream << "slow_logs_count:" << PSlowLog::Instance().GetLogsCount() << "\r\n"; + info.append(tmp_stream.str()); +} + +void InfoCmd::InfoCPU(std::string& info) { + struct rusage self_ru; + struct rusage c_ru; + getrusage(RUSAGE_SELF, &self_ru); + getrusage(RUSAGE_CHILDREN, &c_ru); + std::stringstream tmp_stream; + tmp_stream << "# CPU" + << "\r\n"; + tmp_stream << "used_cpu_sys:" << std::setiosflags(std::ios::fixed) << std::setprecision(2) + << static_cast(self_ru.ru_stime.tv_sec) + static_cast(self_ru.ru_stime.tv_usec) / 1000000 + << "\r\n"; + tmp_stream << "used_cpu_user:" << std::setiosflags(std::ios::fixed) << std::setprecision(2) + << static_cast(self_ru.ru_utime.tv_sec) + static_cast(self_ru.ru_utime.tv_usec) / 1000000 + << "\r\n"; + tmp_stream << "used_cpu_sys_children:" << std::setiosflags(std::ios::fixed) << std::setprecision(2) + << static_cast(c_ru.ru_stime.tv_sec) + static_cast(c_ru.ru_stime.tv_usec) / 1000000 + << "\r\n"; + tmp_stream << "used_cpu_user_children:" << std::setiosflags(std::ios::fixed) << std::setprecision(2) + << static_cast(c_ru.ru_utime.tv_sec) + static_cast(c_ru.ru_utime.tv_usec) / 1000000 + << "\r\n"; + info.append(tmp_stream.str()); +} + +void InfoCmd::InfoData(std::string& message) { message += DATABASES_NUM + std::string(":") + std::to_string(pikiwidb::g_config.databases) + "\r\n"; message += ROCKSDB_NUM + std::string(":") + std::to_string(pikiwidb::g_config.db_instance_num) + "\r\n"; message += ROCKSDB_VERSION + std::string(":") + ROCKSDB_NAMESPACE::GetRocksVersionAsString() + "\r\n"; +} - client->AppendString(message); +double InfoCmd::MethodofTotalTimeCalculation(const uint64_t time_consuming) { + return static_cast(time_consuming) / 1000.0; +} + +double InfoCmd::MethodofCommandStatistics(const uint64_t time_consuming, const uint64_t frequency) { + return (static_cast(time_consuming) / 1000.0) / static_cast(frequency); +} + +void InfoCmd::InfoCommandStats(PClient* client, std::string& info) { + std::stringstream tmp_stream; + tmp_stream.precision(2); + tmp_stream.setf(std::ios::fixed); + tmp_stream << "# Commandstats" + << "\r\n"; + auto cmdstat_map = client->GetCommandStatMap(); + for (auto iter : *cmdstat_map) { + if (iter.second.cmd_count_ != 0) { + tmp_stream << iter.first << ":" << FormatCommandStatLine(iter.second); + } + } + info.append(tmp_stream.str()); +} + +std::string InfoCmd::FormatCommandStatLine(const CommandStatistics& stats) { + std::stringstream stream; + stream.precision(2); + stream.setf(std::ios::fixed); + stream << "calls=" << stats.cmd_count_ << ", usec=" << MethodofTotalTimeCalculation(stats.cmd_time_consuming_) + << ", usec_per_call="; + if (!stats.cmd_time_consuming_) { + stream << 0 << "\r\n"; + } else { + stream << MethodofCommandStatistics(stats.cmd_time_consuming_, stats.cmd_count_) << "\r\n"; + } + return stream.str(); } CmdDebug::CmdDebug(const std::string& name, int arity) : BaseCmdGroup(name, kCmdFlagsAdmin, kAclCategoryAdmin) {} diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 3c846d19e..8dd195b40 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -124,8 +124,45 @@ class InfoCmd : public BaseCmd { private: void DoCmd(PClient* client) override; - void InfoRaft(PClient* client); - void InfoData(PClient* client); + enum InfoSection { + kInfoErr = 0x0, + kInfoServer, + kInfoStats, + kInfoCPU, + kInfoData, + kInfo, + kInfoAll, + kInfoCommandStats, + kInfoRaft + }; + + InfoSection info_section_; + const static std::string kInfoSection; + const static std::string kAllSection; + const static std::string kServerSection; + const static std::string kStatsSection; + const static std::string kCPUSection; + const static std::string kDataSection; + const static std::string kCommandStatsSection; + const static std::string kRaftSection; + + const std::unordered_map sectionMap = {{kAllSection, kInfoAll}, + {kServerSection, kInfoServer}, + {kStatsSection, kInfoStats}, + {kCPUSection, kInfoCPU}, + {kDataSection, kInfoData}, + {kRaftSection, kInfoRaft}, + {kCommandStatsSection, kInfoCommandStats}}; + + void InfoServer(std::string& info); + void InfoStats(std::string& info); + void InfoCPU(std::string& info); + void InfoRaft(std::string& info); + void InfoData(std::string& info); + void InfoCommandStats(PClient* client, std::string& info); + std::string FormatCommandStatLine(const CommandStatistics& stats); + double MethodofTotalTimeCalculation(const uint64_t time_consuming); + double MethodofCommandStatistics(const uint64_t time_consuming, const uint64_t frequency); }; class CmdDebug : public BaseCmdGroup { diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index b9f398319..72edb7d43 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -209,4 +209,5 @@ bool CmdTableManager::CmdExist(const std::string& cmd) const { } uint32_t CmdTableManager::GetCmdId() { return ++cmdId_; } + } // namespace pikiwidb diff --git a/src/cmd_thread_pool_worker.cc b/src/cmd_thread_pool_worker.cc index 7de20a520..487c7fe54 100644 --- a/src/cmd_thread_pool_worker.cc +++ b/src/cmd_thread_pool_worker.cc @@ -6,6 +6,8 @@ */ #include "cmd_thread_pool_worker.h" +#include "client.h" +#include "env.h" #include "log.h" #include "pikiwidb.h" @@ -37,7 +39,22 @@ void CmdWorkThreadPoolWorker::Work() { g_pikiwidb->PushWriteTask(task->Client()); continue; } + + auto cmdstat_map = task->Client()->GetCommandStatMap(); + CommandStatistics statistics; + if (cmdstat_map->find(task->CmdName()) == cmdstat_map->end()) { + cmdstat_map->emplace(task->CmdName(), statistics); + } + auto now = std::chrono::steady_clock::now(); + task->Client()->GetTimeStat()->SetDequeueTs(now); task->Run(cmdPtr); + + // Info Commandstats used + now = std::chrono::steady_clock::now(); + task->Client()->GetTimeStat()->SetProcessDoneTs(now); + (*cmdstat_map)[task->CmdName()].cmd_count_.fetch_add(1); + (*cmdstat_map)[task->CmdName()].cmd_time_consuming_.fetch_add(task->Client()->GetTimeStat()->GetTotalTime()); + g_pikiwidb->PushWriteTask(task->Client()); } self_task_.clear(); diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index 0880549ad..a3f21e289 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -174,6 +175,8 @@ bool PikiwiDB::Init() { timerTask->SetCallback([]() { PREPL.Cron(); }); event_server_->AddTimerTask(timerTask); + time(&start_time_s_); + return true; } diff --git a/src/pikiwidb.h b/src/pikiwidb.h index a9112ba21..4bba614dc 100644 --- a/src/pikiwidb.h +++ b/src/pikiwidb.h @@ -63,6 +63,8 @@ class PikiwiDB final { const std::function&, const net::SocketAddr&)>& onConnect, const std::function& cb); + time_t Start_time_s() { return start_time_s_; } + public: PString cfg_file_; uint16_t port_{0}; @@ -78,6 +80,8 @@ class PikiwiDB final { std::unique_ptr>> event_server_; uint32_t cmd_id_ = 0; + + time_t start_time_s_ = 0; }; extern std::unique_ptr g_pikiwidb;