From afb64fcd5db6e4cabd7fb936f98c9152c5731a11 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Wed, 4 Sep 2024 17:51:21 -0700 Subject: [PATCH] Add support for raft repl dev replace member. When replacing a member, add the new member, sync the raft log for the replace and finally remove the old member. Once the new member is added, baseline or incremental resync will start. Removing the old member causes nuraft_mesg to exit the group, and the destroyed group is garbage collected periodically. Made the repl dev base test common so that both test files can use it. Tests by default create the repl group with num_replicas members. Dynamic tests create additional spare replicas which can be added to the test dynamically by calling replace member. --- conanfile.py | 2 +- .../homestore/replication/repl_decls.h | 19 +- src/include/homestore/replication/repl_dev.h | 6 +- src/lib/replication/repl_dev/common.cpp | 3 +- .../replication/repl_dev/raft_repl_dev.cpp | 109 ++- src/lib/replication/repl_dev/raft_repl_dev.h | 7 + .../replication/service/raft_repl_service.cpp | 16 +- .../replication/service/raft_repl_service.h | 1 - src/tests/CMakeLists.txt | 6 + src/tests/test_common/hs_repl_test_common.hpp | 19 +- src/tests/test_common/raft_repl_test_base.hpp | 629 ++++++++++++++++++ src/tests/test_raft_repl_dev.cpp | 608 +---------------- src/tests/test_raft_repl_dev_dynamic.cpp | 133 ++++ src/tests/test_solo_repl_dev.cpp | 1 + 14 files changed, 930 insertions(+), 629 deletions(-) create mode 100644 src/tests/test_common/raft_repl_test_base.hpp create mode 100644 src/tests/test_raft_repl_dev_dynamic.cpp diff --git a/conanfile.py b/conanfile.py index be8689f9b..524cd6a1d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.61" + version = "6.4.62" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index
ac15a53af..24c6a7571 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -19,17 +19,18 @@ namespace homestore { VENUM(ReplServiceError, int32_t, OK = 0, // Everything OK CANCELLED = -1, // Request was cancelled - TIMEOUT = -2, - NOT_LEADER = -3, - BAD_REQUEST = -4, - SERVER_ALREADY_EXISTS = -5, + TIMEOUT = -2, + NOT_LEADER = -3, + BAD_REQUEST = -4, + SERVER_ALREADY_EXISTS = -5, CONFIG_CHANGING = -6, - SERVER_IS_JOINING = -7, - SERVER_NOT_FOUND = -8, - CANNOT_REMOVE_LEADER = -9, + SERVER_IS_JOINING = -7, + SERVER_NOT_FOUND = -8, + CANNOT_REMOVE_LEADER = -9, SERVER_IS_LEAVING = -10, - TERM_MISMATCH = -11, - RESULT_NOT_EXIST_YET = -10000, + TERM_MISMATCH = -11, + RETRY_REQUEST = -12, + RESULT_NOT_EXIST_YET = -10000, NOT_IMPLEMENTED = -10001, NO_SPACE_LEFT = -20000, DRIVE_WRITE_ERROR = -20001, diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 9965ada5d..15dc4872a 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -42,7 +42,8 @@ VENUM(repl_req_state_t, uint32_t, VENUM(journal_type_t, uint16_t, HS_DATA_LINKED = 0, // Linked data where each entry will store physical blkid where data reside HS_DATA_INLINED = 1, // Data is inlined in the header of journal entry - HS_CTRL_DESTROY = 2 // Control message to destroy the repl_dev + HS_CTRL_DESTROY = 2, // Control message to destroy the repl_dev + HS_CTRL_REPLACE = 3, // Control message to replace a member ) struct repl_key { @@ -346,6 +347,9 @@ class ReplDevListener { /// after restart in case crash happened during the destroy. virtual void on_destroy() = 0; + /// @brief Called when replace member is performed. 
+ virtual void replace_member(replica_id_t member_out, replica_id_t member_in) = 0; + /// @brief Called when the snapshot is being created by nuraft virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0; diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index 71927a3ad..b8800afea 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -192,9 +192,10 @@ std::string repl_req_ctx::to_string() const { } std::string repl_req_ctx::to_compact_string() const { - if (m_op_code == journal_type_t::HS_CTRL_DESTROY) { + if (m_op_code == journal_type_t::HS_CTRL_DESTROY || m_op_code == journal_type_t::HS_CTRL_REPLACE) { return fmt::format("term={} lsn={} op={}", m_rkey.term, m_lsn, enum_name(m_op_code)); } + return fmt::format("dsn={} term={} lsn={} op={} local_blkid={} state=[{}]", m_rkey.dsn, m_rkey.term, m_lsn, enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state()))); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 45a018d92..e928f8996 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -107,13 +107,94 @@ bool RaftReplDev::join_group() { m_msg_mgr.join_group(m_group_id, "homestore_replication", std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(shared_from_this())); if (!raft_result) { - HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(m_group_id), - raft_result.error()); + HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", group_id_str(), raft_result.error()); return false; } return true; } +AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid) { + LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(), + boost::uuids::to_string(member_out_uuid), 
boost::uuids::to_string(member_in_uuid));
+
+    // Step 1: Check whether the leader itself is the member requested to move out.
+    if (m_my_repl_id == member_out_uuid && m_my_repl_id == get_leader_id()) {
+        // If the leader is the member requested to move out, give up leadership and return an error.
+        // The client is expected to retry the replace_member request against the new leader.
+        raft_server()->yield_leadership(true /* immediate */, -1 /* successor */);
+        RD_LOGI("Replace member leader is the member_out so yield leadership");
+        return make_async_error<>(ReplServiceError::NOT_LEADER);
+    }
+
+    // Step 2. Add the new member.
+    return m_msg_mgr.add_member(m_group_id, member_in_uuid)
+        .via(&folly::InlineExecutor::instance())
+        .thenValue([this, member_in_uuid, member_out_uuid](auto&& e) -> AsyncReplResult<> {
+            // TODO: CANCELLED is ignored for now; fix nuraft_mesg so that add_member does not time out.
+            // NOTE(review): presumably the member only enters the cluster config once it has synced
+            // to within at least the stop gap, which can take a long time for block or
+            // object storage -- confirm against nuraft_mesg.
+            if (e.hasError()) {
+                // Ignore SERVER_ALREADY_EXISTS: the server has already been added to the cluster.
+                // Member change requests from the control path are idempotent: a request can be
+                // resent when either the add or the remove step failed and has to be retried.
+                if (e.error() == nuraft::cmd_result_code::CANCELLED ||
+                    e.error() == nuraft::cmd_result_code::SERVER_ALREADY_EXISTS) {
+                    RD_LOGW("Ignoring error returned from nuraft add_member {}", e.error());
+                } else {
+                    RD_LOGE("Replace member error in add member : {}", e.error());
+                    return make_async_error<>(RaftReplService::to_repl_error(e.error()));
+                }
+            }
+            auto member_out = boost::uuids::to_string(member_out_uuid);
+            auto member_in = boost::uuids::to_string(member_in_uuid);
+
+            RD_LOGI("Replace member added member={} to group_id={}", member_in, group_id_str());
+
+            // Step 3. Append a log entry recording that the old member is out and the new member is in.
+            auto rreq = repl_req_ptr_t(new repl_req_ctx{});
+            replace_members_ctx members; // NOTE(review): stack-local payload; presumably init() copies it -- confirm
+            std::copy(member_in_uuid.begin(), member_in_uuid.end(), members.in_replica_id.begin());
+            std::copy(member_out_uuid.begin(), member_out_uuid.end(), members.out_replica_id.begin());
+            sisl::blob header(r_cast< uint8_t* >(&members),
+                              members.in_replica_id.size() + members.out_replica_id.size());
+            rreq->init(
+                repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
+                journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0);
+
+            auto err = m_state_machine->propose_to_raft(std::move(rreq));
+            if (err != ReplServiceError::OK) {
+                LOGERROR("Replace member propose to raft failed {}", err);
+                return make_async_error<>(std::move(err));
+            }
+
+            RD_LOGI("Replace member proposed to raft group_id={}", group_id_str());
+
+            // Step 4. Remove the old member. Even if the old member is temporarily
+            // down and recovers, nuraft_mesg sees the member removal in the cluster log
+            // entry and calls exit_group() and leave().
+            return m_msg_mgr.rem_member(m_group_id, member_out_uuid)
+                .via(&folly::InlineExecutor::instance())
+                .thenValue([this, member_out](auto&& e) -> AsyncReplResult<> {
+                    if (e.hasError()) {
+                        // Ignore SERVER_NOT_FOUND: the server was already removed from the cluster,
+                        // and requests are idempotent and can be resent.
+                        if (e.error() == nuraft::cmd_result_code::SERVER_NOT_FOUND) {
+                            RD_LOGW("Remove member not found in group error, ignoring");
+                        } else {
+                            // It is ok to retry this request because the
+                            // replace member request is idempotent.
+                            RD_LOGE("Replace member failed to remove member : {}", e.error());
+                            return make_async_error<>(ReplServiceError::RETRY_REQUEST);
+                        }
+                    } else {
+                        RD_LOGI("Replace member removed member={} from group_id={}", member_out, group_id_str());
+                    }
+                    return make_async_success<>();
+                });
+        });
+}
+
 folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
     // Set the intent to destroy the group
     m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::DESTROYING; });
@@ -141,7 +222,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
         LOGERROR("RaftReplDev::destroy_group failed {}", err);
     }
 
-    LOGINFO("Raft repl dev destroy_group={}", boost::uuids::to_string(m_group_id));
+    LOGINFO("Raft repl dev destroy_group={}", group_id_str());
     return m_destroy_promise.getSemiFuture();
 }
 
@@ -786,6 +867,8 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
     RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_string());
     if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) {
         leave();
+    } else if (rreq->op_code() == journal_type_t::HS_CTRL_REPLACE) {
+        replace_member(rreq);
     } else {
         m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq);
     }
@@ -820,7 +903,8 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)
                              blkid.to_string());
             });
         }
     } else if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) {
+        // m_destroy_promise is owned by destroy_group()/leave(); a failed HS_CTRL_REPLACE must not settle it.
         if (rreq->is_proposer()) { m_destroy_promise.setValue(err); }
     }
 
@@ -836,6 +920,17 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)
     rreq->clear();
 }
 
+void RaftReplDev::replace_member(repl_req_ptr_t rreq) {
+    auto members = r_cast< const replace_members_ctx* >(rreq->header().cbytes());
+    replica_id_t member_in, member_out;
+    std::copy(members->out_replica_id.begin(), members->out_replica_id.end(), member_out.begin());
+
std::copy(members->in_replica_id.begin(), members->in_replica_id.end(), member_in.begin()); + RD_LOGI("Raft repl replace_member member_out={} member_in={}", boost::uuids::to_string(member_out), + boost::uuids::to_string(member_in)); + + m_listener->replace_member(member_out, member_in); +} + static bool blob_equals(sisl::blob const& a, sisl::blob const& b) { if (a.size() != b.size()) { return false; } return (std::memcmp(a.cbytes(), b.cbytes(), a.size()) == 0); @@ -971,12 +1066,14 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) { std::unique_lock lg{m_config_mtx}; (*m_raft_config_sb)["config"] = serialize_cluster_config(config); m_raft_config_sb.write(); + RD_LOGI("Saved config {}", (*m_raft_config_sb)["config"].dump()); } void RaftReplDev::save_state(const nuraft::srv_state& state) { std::unique_lock lg{m_config_mtx}; (*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}}; m_raft_config_sb.write(); + RD_LOGI("Saved state {}", (*m_raft_config_sb)["state"].dump()); } nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() { @@ -1013,7 +1110,7 @@ uint32_t RaftReplDev::get_logstore_id() const { return m_data_journal->logstore_ std::shared_ptr< nuraft::state_machine > RaftReplDev::get_state_machine() { return m_state_machine; } void RaftReplDev::permanent_destroy() { - RD_LOGI("Permanent destroy for raft repl dev"); + RD_LOGI("Permanent destroy for raft repl dev group_id={}", group_id_str()); m_rd_sb.destroy(); m_raft_config_sb.destroy(); m_data_journal->remove_store(); @@ -1035,7 +1132,7 @@ void RaftReplDev::leave() { m_rd_sb->destroy_pending = 0x1; m_rd_sb.write(); - RD_LOGI("RaftReplDev leave group"); + RD_LOGI("RaftReplDev leave group_id={}", group_id_str()); m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 
41594b528..82fdcaa23 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -35,6 +35,11 @@ using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
 
 ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);
 
+struct replace_members_ctx { // header payload of an HS_CTRL_REPLACE journal entry; read back via a raw cast
+    std::array< uint8_t, 16 > out_replica_id; // raw bytes of the replica id leaving the group
+    std::array< uint8_t, 16 > in_replica_id;  // raw bytes of the replica id joining the group
+};
+
 class RaftReplDevMetrics : public sisl::MetricsGroup {
 public:
     explicit RaftReplDevMetrics(const char* inst_name) : sisl::MetricsGroup("RaftReplDev", inst_name) {
@@ -150,6 +155,7 @@ class RaftReplDev : public ReplDev,
     virtual ~RaftReplDev() = default;
 
     bool join_group();
+    AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in); // add, log, then remove
     folly::SemiFuture< ReplServiceError > destroy_group();
 
     //////////////// All ReplDev overrides/implementation ///////////////////////
@@ -268,6 +274,7 @@ class RaftReplDev : public ReplDev,
     bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
     void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
     void commit_blk(repl_req_ptr_t rreq);
+    void replace_member(repl_req_ptr_t rreq); // commit path: decodes the header and notifies the listener
 };
 
 } // namespace homestore
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index 65d928390..bbf921685 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -93,7 +93,12 @@ void RaftReplService::start() {
                       .with_hb_interval(HS_DYNAMIC_CONFIG(consensus.heartbeat_period_ms))
                       .with_max_append_size(HS_DYNAMIC_CONFIG(consensus.max_append_batch_size))
                       .with_log_sync_batch_size(HS_DYNAMIC_CONFIG(consensus.log_sync_batch_size))
+                      // TODO: fix the log_gap thresholds used when adding a new member.
+                      // With the option enabled, a new member doing log sync gets stuck after the first batch,
+                      // whereas with it disabled the new member goes through append entries and it works.
+#if 0
                       .with_log_sync_stopping_gap(HS_DYNAMIC_CONFIG(consensus.min_log_gap_to_join))
+#endif
                       .with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold))
                       .with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold))
                       .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance))
@@ -327,7 +332,16 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
 
 AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out,
                                                   replica_id_t member_in) const {
-    return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
+    auto rdev_result = get_repl_dev(group_id);
+    if (!rdev_result) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); }
+
+    // The looked-up device is downcast to RaftReplDev; a failed cast must not be dereferenced.
+    auto rdev = std::dynamic_pointer_cast< RaftReplDev >(rdev_result.value());
+    if (!rdev) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); }
+
+    // RaftReplDev::replace_member() already yields AsyncReplResult<>, so it is returned as is rather than
+    // being re-wrapped in a continuation that only copied the success or error through.
+    return rdev->replace_member(member_out, member_in);
 }
 
 ////////////////////// Reaper Thread related //////////////////////////////////
diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h
index a38cbbccb..cba90e2e0 100644
--- a/src/lib/replication/service/raft_repl_service.h
+++ b/src/lib/replication/service/raft_repl_service.h
@@ -80,7 +80,6 @@ class RaftReplService : public GenericReplService,
     void gc_repl_devs();
     void gc_repl_reqs();
     void flush_durable_commit_lsn();
-
 };
 
 class RaftReplServiceCPHandler : public CPCallbacks {
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index ddbac4c94..ce8ccb422 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -118,6 +118,10 @@ if (${io_tests})
target_sources(test_raft_repl_dev PRIVATE test_raft_repl_dev.cpp) target_link_libraries(test_raft_repl_dev homestore ${COMMON_TEST_DEPS} GTest::gmock) + add_executable(test_raft_repl_dev_dynamic) + target_sources(test_raft_repl_dev_dynamic PRIVATE test_raft_repl_dev_dynamic.cpp) + target_link_libraries(test_raft_repl_dev_dynamic homestore ${COMMON_TEST_DEPS} GTest::gmock) + can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) add_test(NAME LogDev-Epoll COMMAND test_log_dev) @@ -126,6 +130,7 @@ if (${io_tests}) add_test(NAME MetaBlkMgr-Epoll COMMAND test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND test_data_service) add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev) + add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic) # add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) endif() @@ -138,6 +143,7 @@ if (${io_tests}) add_test(NAME SoloReplDev-Spdk COMMAND test_solo_repl_dev -- --spdk "true") add_test(NAME HomeRaftLogStore-Spdk COMMAND test_home_raft_logstore -- --spdk "true") add_test(NAME RaftReplDev-Spdk COMMAND test_raft_repl_dev -- --spdk "true") + add_test(NAME RaftReplDevDynamic-Spdk COMMAND test_raft_repl_dev_dynamic -- --spdk "true") if(${epoll_tests}) SET_TESTS_PROPERTIES(MetaBlkMgr-Spdk PROPERTIES DEPENDS LogStore-Spdk) SET_TESTS_PROPERTIES(DataService-Spdk PROPERTIES DEPENDS MetaBlkMgr-Spdk) diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 67abe2f8e..672acffcb 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -38,6 +38,8 @@ SISL_OPTION_GROUP(test_repl_common_setup, (replicas, "", "replicas", "Total number of replicas", ::cxxopts::value< uint32_t >()->default_value("3"), "number"), + (spare_replicas, "", "spare_replicas", "Additional number of spare replicas not part of repldev", + ::cxxopts::value< uint32_t >()->default_value("1"), "number"), (base_port, "", "base_port", "Port 
number of first replica", ::cxxopts::value< uint16_t >()->default_value("4000"), "number"), (replica_num, "", "replica_num", @@ -134,11 +136,12 @@ class HSReplTestHelper : public HSTestHelper { HSReplTestHelper(std::string const& name, std::vector< std::string > const& args, char** argv) : name_{name}, args_{args}, argv_{argv} {} - void setup() { + void setup(uint32_t num_replicas) { + num_replicas_ = num_replicas; replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >(); + sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_)); sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v"); - auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); boost::uuids::string_generator gen; for (uint32_t i{0}; i < num_replicas; ++i) { @@ -226,7 +229,7 @@ class HSReplTestHelper : public HSTestHelper { void reset_setup() { teardown(); - setup(); + setup(num_replicas_); } void restart(uint32_t shutdown_delay_secs = 5u) { @@ -273,8 +276,12 @@ class HSReplTestHelper : public HSTestHelper { if (replica_num_ == 0) { std::set< homestore::replica_id_t > members; - std::transform(members_.begin(), members_.end(), std::inserter(members, members.end()), - [](auto const& p) { return p.first; }); + // By default we create repl dev with number of members equal to replicas argument. + // We dont add spare replica's to the group by default. 
+ for (auto& m : members_) { + if (m.second < SISL_OPTIONS["replicas"].as< uint32_t >()) { members.insert(m.first); } + } + group_id_t repl_group_id = hs_utils::gen_random_uuid(); { std::unique_lock lg(groups_mtx_); @@ -299,6 +306,7 @@ class HSReplTestHelper : public HSTestHelper { auto listener = std::move(pending_listeners_[0]); repl_groups_.insert(std::pair(group_id, listener)); pending_listeners_.erase(pending_listeners_.begin()); + LOGINFO("Got listener for group_id={} replica={}", boost::uuids::to_string(group_id), replica_num_); return listener; } @@ -346,6 +354,7 @@ class HSReplTestHelper : public HSTestHelper { std::string name_; std::vector< std::string > args_; char** argv_; + uint32_t num_replicas_; std::vector< homestore::dev_info > dev_list_; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp new file mode 100644 index 000000000..7b96afa4c --- /dev/null +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -0,0 +1,629 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ * + *********************************************************************************/ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/homestore_config.hpp" +#include "common/homestore_assert.hpp" +#include "common/homestore_utils.hpp" + +#define private public +#include "test_common/hs_repl_test_common.hpp" +#include "replication/service/raft_repl_service.h" +#include "replication/repl_dev/raft_repl_dev.h" + +using namespace homestore; + +SISL_LOGGING_DEF(test_raft_repl_dev) +SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg) + +SISL_OPTION_GROUP(test_raft_repl_dev, + (block_size, "", "block_size", "block size to io", + ::cxxopts::value< uint32_t >()->default_value("4096"), "number"), + (num_raft_groups, "", "num_raft_groups", "number of raft groups per test", + ::cxxopts::value< uint32_t >()->default_value("1"), "number"), + // for below replication parameter, their default value always get from dynamic config, only used + // when specified by user + (snapshot_distance, "", "snapshot_distance", "distance between snapshots", + ::cxxopts::value< uint32_t >()->default_value("0"), "number"), + (num_raft_logs_resv, "", "num_raft_logs_resv", "number of raft logs reserved", + ::cxxopts::value< uint32_t >()->default_value("0"), "number"), + (res_mgr_audit_timer_ms, "", "res_mgr_audit_timer_ms", "resource manager audit timer", + ::cxxopts::value< uint32_t >()->default_value("0"), "number")); + +SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) + +static std::unique_ptr< test_common::HSReplTestHelper > g_helper; +static std::random_device g_rd{}; +static std::default_random_engine g_re{g_rd()}; + +class TestReplicatedDB : public homestore::ReplDevListener { +public: + struct Key { + uint64_t id_; + bool operator<(Key const& 
other) const { return id_ < other.id_; } + }; + + struct Value { + int64_t lsn_; + uint64_t data_size_; + uint64_t data_pattern_; + MultiBlkId blkid_; + uint64_t id_; + }; + + struct KeyValuePair { + Key key; + Value value; + }; + + struct test_req : public repl_req_ctx { + struct journal_header { + uint64_t data_size; + uint64_t data_pattern; + }; + + journal_header jheader; + uint64_t key_id; + sisl::sg_list write_sgs; + sisl::sg_list read_sgs; + + sisl::blob header_blob() { return sisl::blob(uintptr_cast(&jheader), sizeof(journal_header)); } + sisl::blob key_blob() { return sisl::blob{uintptr_cast(&key_id), sizeof(uint64_t)}; } + + test_req() { + write_sgs.size = 0; + read_sgs.size = 0; + key_id = (uint64_t)rand() << 32 | rand(); + } + + ~test_req() { + for (auto const& iov : write_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + + for (auto const& iov : read_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + } + }; + + TestReplicatedDB() = default; + virtual ~TestReplicatedDB() = default; + + void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, + cintrusive< repl_req_ctx >& ctx) override { + ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); + + auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); + Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))}; + Value v{.lsn_ = lsn, + .data_size_ = jheader->data_size, + .data_pattern_ = jheader->data_pattern, + .blkid_ = blkids, + .id_ = k.id_}; + + LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]", + g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_); + + { + std::unique_lock lk(db_mtx_); + inmem_db_.insert_or_assign(k, v); + lsn_index_.emplace(lsn, v); + last_committed_lsn = lsn; + ++commit_count_; + } + + if (ctx->is_proposer()) { g_helper->runner().next_task(); } + } + + bool on_pre_commit(int64_t 
lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFOMOD(replication, "[Replica={}] Received pre-commit on lsn={} dsn={}", g_helper->replica_num(), lsn, + ctx->dsn()); + return true; + } + + void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn); + } + + void on_restart() { + LOGINFOMOD(replication, "restarted repl dev for [Replica={}] Group={}", g_helper->replica_num(), + boost::uuids::to_string(repl_dev()->group_id())); + } + + void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFOMOD(replication, "[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error), + *(r_cast< uint64_t const* >(key.cbytes()))); + } + + AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + m_last_snapshot = context; + return make_async_success<>(); + } + + int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + + if (snp_data->offset == 0) { + snp_data->is_last_obj = false; + snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); + LOGINFOMOD(replication, + "[Replica={}] Read logical snapshot callback first message obj_id={} term={} idx={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); + return 0; + } + + int64_t next_lsn = snp_data->offset; + 
std::vector< KeyValuePair > kv_snapshot_data; + // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into + // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the + // first element to be read and transfered. + for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) { + auto& v = iter->second; + kv_snapshot_data.emplace_back(Key{v.id_}, v); + LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", + g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); + if (kv_snapshot_data.size() >= 1000) { break; } + } + + if (kv_snapshot_data.size() == 0) { + snp_data->is_last_obj = true; + LOGINFOMOD(replication, "Snapshot is_last_obj is true"); + return 0; + } + + int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); + sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; + std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); + snp_data->blob = std::move(blob); + snp_data->is_last_obj = false; + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + kv_snapshot_data.size()); + + return 0; + } + + void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); + auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); + std::move(fut).get(); + for (auto const& iov : write_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + } + + void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) 
override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + auto last_committed_idx = + std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); + if (snp_data->offset == 0) { + snp_data->offset = last_committed_lsn + 1; + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", + g_helper->replica_num(), snp_data->offset); + return; + } + + size_t kv_snapshot_data_size = snp_data->blob.size(); + if (kv_snapshot_data_size == 0) return; + + size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); + std::unique_lock lk(db_mtx_); + auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); + for (size_t i = 0; i < num_items; i++) { + auto key = ptr->key; + auto value = ptr->value; + LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", + g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); + + // Write to data service and inmem map. 
+ MultiBlkId out_blkids; + if (value.data_size_ != 0) { + snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); + value.blkid_ = out_blkids; + } + inmem_db_.insert_or_assign(key, value); + last_committed_lsn = value.lsn_; + ++commit_count_; + ptr++; + } + + snp_data->offset = last_committed_lsn + 1; + LOGINFOMOD(replication, + "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + snp_data->is_last_obj, num_items); + } + + bool apply_snapshot(shared< snapshot_context > context) override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + m_last_snapshot = context; + return true; + } + + shared< snapshot_context > last_snapshot() override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); + if (!m_last_snapshot) return nullptr; + + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Last snapshot term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + return m_last_snapshot; + } + + void free_user_snp_ctx(void*& user_snp_ctx) override {} + + ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { + return blk_alloc_hints{}; + } + void replace_member(replica_id_t member_out, replica_id_t member_in) override {} + + void on_destroy() override { + LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(), + boost::uuids::to_string(repl_dev()->group_id())); + g_helper->unregister_listener(repl_dev()->group_id()); + } + + void db_write(uint64_t data_size, uint32_t 
max_size_per_iov) { + static std::atomic< uint32_t > s_uniq_num{0}; + auto req = intrusive< test_req >(new test_req()); + req->jheader.data_size = data_size; + req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num; + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + + LOGINFOMOD(replication, "[Replica={}] Db write key={} data_size={} pattern={} block_size={}", + g_helper->replica_num(), req->key_id, data_size, req->jheader.data_pattern, block_size); + + if (data_size != 0) { + req->write_sgs = + test_common::HSTestHelper::create_sgs(data_size, max_size_per_iov, req->jheader.data_pattern); + } + + repl_dev()->async_alloc_write(req->header_blob(), req->key_blob(), req->write_sgs, req); + } + + void validate_db_data() { + g_helper->runner().set_num_tasks(inmem_db_.size()); + + LOGINFOMOD(replication, "[{}]: Total {} keys committed, validating them", + boost::uuids::to_string(repl_dev()->group_id()), inmem_db_.size()); + auto it = inmem_db_.begin(); + g_helper->runner().set_task([this, &it]() { + Key k; + Value v; + { + std::unique_lock lk(db_mtx_); + std::tie(k, v) = *it; + ++it; + } + + if (v.data_size_ != 0) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); + + repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { + LOGINFOMOD(replication, "Validating key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), + v.data_pattern_); + RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, + ec.message()); + for (auto const& iov : read_sgs.iovs) { + test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, + v.data_pattern_); + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + g_helper->runner().next_task(); + }); + } else { + g_helper->runner().next_task(); + } + }); + g_helper->runner().execute().get(); + } + + uint64_t 
db_commit_count() const { + std::shared_lock lk(db_mtx_); + return commit_count_; + } + + uint64_t db_size() const { + std::shared_lock lk(db_mtx_); + return inmem_db_.size(); + } + + void create_snapshot() { + auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); + ulong snapshot_idx = raft_repl_dev->raft_server()->create_snapshot(); + LOGINFO("Manually create snapshot got index {}", snapshot_idx); + } + + void truncate(int num_reserved_entries) { + auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); + raft_repl_dev->truncate(num_reserved_entries); + LOGINFO("Manually truncated"); + } + + void set_zombie() { zombie_ = true; } + bool is_zombie() { + // Whether a group is a zombie (non-recoverable) + return zombie_; + } + +private: + std::map< Key, Value > inmem_db_; + std::map< int64_t, Value > lsn_index_; + uint64_t commit_count_{0}; + std::shared_mutex db_mtx_; + uint64_t last_committed_lsn{0}; + std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; + std::mutex m_snapshot_lock; + bool zombie_{false}; +}; + +class RaftReplDevTestBase : public testing::Test { +public: + void SetUp() override { + // By default it will create one db + for (uint32_t i{0}; i < SISL_OPTIONS["num_raft_groups"].as< uint32_t >(); ++i) { + auto db = std::make_shared< TestReplicatedDB >(); + g_helper->register_listener(db); + dbs_.emplace_back(std::move(db)); + } + } + + void TearDown() override { + for (auto const& db : dbs_) { + if (db->is_zombie()) { continue; } + run_on_leader(db, [this, db]() { + auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); + ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; + }); + } + + for (auto const& db : dbs_) { + if (db->is_zombie()) { continue; } + auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); + int i = 0; + bool force_leave = false; + do { + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto& raft_repl_svc =
dynamic_cast< RaftReplService& >(hs()->repl_service()); + raft_repl_svc.gc_repl_devs(); + LOGINFO("Waiting for repl dev to get destroyed"); + + // TODO: if leader is destroyed, but the follower does not receive the notification, it will never be + // destroyed. We need to handle this in raft_repl_dev. Revisit here after making changes on the + // raft_repl_dev side to handle this case. This is a workaround to avoid the infinite loop for now. + if (i++ > 10 && !force_leave) { + LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); + repl_dev->force_leave(); + force_leave = true; + } + + } while (!repl_dev->is_destroyed()); + } + } + + void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) { + if (db == nullptr) { db = pick_one_db(); } + // LOGINFO("Writing on group_id={}", db->repl_dev()->group_id()); + db->db_write(data_size, max_size_per_iov); + } + + void wait_for_all_commits() { wait_for_commits(written_entries_); } + + void wait_for_commits(uint64_t exp_writes) { + uint64_t total_writes{0}; + while (true) { + total_writes = 0; + for (auto const& db : dbs_) { + total_writes += db->db_commit_count(); + } + + if (total_writes >= exp_writes) { break; } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + LOGINFO("Replica={} received {} commits but expected {}", g_helper->replica_num(), total_writes, + exp_writes); + } + LOGINFO("Replica={} has received {} commits as expected", g_helper->replica_num(), total_writes); + } + + void validate_data() { + for (auto const& db : dbs_) { + db->validate_db_data(); + } + } + + shared< TestReplicatedDB > pick_one_db() { return dbs_[0]; } + + void assign_leader(uint16_t replica) { + LOGINFO("Switch the leader to replica_num = {}", replica); + if (g_helper->replica_num() == replica) { + for (auto const& db : dbs_) { + do { + auto result = db->repl_dev()->become_leader().get(); + if (result.hasError()) { +
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } else { + break; + } + } while (true); + } + } else { + for (auto const& db : dbs_) { + homestore::replica_id_t leader_uuid; + while (true) { + leader_uuid = db->repl_dev()->get_leader_id(); + if (!leader_uuid.is_nil() && (g_helper->member_id(leader_uuid) == replica)) { break; } + + LOGINFO("Waiting for replica={} to become leader", replica); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } + } + } + } + + void run_on_leader(std::shared_ptr< TestReplicatedDB > db, auto&& lambda) { + do { + auto leader_uuid = db->repl_dev()->get_leader_id(); + + if (leader_uuid.is_nil()) { + LOGINFO("Waiting for leader to be elected for group={}", db->repl_dev()->group_id()); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } else if (leader_uuid == g_helper->my_replica_id()) { + lambda(); + break; + } else { + break; + } + } while (true); + } + + void write_on_leader(uint32_t num_entries, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) { + do { + auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id(); + + if (leader_uuid.is_nil()) { + LOGINFO("Waiting for leader to be elected"); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } else if (leader_uuid == g_helper->my_replica_id()) { + LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries, + boost::uuids::to_string(g_helper->my_replica_id())); + auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + g_helper->runner().set_num_tasks(num_entries); + + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); + g_helper->runner().set_task([this, block_size, db]() { + static std::normal_distribution<> num_blks_gen{3.0, 2.0}; + this->generate_writes(std::abs(std::lround(num_blks_gen(g_re))) * block_size, block_size, db); + }); + if (wait_for_commit) { g_helper->runner().execute().get(); } + break; + } else { + LOGINFO("{} entries were 
written on the leader_uuid={} my_uuid={}", num_entries, + boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id())); + break; + } + } while (true); + + written_entries_ += num_entries; + if (wait_for_commit) { this->wait_for_all_commits(); } + } + + void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) { + this->run_on_leader(db, [this, db]() { + auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); + ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; + }); + + // Remove the db from the dbs_ list and check if count matches with repl_device + for (auto it = dbs_.begin(); it != dbs_.end(); ++it) { + if (*it == db) { + dbs_.erase(it); + break; + } + } + + if (wait_for_removal) { wait_for_listener_destroy(dbs_.size()); } + } + + void wait_for_listener_destroy(uint64_t exp_listeners) { + while (true) { + auto total_listeners = g_helper->num_listeners(); + if (total_listeners == exp_listeners) { break; } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + + void restart_replica(uint16_t replica, uint32_t shutdown_delay_sec = 5u) { + if (g_helper->replica_num() == replica) { + LOGINFO("Restart homestore: replica_num = {}", replica); + g_helper->restart(shutdown_delay_sec); + // g_helper->sync_for_test_start(); + } else { + LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); + std::this_thread::sleep_for(std::chrono::seconds{5}); + } + } + + void shutdown_replica(uint16_t replica) { + if (g_helper->replica_num() == replica) { + LOGINFO("Shutdown homestore: replica_num = {}", replica); + g_helper->shutdown(); + } else { + LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); + std::this_thread::sleep_for(std::chrono::seconds{5}); + } + } + + void start_replica(uint16_t replica) { + if (g_helper->replica_num() == replica) { + LOGINFO("Start homestore: 
replica_num = {}", replica); + g_helper->start(); + } + } + + void create_snapshot() { dbs_[0]->create_snapshot(); } + void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } + + void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in) { + this->run_on_leader(db, [this, db, member_out, member_in]() { + LOGINFO("Replace member out={} in={}", boost::uuids::to_string(member_out), + boost::uuids::to_string(member_in)); + auto v = hs()->repl_service().replace_member(db->repl_dev()->group_id(), member_out, member_in).get(); + ASSERT_EQ(v.hasError(), false) << "Error in replacing member"; + }); + } + +protected: + std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; + uint32_t written_entries_{0}; + +#ifdef _PRERELEASE + flip::FlipClient m_fc{iomgr_flip::instance()}; +#endif +}; diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index caf28e0ee..9ccc40dfc 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -12,609 +12,9 @@ * specific language governing permissions and limitations under the License. 
* *********************************************************************************/ -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include "common/homestore_config.hpp" -#include "common/homestore_assert.hpp" -#include "common/homestore_utils.hpp" - -#define private public -#include "test_common/hs_repl_test_common.hpp" -#include "replication/service/raft_repl_service.h" -#include "replication/repl_dev/raft_repl_dev.h" - -using namespace homestore; - -SISL_LOGGING_DEF(test_raft_repl_dev) -SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg) - -SISL_OPTION_GROUP(test_raft_repl_dev, - (block_size, "", "block_size", "block size to io", - ::cxxopts::value< uint32_t >()->default_value("4096"), "number"), - (num_raft_groups, "", "num_raft_groups", "number of raft groups per test", - ::cxxopts::value< uint32_t >()->default_value("1"), "number"), - // for below replication parameter, their default value always get from dynamic config, only used - // when specified by user - (snapshot_distance, "", "snapshot_distance", "distance between snapshots", - ::cxxopts::value< uint32_t >()->default_value("0"), "number"), - (num_raft_logs_resv, "", "num_raft_logs_resv", "number of raft logs reserved", - ::cxxopts::value< uint32_t >()->default_value("0"), "number"), - (res_mgr_audit_timer_ms, "", "res_mgr_audit_timer_ms", "resource manager audit timer", - ::cxxopts::value< uint32_t >()->default_value("0"), "number")); - -SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) - -static std::unique_ptr< test_common::HSReplTestHelper > g_helper; -static std::random_device g_rd{}; -static std::default_random_engine g_re{g_rd()}; - -class TestReplicatedDB : public homestore::ReplDevListener { -public: - struct Key { - uint64_t id_; - bool operator<(Key const& other) const { return 
id_ < other.id_; } - }; - - struct Value { - int64_t lsn_; - uint64_t data_size_; - uint64_t data_pattern_; - MultiBlkId blkid_; - uint64_t id_; - }; - - struct KeyValuePair { - Key key; - Value value; - }; - - struct test_req : public repl_req_ctx { - struct journal_header { - uint64_t data_size; - uint64_t data_pattern; - }; - - journal_header jheader; - uint64_t key_id; - sisl::sg_list write_sgs; - sisl::sg_list read_sgs; - - sisl::blob header_blob() { return sisl::blob(uintptr_cast(&jheader), sizeof(journal_header)); } - sisl::blob key_blob() { return sisl::blob{uintptr_cast(&key_id), sizeof(uint64_t)}; } - - test_req() { - write_sgs.size = 0; - read_sgs.size = 0; - key_id = (uint64_t)rand() << 32 | rand(); - } - - ~test_req() { - for (auto const& iov : write_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - - for (auto const& iov : read_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - } - }; - - TestReplicatedDB() = default; - virtual ~TestReplicatedDB() = default; - - void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, - cintrusive< repl_req_ctx >& ctx) override { - ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); - - auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); - Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))}; - Value v{.lsn_ = lsn, - .data_size_ = jheader->data_size, - .data_pattern_ = jheader->data_pattern, - .blkid_ = blkids, - .id_ = k.id_}; - - LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]", - g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_); - - { - std::unique_lock lk(db_mtx_); - inmem_db_.insert_or_assign(k, v); - lsn_index_.emplace(lsn, v); - last_data_committed_lsn = lsn; - ++commit_count_; - } - - if (ctx->is_proposer()) { g_helper->runner().next_task(); } - } - - bool on_pre_commit(int64_t lsn, const 
sisl::blob& header, const sisl::blob& key, - cintrusive< repl_req_ctx >& ctx) override { - LOGINFOMOD(replication, "[Replica={}] Received pre-commit on lsn={} dsn={}", g_helper->replica_num(), lsn, - ctx->dsn()); - return true; - } - - void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, - cintrusive< repl_req_ctx >& ctx) override { - LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn); - } - - void on_restart() { - LOGINFOMOD(replication, "restarted repl dev for [Replica={}] Group={}", g_helper->replica_num(), - boost::uuids::to_string(repl_dev()->group_id())); - } - - void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, - cintrusive< repl_req_ctx >& ctx) override { - LOGINFOMOD(replication, "[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error), - *(r_cast< uint64_t const* >(key.cbytes()))); - } - - AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { - std::lock_guard< std::mutex > lock(m_snapshot_lock); - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(), - s->get_last_log_term(), s->get_last_log_idx()); - m_last_snapshot = context; - return make_async_success<>(); - } - - int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - - if (snp_data->offset == 0) { - snp_data->is_last_obj = false; - snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); - return 0; - } - - int64_t next_lsn = snp_data->offset; - std::vector< KeyValuePair > 
kv_snapshot_data; - // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into - // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the - // first element to be read and transfered. - for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) { - auto& v = iter->second; - kv_snapshot_data.emplace_back(Key{v.id_}, v); - LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", - g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); - if (kv_snapshot_data.size() >= 1000) { break; } - } - - if (kv_snapshot_data.size() == 0) { - snp_data->is_last_obj = true; - LOGINFOMOD(replication, "Snapshot is_last_obj is true"); - return 0; - } - - int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); - sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; - std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); - snp_data->blob = std::move(blob); - snp_data->is_last_obj = false; - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - kv_snapshot_data.size()); - - return 0; - } - - void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); - auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); - std::move(fut).get(); - for (auto const& iov : write_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - } - - void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { - auto s = 
std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - if (snp_data->offset == 0) { - snp_data->offset = last_data_committed_lsn + 1; - LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", - g_helper->replica_num(), snp_data->offset); - return; - } - - size_t kv_snapshot_data_size = snp_data->blob.size(); - if (kv_snapshot_data_size == 0) return; - - size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); - std::unique_lock lk(db_mtx_); - auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); - for (size_t i = 0; i < num_items; i++) { - auto key = ptr->key; - auto value = ptr->value; - LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", - g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); - - // Write to data service and inmem map. - MultiBlkId out_blkids; - if (value.data_size_ != 0) { - snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); - value.blkid_ = out_blkids; - } - last_data_committed_lsn = value.lsn_; - inmem_db_.insert_or_assign(key, value); - ++commit_count_; - ptr++; - } - - LOGINFOMOD(replication, - "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - snp_data->is_last_obj, num_items); - - // before we finish install snapshot, raft_server()->get_committed_log_idx() will always be the same. so we need - // last_data_committed_lsn to notify leader to transfer new data to follower. 
- snp_data->offset = last_data_committed_lsn + 1; - } - - bool apply_snapshot(shared< snapshot_context > context) override { - std::lock_guard< std::mutex > lock(m_snapshot_lock); - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(), - s->get_last_log_term(), s->get_last_log_idx()); - m_last_snapshot = context; - return true; - } - - shared< snapshot_context > last_snapshot() override { - std::lock_guard< std::mutex > lock(m_snapshot_lock); - if (!m_last_snapshot) return nullptr; - - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Last snapshot term={} idx={}", g_helper->replica_num(), - s->get_last_log_term(), s->get_last_log_idx()); - return m_last_snapshot; - } - - void free_user_snp_ctx(void*& user_snp_ctx) override {} - - ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { - return blk_alloc_hints{}; - } - - void on_destroy() override { - LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(), - boost::uuids::to_string(repl_dev()->group_id())); - g_helper->unregister_listener(repl_dev()->group_id()); - } - - void db_write(uint64_t data_size, uint32_t max_size_per_iov) { - static std::atomic< uint32_t > s_uniq_num{0}; - auto req = intrusive< test_req >(new test_req()); - req->jheader.data_size = data_size; - req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num; - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - - LOGINFOMOD(replication, "[Replica={}] Db write key={} data_size={} pattern={} block_size={}", - g_helper->replica_num(), req->key_id, data_size, req->jheader.data_pattern, block_size); - - if (data_size != 0) { - req->write_sgs = - test_common::HSTestHelper::create_sgs(data_size, max_size_per_iov, 
req->jheader.data_pattern); - } - - repl_dev()->async_alloc_write(req->header_blob(), req->key_blob(), req->write_sgs, req); - } - - void validate_db_data() { - g_helper->runner().set_num_tasks(inmem_db_.size()); - - LOGINFOMOD(replication, "[{}]: Total {} keys committed, validating them", - boost::uuids::to_string(repl_dev()->group_id()), inmem_db_.size()); - auto it = inmem_db_.begin(); - g_helper->runner().set_task([this, &it]() { - Key k; - Value v; - { - std::unique_lock lk(db_mtx_); - std::tie(k, v) = *it; - ++it; - } - - if (v.data_size_ != 0) { - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); - - repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { - LOGINFOMOD(replication, "Validating key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), - v.data_pattern_); - RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, - ec.message()); - for (auto const& iov : read_sgs.iovs) { - test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, - v.data_pattern_); - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - g_helper->runner().next_task(); - }); - } else { - g_helper->runner().next_task(); - } - }); - g_helper->runner().execute().get(); - } +#include "test_common/raft_repl_test_base.hpp" - uint64_t db_commit_count() const { - std::shared_lock lk(db_mtx_); - return commit_count_; - } - - uint64_t db_size() const { - std::shared_lock lk(db_mtx_); - return inmem_db_.size(); - } - - void create_snapshot() { - auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); - ulong snapshot_idx = raft_repl_dev->raft_server()->create_snapshot(); - LOGINFO("Manually create snapshot got index {}", snapshot_idx); - } - - void truncate(int num_reserved_entries) { - auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); - 
raft_repl_dev->truncate(num_reserved_entries); - LOGINFO("Manually truncated"); - } - - void set_zombie() { zombie_ = true; } - bool is_zombie() { - // Wether a group is zombie(non recoverable) - return zombie_; - } - -private: - std::map< Key, Value > inmem_db_; - std::map< int64_t, Value > lsn_index_; - uint64_t commit_count_{0}; - // this is the last lsn for data, might not be the same with the real last committed lsn - // which should be get by raft_server()->get_committed_log_idx() - uint64_t last_data_committed_lsn{0}; - std::shared_mutex db_mtx_; - std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; - std::mutex m_snapshot_lock; - bool zombie_{false}; -}; - -class RaftReplDevTest : public testing::Test { -public: - void SetUp() override { - // By default it will create one db - for (uint32_t i{0}; i < SISL_OPTIONS["num_raft_groups"].as< uint32_t >(); ++i) { - auto db = std::make_shared< TestReplicatedDB >(); - g_helper->register_listener(db); - dbs_.emplace_back(std::move(db)); - } - } - - void TearDown() override { - for (auto const& db : dbs_) { - if (db->is_zombie()) { continue; } - run_on_leader(db, [this, db]() { - auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); - ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; - }); - } - - for (auto const& db : dbs_) { - if (db->is_zombie()) { continue; } - auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); - int i = 0; - bool force_leave = false; - do { - std::this_thread::sleep_for(std::chrono::seconds(1)); - auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service()); - raft_repl_svc.gc_repl_devs(); - LOGINFO("Waiting for repl dev to get destroyed"); - - // TODO: if leader is destroyed, but the follower does not receive the notification, it will not be - // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at - // raft_repl_dev side to hanle this case. 
this is a workaround to avoid the infinite loop for now. - if (i++ > 10 && !force_leave) { - LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); - repl_dev->force_leave(); - force_leave = true; - } - - } while (!repl_dev->is_destroyed()); - } - } - - void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) { - if (db == nullptr) { db = pick_one_db(); } - // LOGINFO("Writing on group_id={}", db->repl_dev()->group_id()); - db->db_write(data_size, max_size_per_iov); - } - - void wait_for_all_commits() { wait_for_commits(written_entries_); } - - void wait_for_commits(uint64_t exp_writes) { - uint64_t total_writes{0}; - while (true) { - total_writes = 0; - for (auto const& db : dbs_) { - total_writes += db->db_commit_count(); - } - - if (total_writes >= exp_writes) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - LOGINFO("Replica={} has received {} commits as expected", g_helper->replica_num(), total_writes); - } - - void validate_data() { - for (auto const& db : dbs_) { - db->validate_db_data(); - } - } - - shared< TestReplicatedDB > pick_one_db() { return dbs_[0]; } - - void assign_leader(uint16_t replica) { - LOGINFO("Switch the leader to replica_num = {}", replica); - if (g_helper->replica_num() == replica) { - for (auto const& db : dbs_) { - do { - auto result = db->repl_dev()->become_leader().get(); - if (result.hasError()) { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } else { - break; - } - } while (true); - } - } else { - for (auto const& db : dbs_) { - homestore::replica_id_t leader_uuid; - while (true) { - leader_uuid = db->repl_dev()->get_leader_id(); - if (!leader_uuid.is_nil() && (g_helper->member_id(leader_uuid) == replica)) { break; } - - LOGINFO("Waiting for replica={} to become leader", replica); - std::this_thread::sleep_for(std::chrono::milliseconds{500}); - } - } - } - } - - void 
run_on_leader(std::shared_ptr< TestReplicatedDB > db, auto&& lambda) { - do { - auto leader_uuid = db->repl_dev()->get_leader_id(); - - if (leader_uuid.is_nil()) { - LOGINFO("Waiting for leader to be elected for group={}", db->repl_dev()->group_id()); - std::this_thread::sleep_for(std::chrono::milliseconds{500}); - } else if (leader_uuid == g_helper->my_replica_id()) { - lambda(); - break; - } else { - break; - } - } while (true); - } - - void write_on_leader(uint32_t num_entries, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) { - do { - auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id(); - - if (leader_uuid.is_nil()) { - LOGINFO("Waiting for leader to be elected"); - std::this_thread::sleep_for(std::chrono::milliseconds{500}); - } else if (leader_uuid == g_helper->my_replica_id()) { - LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries, - boost::uuids::to_string(g_helper->my_replica_id())); - auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - g_helper->runner().set_num_tasks(num_entries); - - LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); - g_helper->runner().set_task([this, block_size, db]() { - static std::normal_distribution<> num_blks_gen{3.0, 2.0}; - this->generate_writes(std::abs(std::lround(num_blks_gen(g_re))) * block_size, block_size, db); - }); - if (wait_for_commit) { g_helper->runner().execute().get(); } - break; - } else { - LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries, - boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id())); - break; - } - } while (true); - - written_entries_ += num_entries; - if (wait_for_commit) { this->wait_for_all_commits(); } - } - - void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) { - this->run_on_leader(db, [this, db]() { - auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); - 
ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; - }); - - // Remove the db from the dbs_ list and check if count matches with repl_device - for (auto it = dbs_.begin(); it != dbs_.end(); ++it) { - if (*it == db) { - dbs_.erase(it); - break; - } - } - - if (wait_for_removal) { wait_for_listener_destroy(dbs_.size()); } - } - - void wait_for_listener_destroy(uint64_t exp_listeners) { - while (true) { - auto total_listeners = g_helper->num_listeners(); - if (total_listeners == exp_listeners) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } - - void restart_replica(uint16_t replica, uint32_t shutdown_delay_sec = 5u) { - if (g_helper->replica_num() == replica) { - LOGINFO("Restart homestore: replica_num = {}", replica); - g_helper->restart(shutdown_delay_sec); - // g_helper->sync_for_test_start(); - } else { - LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); - std::this_thread::sleep_for(std::chrono::seconds{5}); - } - } - - void shutdown_replica(uint16_t replica) { - if (g_helper->replica_num() == replica) { - LOGINFO("Shutdown homestore: replica_num = {}", replica); - g_helper->shutdown(); - } else { - LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); - std::this_thread::sleep_for(std::chrono::seconds{5}); - } - } - - void start_replica(uint16_t replica) { - if (g_helper->replica_num() == replica) { - LOGINFO("Start homestore: replica_num = {}", replica); - g_helper->start(); - } - } - - void create_snapshot() { dbs_[0]->create_snapshot(); } - void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } - -protected: - std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; - uint32_t written_entries_{0}; - -#ifdef _PRERELEASE - flip::FlipClient m_fc{iomgr_flip::instance()}; -#endif -}; +class RaftReplDevTest : public RaftReplDevTestBase {}; TEST_F(RaftReplDevTest, Write_Restart_Write) { 
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); @@ -1015,7 +415,6 @@ int main(int argc, char* argv[]) { // Snapshot and truncation tests needs num reserved to be 0 and distance 10. s.consensus.num_reserved_log_items = 0; - s.consensus.snapshot_freq_distance = 10; s.resource_limits.resource_audit_timer_ms = 0; // only reset when user specified the value for test; @@ -1033,7 +432,8 @@ int main(int argc, char* argv[]) { FLAGS_folly_global_cpu_executor_threads = 4; g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", args, orig_argv); - g_helper->setup(); + // No spare replica's are created. Test cases in this file expects fixed number of replica's. + g_helper->setup(SISL_OPTIONS["replicas"].as< uint32_t >()); auto ret = RUN_ALL_TESTS(); g_helper->teardown(); diff --git a/src/tests/test_raft_repl_dev_dynamic.cpp b/src/tests/test_raft_repl_dev_dynamic.cpp new file mode 100644 index 000000000..7bd69a13c --- /dev/null +++ b/src/tests/test_raft_repl_dev_dynamic.cpp @@ -0,0 +1,133 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include "test_common/raft_repl_test_base.hpp" + +// Dynamic tests spawn spare replica's also which can be used to add and remove from a repl dev. 
+class ReplDevDynamicTest : public RaftReplDevTestBase {}; + +TEST_F(ReplDevDynamicTest, ReplaceMember) { + // Write some IO's, replace a member, validate all members data except which is out. + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + auto db = dbs_.back(); + auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); + auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >(); + + // Replace the last member in the group with index(num_replicas - 1) with a spare + // replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N + uint32_t member_out = num_replicas - 1; + uint32_t member_in = num_replicas; + + g_helper->sync_for_test_start(num_members); + if (g_helper->replica_num() < num_replicas) { + // With existing raft repl dev group, write IO's, validate and call replace_member on leader. + LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num()); + this->write_on_leader(num_io_entries, true /* wait_for_commit */); + + replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in)); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } else if (g_helper->replica_num() == member_in) { + LOGINFO("Wait for commits replica={}", g_helper->replica_num()); + wait_for_commits(num_io_entries); + } + + g_helper->sync_for_verify_start(num_members); + LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num()); + if (g_helper->replica_num() != member_out) { + // Skip the member which is going to be replaced. Validate data on all other replica's. + LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); + this->validate_data(); + } else { + // The out member will have the repl dev destroyed. 
+ auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); + do { + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service()); + raft_repl_svc.gc_repl_devs(); + LOGINFO("Waiting for repl dev to get destroyed on out member replica={}", g_helper->replica_num()); + } while (!repl_dev->is_destroyed()); + LOGINFO("Repl dev destroyed on out member replica={}", g_helper->replica_num()); + } + + g_helper->sync_for_cleanup_start(num_members); + LOGINFO("ReplaceMember test done"); +} + +// TODO add more tests with leader and member restart, multiple member replace +// leader replace, commit quorum + +int main(int argc, char* argv[]) { + int parsed_argc = argc; + char** orig_argv = argv; + + // Save the args for replica use + std::vector< std::string > args; + for (int i = 0; i < argc; ++i) { + args.emplace_back(argv[i]); + } + + ::testing::InitGoogleTest(&parsed_argc, argv); + + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup, + test_repl_common_setup); + + // + // Entire test suite assumes that once a replica takes over as leader, it stays until it is explicitly yielded. + // Otherwise it is very hard to control or accurately test behavior. Hence we forcibly override the + // leadership_expiry time. + // + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + s.consensus.leadership_expiry_ms = -1; // -1 means never expires; + s.generic.repl_dev_cleanup_interval_sec = 1; + + // Disable implicit flush and timer. + s.logstore.flush_threshold_size = 0; + s.logstore.flush_timer_frequency_us = 0; + + // Snapshot and truncation tests needs num reserved to be 0 and distance 10. 
+ s.consensus.num_reserved_log_items = 0; + s.resource_limits.resource_audit_timer_ms = 0; + + // only reset when user specified the value for test; + if (SISL_OPTIONS.count("snapshot_distance")) { + s.consensus.snapshot_freq_distance = SISL_OPTIONS["snapshot_distance"].as< uint32_t >(); + } + if (SISL_OPTIONS.count("num_raft_logs_resv")) { + s.resource_limits.raft_logstore_reserve_threshold = SISL_OPTIONS["num_raft_logs_resv"].as< uint32_t >(); + } + if (SISL_OPTIONS.count("res_mgr_audit_timer_ms")) { + s.resource_limits.resource_audit_timer_ms = SISL_OPTIONS["res_mgr_audit_timer_ms"].as< uint32_t >(); + } + }); + HS_SETTINGS_FACTORY().save(); + + FLAGS_folly_global_cpu_executor_threads = 4; + g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev_dynamic", args, orig_argv); + + // We spawn spare replica's also for dynamic repl dev tests. + auto total_replicas = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + g_helper->setup(total_replicas); + + auto ret = RUN_ALL_TESTS(); + g_helper->teardown(); + + std::string str; + sisl::ObjCounterRegistry::foreach ([&str](const std::string& name, int64_t created, int64_t alive) { + fmt::format_to(std::back_inserter(str), "{}: created={} alive={}\n", name, created, alive); + }); + LOGINFO("Object Life Counter\n:{}", str); + + return ret; +} diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 9367d64f0..c26ba273d 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -136,6 +136,7 @@ class SoloReplDevTest : public testing::Test { cintrusive< repl_req_ctx >& ctx) override { LOGINFO("Received error={} on repl_dev", enum_name(error)); } + void replace_member(replica_id_t member_out, replica_id_t member_in) override {} void on_destroy() override {} };