diff --git a/conanfile.py b/conanfile.py index be8689f9b..524cd6a1d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.61" + version = "6.4.62" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index ac15a53af..24c6a7571 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -19,17 +19,18 @@ namespace homestore { VENUM(ReplServiceError, int32_t, OK = 0, // Everything OK CANCELLED = -1, // Request was cancelled - TIMEOUT = -2, - NOT_LEADER = -3, - BAD_REQUEST = -4, - SERVER_ALREADY_EXISTS = -5, + TIMEOUT = -2, + NOT_LEADER = -3, + BAD_REQUEST = -4, + SERVER_ALREADY_EXISTS = -5, CONFIG_CHANGING = -6, - SERVER_IS_JOINING = -7, - SERVER_NOT_FOUND = -8, - CANNOT_REMOVE_LEADER = -9, + SERVER_IS_JOINING = -7, + SERVER_NOT_FOUND = -8, + CANNOT_REMOVE_LEADER = -9, SERVER_IS_LEAVING = -10, - TERM_MISMATCH = -11, - RESULT_NOT_EXIST_YET = -10000, + TERM_MISMATCH = -11, + RETRY_REQUEST = -12, + RESULT_NOT_EXIST_YET = -10000, NOT_IMPLEMENTED = -10001, NO_SPACE_LEFT = -20000, DRIVE_WRITE_ERROR = -20001, diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 9965ada5d..15dc4872a 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -42,7 +42,8 @@ VENUM(repl_req_state_t, uint32_t, VENUM(journal_type_t, uint16_t, HS_DATA_LINKED = 0, // Linked data where each entry will store physical blkid where data reside HS_DATA_INLINED = 1, // Data is inlined in the header of journal entry - HS_CTRL_DESTROY = 2 // Control message to destroy the repl_dev + HS_CTRL_DESTROY = 2, // Control message to destroy the repl_dev + HS_CTRL_REPLACE = 3, // Control message to replace a member ) struct repl_key { @@ 
-346,6 +347,9 @@ class ReplDevListener { /// after restart in case crash happened during the destroy. virtual void on_destroy() = 0; + /// @brief Called when replace member is performed. + virtual void replace_member(replica_id_t member_out, replica_id_t member_in) = 0; + /// @brief Called when the snapshot is being created by nuraft virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0; diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index 71927a3ad..b8800afea 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -192,9 +192,10 @@ std::string repl_req_ctx::to_string() const { } std::string repl_req_ctx::to_compact_string() const { - if (m_op_code == journal_type_t::HS_CTRL_DESTROY) { + if (m_op_code == journal_type_t::HS_CTRL_DESTROY || m_op_code == journal_type_t::HS_CTRL_REPLACE) { return fmt::format("term={} lsn={} op={}", m_rkey.term, m_lsn, enum_name(m_op_code)); } + return fmt::format("dsn={} term={} lsn={} op={} local_blkid={} state=[{}]", m_rkey.dsn, m_rkey.term, m_lsn, enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state()))); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 45a018d92..ee7cdec75 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -107,13 +107,79 @@ bool RaftReplDev::join_group() { m_msg_mgr.join_group(m_group_id, "homestore_replication", std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(shared_from_this())); if (!raft_result) { - HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(m_group_id), - raft_result.error()); + HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", group_id_str(), raft_result.error()); return false; } return true; } +AsyncReplResult<> RaftReplDev::replace_member(replica_id_t 
member_out_uuid, replica_id_t member_in_uuid) { + LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(), + boost::uuids::to_string(member_out_uuid), boost::uuids::to_string(member_in_uuid)); + + // Step 1: Check if leader itself is requested to move out. + if (m_my_repl_id == member_out_uuid && m_my_repl_id == get_leader_id()) { + // If leader is the member requested to move out, then give up leadership and return error. + // Client will retry replace_member request to the new leader. + raft_server()->yield_leadership(true /* immediate */, -1 /* successor */); + LOGINFO("Replace member leader is the member_out so yield leadership"); + return make_async_error<>(ReplServiceError::NOT_LEADER); + } + + // Step 2. Add the new member. + return m_msg_mgr.add_member(m_group_id, member_in_uuid) + .via(&folly::InlineExecutor::instance()) + .thenValue([this, member_in_uuid, member_out_uuid](auto&& e) -> AsyncReplResult<> { + // TODO Currently we ignore the cancelled, fix nuraft_mesg to not timeout + // when adding member. Member is added to cluster config until member syncs fully + // with atleast stop gap. This will take a lot of time for block or + // object storage. + if (e.hasError() && e.error() != nuraft::cmd_result_code::CANCELLED) { + RD_LOGE("Replace member error in add member : {}", e.error()); + return make_async_error<>(RaftReplService::to_repl_error(e.error())); + } + auto member_out = boost::uuids::to_string(member_out_uuid); + auto member_in = boost::uuids::to_string(member_in_uuid); + + LOGINFO("Replace member added member={} to group_id={}", member_in, group_id_str()); + + // Step 3. Append log entry to mark the old member is out and new member is added. 
+ auto rreq = repl_req_ptr_t(new repl_req_ctx{}); + replace_members_ctx members; + std::copy(member_in_uuid.begin(), member_in_uuid.end(), members.in.begin()); + std::copy(member_out_uuid.begin(), member_out_uuid.end(), members.out.begin()); + sisl::blob header(r_cast< uint8_t* >(&members), members.in.size() + members.out.size()); + rreq->init( + repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}, + journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0); + + auto err = m_state_machine->propose_to_raft(std::move(rreq)); + if (err != ReplServiceError::OK) { + LOGERROR("Replace member propose to raft failed {}", err); + return make_async_error<>(std::move(err)); + } + + LOGINFO("Replace member proposed to raft group_id={}", group_id_str()); + + // Step 4. Remove the old member. Even if the old member is temporarily + // down and recovers, nuraft mesg see member remove from cluster log + // entry and call exit_group() and leave(). + return m_msg_mgr.rem_member(m_group_id, member_out_uuid) + .via(&folly::InlineExecutor::instance()) + .thenValue([this, member_out](auto&& e) -> AsyncReplResult<> { + if (e.hasError()) { + // Its ok to retry this request as the request + // of replace member is idempotent. 
+ RD_LOGE("Replace member failed to remove member : {}", e.error()); + return make_async_error<>(ReplServiceError::RETRY_REQUEST); + } else { + LOGINFO("Replace member removed member={} from group_id={}", member_out, group_id_str()); + } + return make_async_success<>(); + }); + }); +} + folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() { // Set the intent to destroy the group m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::DESTROYING; }); @@ -141,7 +207,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() { LOGERROR("RaftReplDev::destroy_group failed {}", err); } - LOGINFO("Raft repl dev destroy_group={}", boost::uuids::to_string(m_group_id)); + LOGINFO("Raft repl dev destroy_group={}", group_id_str()); return m_destroy_promise.getSemiFuture(); } @@ -786,6 +852,8 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_string()); if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) { leave(); + } else if (rreq->op_code() == journal_type_t::HS_CTRL_REPLACE) { + replace_member(rreq); } else { m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq); } @@ -820,7 +888,8 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) blkid.to_string()); }); } - } else if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) { + } else if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY || + rreq->op_code() == journal_type_t::HS_CTRL_REPLACE) { if (rreq->is_proposer()) { m_destroy_promise.setValue(err); } } @@ -836,6 +905,17 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) rreq->clear(); } +void RaftReplDev::replace_member(repl_req_ptr_t rreq) { + auto members = r_cast< const replace_members_ctx* >(rreq->header().cbytes()); + replica_id_t member_in, member_out; + std::copy(members->out.begin(), members->out.end(), member_out.begin()); + 
std::copy(members->in.begin(), members->in.end(), member_in.begin()); + RD_LOGI("Raft repl replace_member member_out={} member_in={}", boost::uuids::to_string(member_out), + boost::uuids::to_string(member_in)); + + m_listener->replace_member(member_out, member_in); +} + static bool blob_equals(sisl::blob const& a, sisl::blob const& b) { if (a.size() != b.size()) { return false; } return (std::memcmp(a.cbytes(), b.cbytes(), a.size()) == 0); @@ -971,12 +1051,14 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) { std::unique_lock lg{m_config_mtx}; (*m_raft_config_sb)["config"] = serialize_cluster_config(config); m_raft_config_sb.write(); + RD_LOGI("Saved config {}", (*m_raft_config_sb)["config"].dump()); } void RaftReplDev::save_state(const nuraft::srv_state& state) { std::unique_lock lg{m_config_mtx}; (*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}}; m_raft_config_sb.write(); + RD_LOGI("Saved state {}", (*m_raft_config_sb)["state"].dump()); } nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() { @@ -1013,7 +1095,7 @@ uint32_t RaftReplDev::get_logstore_id() const { return m_data_journal->logstore_ std::shared_ptr< nuraft::state_machine > RaftReplDev::get_state_machine() { return m_state_machine; } void RaftReplDev::permanent_destroy() { - RD_LOGI("Permanent destroy for raft repl dev"); + RD_LOGI("Permanent destroy for raft repl dev group_id={}", group_id_str()); m_rd_sb.destroy(); m_raft_config_sb.destroy(); m_data_journal->remove_store(); @@ -1035,7 +1117,7 @@ void RaftReplDev::leave() { m_rd_sb->destroy_pending = 0x1; m_rd_sb.write(); - RD_LOGI("RaftReplDev leave group"); + RD_LOGI("RaftReplDev leave group_id={}", group_id_str()); m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 41594b528..e533bfa7e 100644 
--- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -35,6 +35,11 @@ using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >; ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED); +struct replace_members_ctx { + std::array< uint8_t, 16 > out; + std::array< uint8_t, 16 > in; +}; + class RaftReplDevMetrics : public sisl::MetricsGroup { public: explicit RaftReplDevMetrics(const char* inst_name) : sisl::MetricsGroup("RaftReplDev", inst_name) { @@ -150,6 +155,7 @@ class RaftReplDev : public ReplDev, virtual ~RaftReplDev() = default; bool join_group(); + AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in); folly::SemiFuture< ReplServiceError > destroy_group(); //////////////// All ReplDev overrides/implementation /////////////////////// @@ -268,6 +274,7 @@ class RaftReplDev : public ReplDev, bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms); void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); void commit_blk(repl_req_ptr_t rreq); + void replace_member(repl_req_ptr_t rreq); }; } // namespace homestore diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index 65d928390..74225a506 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -93,7 +93,10 @@ void RaftReplService::start() { .with_hb_interval(HS_DYNAMIC_CONFIG(consensus.heartbeat_period_ms)) .with_max_append_size(HS_DYNAMIC_CONFIG(consensus.max_append_batch_size)) .with_log_sync_batch_size(HS_DYNAMIC_CONFIG(consensus.log_sync_batch_size)) + // TODO fix the log_gap thresholds when adding new member. 
+#if 0 .with_log_sync_stopping_gap(HS_DYNAMIC_CONFIG(consensus.min_log_gap_to_join)) +#endif .with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold)) .with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold)) .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance)) @@ -327,7 +330,16 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in) const { - return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED); + auto rdev_result = get_repl_dev(group_id); + if (!rdev_result) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); } + + return std::dynamic_pointer_cast< RaftReplDev >(rdev_result.value()) + ->replace_member(member_out, member_in) + .via(&folly::InlineExecutor::instance()) + .thenValue([this](auto&& e) mutable { + if (e.hasError()) { return make_async_error<>(e.error()); } + return make_async_success<>(); + }); } ////////////////////// Reaper Thread related ////////////////////////////////// diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h index a38cbbccb..cba90e2e0 100644 --- a/src/lib/replication/service/raft_repl_service.h +++ b/src/lib/replication/service/raft_repl_service.h @@ -80,7 +80,6 @@ class RaftReplService : public GenericReplService, void gc_repl_devs(); void gc_repl_reqs(); void flush_durable_commit_lsn(); - }; class RaftReplServiceCPHandler : public CPCallbacks { diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index ddbac4c94..ce8ccb422 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -118,6 +118,10 @@ if (${io_tests}) target_sources(test_raft_repl_dev PRIVATE test_raft_repl_dev.cpp) target_link_libraries(test_raft_repl_dev homestore ${COMMON_TEST_DEPS} GTest::gmock) + add_executable(test_raft_repl_dev_dynamic) + 
target_sources(test_raft_repl_dev_dynamic PRIVATE test_raft_repl_dev_dynamic.cpp) + target_link_libraries(test_raft_repl_dev_dynamic homestore ${COMMON_TEST_DEPS} GTest::gmock) + can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) add_test(NAME LogDev-Epoll COMMAND test_log_dev) @@ -126,6 +130,7 @@ if (${io_tests}) add_test(NAME MetaBlkMgr-Epoll COMMAND test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND test_data_service) add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev) + add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic) # add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) endif() @@ -138,6 +143,7 @@ if (${io_tests}) add_test(NAME SoloReplDev-Spdk COMMAND test_solo_repl_dev -- --spdk "true") add_test(NAME HomeRaftLogStore-Spdk COMMAND test_home_raft_logstore -- --spdk "true") add_test(NAME RaftReplDev-Spdk COMMAND test_raft_repl_dev -- --spdk "true") + add_test(NAME RaftReplDevDynamic-Spdk COMMAND test_raft_repl_dev_dynamic -- --spdk "true") if(${epoll_tests}) SET_TESTS_PROPERTIES(MetaBlkMgr-Spdk PROPERTIES DEPENDS LogStore-Spdk) SET_TESTS_PROPERTIES(DataService-Spdk PROPERTIES DEPENDS MetaBlkMgr-Spdk) diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 67abe2f8e..672acffcb 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -38,6 +38,8 @@ SISL_OPTION_GROUP(test_repl_common_setup, (replicas, "", "replicas", "Total number of replicas", ::cxxopts::value< uint32_t >()->default_value("3"), "number"), + (spare_replicas, "", "spare_replicas", "Additional number of spare replicas not part of repldev", + ::cxxopts::value< uint32_t >()->default_value("1"), "number"), (base_port, "", "base_port", "Port number of first replica", ::cxxopts::value< uint16_t >()->default_value("4000"), "number"), (replica_num, "", "replica_num", @@ -134,11 +136,12 @@ class HSReplTestHelper : public HSTestHelper { 
HSReplTestHelper(std::string const& name, std::vector< std::string > const& args, char** argv) : name_{name}, args_{args}, argv_{argv} {} - void setup() { + void setup(uint32_t num_replicas) { + num_replicas_ = num_replicas; replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >(); + sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_)); sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v"); - auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); boost::uuids::string_generator gen; for (uint32_t i{0}; i < num_replicas; ++i) { @@ -226,7 +229,7 @@ class HSReplTestHelper : public HSTestHelper { void reset_setup() { teardown(); - setup(); + setup(num_replicas_); } void restart(uint32_t shutdown_delay_secs = 5u) { @@ -273,8 +276,12 @@ class HSReplTestHelper : public HSTestHelper { if (replica_num_ == 0) { std::set< homestore::replica_id_t > members; - std::transform(members_.begin(), members_.end(), std::inserter(members, members.end()), - [](auto const& p) { return p.first; }); + // By default we create repl dev with number of members equal to replicas argument. + // We dont add spare replica's to the group by default. 
+ for (auto& m : members_) { + if (m.second < SISL_OPTIONS["replicas"].as< uint32_t >()) { members.insert(m.first); } + } + group_id_t repl_group_id = hs_utils::gen_random_uuid(); { std::unique_lock lg(groups_mtx_); @@ -299,6 +306,7 @@ class HSReplTestHelper : public HSTestHelper { auto listener = std::move(pending_listeners_[0]); repl_groups_.insert(std::pair(group_id, listener)); pending_listeners_.erase(pending_listeners_.begin()); + LOGINFO("Got listener for group_id={} replica={}", boost::uuids::to_string(group_id), replica_num_); return listener; } @@ -346,6 +354,7 @@ class HSReplTestHelper : public HSTestHelper { std::string name_; std::vector< std::string > args_; char** argv_; + uint32_t num_replicas_; std::vector< homestore::dev_info > dev_list_; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp new file mode 100644 index 000000000..7b96afa4c --- /dev/null +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -0,0 +1,629 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ * + *********************************************************************************/ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/homestore_config.hpp" +#include "common/homestore_assert.hpp" +#include "common/homestore_utils.hpp" + +#define private public +#include "test_common/hs_repl_test_common.hpp" +#include "replication/service/raft_repl_service.h" +#include "replication/repl_dev/raft_repl_dev.h" + +using namespace homestore; + +SISL_LOGGING_DEF(test_raft_repl_dev) +SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg) + +SISL_OPTION_GROUP(test_raft_repl_dev, + (block_size, "", "block_size", "block size to io", + ::cxxopts::value< uint32_t >()->default_value("4096"), "number"), + (num_raft_groups, "", "num_raft_groups", "number of raft groups per test", + ::cxxopts::value< uint32_t >()->default_value("1"), "number"), + // for below replication parameter, their default value always get from dynamic config, only used + // when specified by user + (snapshot_distance, "", "snapshot_distance", "distance between snapshots", + ::cxxopts::value< uint32_t >()->default_value("0"), "number"), + (num_raft_logs_resv, "", "num_raft_logs_resv", "number of raft logs reserved", + ::cxxopts::value< uint32_t >()->default_value("0"), "number"), + (res_mgr_audit_timer_ms, "", "res_mgr_audit_timer_ms", "resource manager audit timer", + ::cxxopts::value< uint32_t >()->default_value("0"), "number")); + +SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) + +static std::unique_ptr< test_common::HSReplTestHelper > g_helper; +static std::random_device g_rd{}; +static std::default_random_engine g_re{g_rd()}; + +class TestReplicatedDB : public homestore::ReplDevListener { +public: + struct Key { + uint64_t id_; + bool operator<(Key const& 
other) const { return id_ < other.id_; } + }; + + struct Value { + int64_t lsn_; + uint64_t data_size_; + uint64_t data_pattern_; + MultiBlkId blkid_; + uint64_t id_; + }; + + struct KeyValuePair { + Key key; + Value value; + }; + + struct test_req : public repl_req_ctx { + struct journal_header { + uint64_t data_size; + uint64_t data_pattern; + }; + + journal_header jheader; + uint64_t key_id; + sisl::sg_list write_sgs; + sisl::sg_list read_sgs; + + sisl::blob header_blob() { return sisl::blob(uintptr_cast(&jheader), sizeof(journal_header)); } + sisl::blob key_blob() { return sisl::blob{uintptr_cast(&key_id), sizeof(uint64_t)}; } + + test_req() { + write_sgs.size = 0; + read_sgs.size = 0; + key_id = (uint64_t)rand() << 32 | rand(); + } + + ~test_req() { + for (auto const& iov : write_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + + for (auto const& iov : read_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + } + }; + + TestReplicatedDB() = default; + virtual ~TestReplicatedDB() = default; + + void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, + cintrusive< repl_req_ctx >& ctx) override { + ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); + + auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); + Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))}; + Value v{.lsn_ = lsn, + .data_size_ = jheader->data_size, + .data_pattern_ = jheader->data_pattern, + .blkid_ = blkids, + .id_ = k.id_}; + + LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]", + g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_); + + { + std::unique_lock lk(db_mtx_); + inmem_db_.insert_or_assign(k, v); + lsn_index_.emplace(lsn, v); + last_committed_lsn = lsn; + ++commit_count_; + } + + if (ctx->is_proposer()) { g_helper->runner().next_task(); } + } + + bool on_pre_commit(int64_t 
lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFOMOD(replication, "[Replica={}] Received pre-commit on lsn={} dsn={}", g_helper->replica_num(), lsn, + ctx->dsn()); + return true; + } + + void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn); + } + + void on_restart() { + LOGINFOMOD(replication, "restarted repl dev for [Replica={}] Group={}", g_helper->replica_num(), + boost::uuids::to_string(repl_dev()->group_id())); + } + + void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFOMOD(replication, "[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error), + *(r_cast< uint64_t const* >(key.cbytes()))); + } + + AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + m_last_snapshot = context; + return make_async_success<>(); + } + + int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + + if (snp_data->offset == 0) { + snp_data->is_last_obj = false; + snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); + LOGINFOMOD(replication, + "[Replica={}] Read logical snapshot callback first message obj_id={} term={} idx={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); + return 0; + } + + int64_t next_lsn = snp_data->offset; + 
std::vector< KeyValuePair > kv_snapshot_data; + // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into + // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the + // first element to be read and transfered. + for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) { + auto& v = iter->second; + kv_snapshot_data.emplace_back(Key{v.id_}, v); + LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", + g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); + if (kv_snapshot_data.size() >= 1000) { break; } + } + + if (kv_snapshot_data.size() == 0) { + snp_data->is_last_obj = true; + LOGINFOMOD(replication, "Snapshot is_last_obj is true"); + return 0; + } + + int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); + sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; + std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); + snp_data->blob = std::move(blob); + snp_data->is_last_obj = false; + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + kv_snapshot_data.size()); + + return 0; + } + + void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); + auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); + std::move(fut).get(); + for (auto const& iov : write_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + } + + void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) 
override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + auto last_committed_idx = + std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); + if (snp_data->offset == 0) { + snp_data->offset = last_committed_lsn + 1; + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", + g_helper->replica_num(), snp_data->offset); + return; + } + + size_t kv_snapshot_data_size = snp_data->blob.size(); + if (kv_snapshot_data_size == 0) return; + + size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); + std::unique_lock lk(db_mtx_); + auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); + for (size_t i = 0; i < num_items; i++) { + auto key = ptr->key; + auto value = ptr->value; + LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", + g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); + + // Write to data service and inmem map. 
+ MultiBlkId out_blkids; + if (value.data_size_ != 0) { + snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); + value.blkid_ = out_blkids; + } + inmem_db_.insert_or_assign(key, value); + last_committed_lsn = value.lsn_; + ++commit_count_; + ptr++; + } + + snp_data->offset = last_committed_lsn + 1; + LOGINFOMOD(replication, + "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + snp_data->is_last_obj, num_items); + } + + bool apply_snapshot(shared< snapshot_context > context) override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + m_last_snapshot = context; + return true; + } + + shared< snapshot_context > last_snapshot() override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); + if (!m_last_snapshot) return nullptr; + + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Last snapshot term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + return m_last_snapshot; + } + + void free_user_snp_ctx(void*& user_snp_ctx) override {} + + ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { + return blk_alloc_hints{}; + } + void replace_member(replica_id_t member_out, replica_id_t member_in) override {} + + void on_destroy() override { + LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(), + boost::uuids::to_string(repl_dev()->group_id())); + g_helper->unregister_listener(repl_dev()->group_id()); + } + + void db_write(uint64_t data_size, uint32_t 
max_size_per_iov) { + static std::atomic< uint32_t > s_uniq_num{0}; + auto req = intrusive< test_req >(new test_req()); + req->jheader.data_size = data_size; + req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num; + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + + LOGINFOMOD(replication, "[Replica={}] Db write key={} data_size={} pattern={} block_size={}", + g_helper->replica_num(), req->key_id, data_size, req->jheader.data_pattern, block_size); + + if (data_size != 0) { + req->write_sgs = + test_common::HSTestHelper::create_sgs(data_size, max_size_per_iov, req->jheader.data_pattern); + } + + repl_dev()->async_alloc_write(req->header_blob(), req->key_blob(), req->write_sgs, req); + } + + void validate_db_data() { + g_helper->runner().set_num_tasks(inmem_db_.size()); + + LOGINFOMOD(replication, "[{}]: Total {} keys committed, validating them", + boost::uuids::to_string(repl_dev()->group_id()), inmem_db_.size()); + auto it = inmem_db_.begin(); + g_helper->runner().set_task([this, &it]() { + Key k; + Value v; + { + std::unique_lock lk(db_mtx_); + std::tie(k, v) = *it; + ++it; + } + + if (v.data_size_ != 0) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); + + repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { + LOGINFOMOD(replication, "Validating key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), + v.data_pattern_); + RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, + ec.message()); + for (auto const& iov : read_sgs.iovs) { + test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, + v.data_pattern_); + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + g_helper->runner().next_task(); + }); + } else { + g_helper->runner().next_task(); + } + }); + g_helper->runner().execute().get(); + } + + uint64_t 
db_commit_count() const { + std::shared_lock lk(db_mtx_); + return commit_count_; + } + + uint64_t db_size() const { + std::shared_lock lk(db_mtx_); + return inmem_db_.size(); + } + + void create_snapshot() { + auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); + ulong snapshot_idx = raft_repl_dev->raft_server()->create_snapshot(); + LOGINFO("Manually create snapshot got index {}", snapshot_idx); + } + + void truncate(int num_reserved_entries) { + auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); + raft_repl_dev->truncate(num_reserved_entries); + LOGINFO("Manually truncated"); + } + + void set_zombie() { zombie_ = true; } + bool is_zombie() { + // Wether a group is zombie(non recoverable) + return zombie_; + } + +private: + std::map< Key, Value > inmem_db_; + std::map< int64_t, Value > lsn_index_; + uint64_t commit_count_{0}; + std::shared_mutex db_mtx_; + uint64_t last_committed_lsn{0}; + std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; + std::mutex m_snapshot_lock; + bool zombie_{false}; +}; + +class RaftReplDevTestBase : public testing::Test { +public: + void SetUp() override { + // By default it will create one db + for (uint32_t i{0}; i < SISL_OPTIONS["num_raft_groups"].as< uint32_t >(); ++i) { + auto db = std::make_shared< TestReplicatedDB >(); + g_helper->register_listener(db); + dbs_.emplace_back(std::move(db)); + } + } + + void TearDown() override { + for (auto const& db : dbs_) { + if (db->is_zombie()) { continue; } + run_on_leader(db, [this, db]() { + auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); + ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; + }); + } + + for (auto const& db : dbs_) { + if (db->is_zombie()) { continue; } + auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); + int i = 0; + bool force_leave = false; + do { + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto& raft_repl_svc = 
dynamic_cast< RaftReplService& >(hs()->repl_service()); + raft_repl_svc.gc_repl_devs(); + LOGINFO("Waiting for repl dev to get destroyed"); + + // TODO: if leader is destroyed, but the follower does not receive the notification, it will not be + // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at + // raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now. + if (i++ > 10 && !force_leave) { + LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); + repl_dev->force_leave(); + force_leave = true; + } + + } while (!repl_dev->is_destroyed()); + } + } + + void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) { + if (db == nullptr) { db = pick_one_db(); } + // LOGINFO("Writing on group_id={}", db->repl_dev()->group_id()); + db->db_write(data_size, max_size_per_iov); + } + + void wait_for_all_commits() { wait_for_commits(written_entries_); } + + void wait_for_commits(uint64_t exp_writes) { + uint64_t total_writes{0}; + while (true) { + total_writes = 0; + for (auto const& db : dbs_) { + total_writes += db->db_commit_count(); + } + + if (total_writes >= exp_writes) { break; } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + LOGINFO("Replica={} received {} commits but expected {}", g_helper->replica_num(), total_writes, + exp_writes); + } + LOGINFO("Replica={} has received {} commits as expected", g_helper->replica_num(), total_writes); + } + + void validate_data() { + for (auto const& db : dbs_) { + db->validate_db_data(); + } + } + + shared< TestReplicatedDB > pick_one_db() { return dbs_[0]; } + + void assign_leader(uint16_t replica) { + LOGINFO("Switch the leader to replica_num = {}", replica); + if (g_helper->replica_num() == replica) { + for (auto const& db : dbs_) { + do { + auto result = db->repl_dev()->become_leader().get(); + if (result.hasError()) { + 
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } else { + break; + } + } while (true); + } + } else { + for (auto const& db : dbs_) { + homestore::replica_id_t leader_uuid; + while (true) { + leader_uuid = db->repl_dev()->get_leader_id(); + if (!leader_uuid.is_nil() && (g_helper->member_id(leader_uuid) == replica)) { break; } + + LOGINFO("Waiting for replica={} to become leader", replica); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } + } + } + } + + void run_on_leader(std::shared_ptr< TestReplicatedDB > db, auto&& lambda) { + do { + auto leader_uuid = db->repl_dev()->get_leader_id(); + + if (leader_uuid.is_nil()) { + LOGINFO("Waiting for leader to be elected for group={}", db->repl_dev()->group_id()); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } else if (leader_uuid == g_helper->my_replica_id()) { + lambda(); + break; + } else { + break; + } + } while (true); + } + + void write_on_leader(uint32_t num_entries, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) { + do { + auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id(); + + if (leader_uuid.is_nil()) { + LOGINFO("Waiting for leader to be elected"); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } else if (leader_uuid == g_helper->my_replica_id()) { + LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries, + boost::uuids::to_string(g_helper->my_replica_id())); + auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + g_helper->runner().set_num_tasks(num_entries); + + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); + g_helper->runner().set_task([this, block_size, db]() { + static std::normal_distribution<> num_blks_gen{3.0, 2.0}; + this->generate_writes(std::abs(std::lround(num_blks_gen(g_re))) * block_size, block_size, db); + }); + if (wait_for_commit) { g_helper->runner().execute().get(); } + break; + } else { + LOGINFO("{} entries were 
written on the leader_uuid={} my_uuid={}", num_entries, + boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id())); + break; + } + } while (true); + + written_entries_ += num_entries; + if (wait_for_commit) { this->wait_for_all_commits(); } + } + + void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) { + this->run_on_leader(db, [this, db]() { + auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); + ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; + }); + + // Remove the db from the dbs_ list and check if count matches with repl_device + for (auto it = dbs_.begin(); it != dbs_.end(); ++it) { + if (*it == db) { + dbs_.erase(it); + break; + } + } + + if (wait_for_removal) { wait_for_listener_destroy(dbs_.size()); } + } + + void wait_for_listener_destroy(uint64_t exp_listeners) { + while (true) { + auto total_listeners = g_helper->num_listeners(); + if (total_listeners == exp_listeners) { break; } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + + void restart_replica(uint16_t replica, uint32_t shutdown_delay_sec = 5u) { + if (g_helper->replica_num() == replica) { + LOGINFO("Restart homestore: replica_num = {}", replica); + g_helper->restart(shutdown_delay_sec); + // g_helper->sync_for_test_start(); + } else { + LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); + std::this_thread::sleep_for(std::chrono::seconds{5}); + } + } + + void shutdown_replica(uint16_t replica) { + if (g_helper->replica_num() == replica) { + LOGINFO("Shutdown homestore: replica_num = {}", replica); + g_helper->shutdown(); + } else { + LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); + std::this_thread::sleep_for(std::chrono::seconds{5}); + } + } + + void start_replica(uint16_t replica) { + if (g_helper->replica_num() == replica) { + LOGINFO("Start homestore: 
replica_num = {}", replica); + g_helper->start(); + } + } + + void create_snapshot() { dbs_[0]->create_snapshot(); } + void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } + + void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in) { + this->run_on_leader(db, [this, db, member_out, member_in]() { + LOGINFO("Replace member out={} in={}", boost::uuids::to_string(member_out), + boost::uuids::to_string(member_in)); + auto v = hs()->repl_service().replace_member(db->repl_dev()->group_id(), member_out, member_in).get(); + ASSERT_EQ(v.hasError(), false) << "Error in replacing member"; + }); + } + +protected: + std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; + uint32_t written_entries_{0}; + +#ifdef _PRERELEASE + flip::FlipClient m_fc{iomgr_flip::instance()}; +#endif +}; diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index caf28e0ee..9ccc40dfc 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -12,609 +12,9 @@ * specific language governing permissions and limitations under the License. 
* *********************************************************************************/ -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include "common/homestore_config.hpp" -#include "common/homestore_assert.hpp" -#include "common/homestore_utils.hpp" - -#define private public -#include "test_common/hs_repl_test_common.hpp" -#include "replication/service/raft_repl_service.h" -#include "replication/repl_dev/raft_repl_dev.h" - -using namespace homestore; - -SISL_LOGGING_DEF(test_raft_repl_dev) -SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg) - -SISL_OPTION_GROUP(test_raft_repl_dev, - (block_size, "", "block_size", "block size to io", - ::cxxopts::value< uint32_t >()->default_value("4096"), "number"), - (num_raft_groups, "", "num_raft_groups", "number of raft groups per test", - ::cxxopts::value< uint32_t >()->default_value("1"), "number"), - // for below replication parameter, their default value always get from dynamic config, only used - // when specified by user - (snapshot_distance, "", "snapshot_distance", "distance between snapshots", - ::cxxopts::value< uint32_t >()->default_value("0"), "number"), - (num_raft_logs_resv, "", "num_raft_logs_resv", "number of raft logs reserved", - ::cxxopts::value< uint32_t >()->default_value("0"), "number"), - (res_mgr_audit_timer_ms, "", "res_mgr_audit_timer_ms", "resource manager audit timer", - ::cxxopts::value< uint32_t >()->default_value("0"), "number")); - -SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) - -static std::unique_ptr< test_common::HSReplTestHelper > g_helper; -static std::random_device g_rd{}; -static std::default_random_engine g_re{g_rd()}; - -class TestReplicatedDB : public homestore::ReplDevListener { -public: - struct Key { - uint64_t id_; - bool operator<(Key const& other) const { return 
id_ < other.id_; } - }; - - struct Value { - int64_t lsn_; - uint64_t data_size_; - uint64_t data_pattern_; - MultiBlkId blkid_; - uint64_t id_; - }; - - struct KeyValuePair { - Key key; - Value value; - }; - - struct test_req : public repl_req_ctx { - struct journal_header { - uint64_t data_size; - uint64_t data_pattern; - }; - - journal_header jheader; - uint64_t key_id; - sisl::sg_list write_sgs; - sisl::sg_list read_sgs; - - sisl::blob header_blob() { return sisl::blob(uintptr_cast(&jheader), sizeof(journal_header)); } - sisl::blob key_blob() { return sisl::blob{uintptr_cast(&key_id), sizeof(uint64_t)}; } - - test_req() { - write_sgs.size = 0; - read_sgs.size = 0; - key_id = (uint64_t)rand() << 32 | rand(); - } - - ~test_req() { - for (auto const& iov : write_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - - for (auto const& iov : read_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - } - }; - - TestReplicatedDB() = default; - virtual ~TestReplicatedDB() = default; - - void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, - cintrusive< repl_req_ctx >& ctx) override { - ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); - - auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); - Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))}; - Value v{.lsn_ = lsn, - .data_size_ = jheader->data_size, - .data_pattern_ = jheader->data_pattern, - .blkid_ = blkids, - .id_ = k.id_}; - - LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]", - g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_); - - { - std::unique_lock lk(db_mtx_); - inmem_db_.insert_or_assign(k, v); - lsn_index_.emplace(lsn, v); - last_data_committed_lsn = lsn; - ++commit_count_; - } - - if (ctx->is_proposer()) { g_helper->runner().next_task(); } - } - - bool on_pre_commit(int64_t lsn, const 
sisl::blob& header, const sisl::blob& key, - cintrusive< repl_req_ctx >& ctx) override { - LOGINFOMOD(replication, "[Replica={}] Received pre-commit on lsn={} dsn={}", g_helper->replica_num(), lsn, - ctx->dsn()); - return true; - } - - void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, - cintrusive< repl_req_ctx >& ctx) override { - LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn); - } - - void on_restart() { - LOGINFOMOD(replication, "restarted repl dev for [Replica={}] Group={}", g_helper->replica_num(), - boost::uuids::to_string(repl_dev()->group_id())); - } - - void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, - cintrusive< repl_req_ctx >& ctx) override { - LOGINFOMOD(replication, "[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error), - *(r_cast< uint64_t const* >(key.cbytes()))); - } - - AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { - std::lock_guard< std::mutex > lock(m_snapshot_lock); - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(), - s->get_last_log_term(), s->get_last_log_idx()); - m_last_snapshot = context; - return make_async_success<>(); - } - - int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - - if (snp_data->offset == 0) { - snp_data->is_last_obj = false; - snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); - return 0; - } - - int64_t next_lsn = snp_data->offset; - std::vector< KeyValuePair > 
kv_snapshot_data; - // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into - // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the - // first element to be read and transfered. - for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) { - auto& v = iter->second; - kv_snapshot_data.emplace_back(Key{v.id_}, v); - LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", - g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); - if (kv_snapshot_data.size() >= 1000) { break; } - } - - if (kv_snapshot_data.size() == 0) { - snp_data->is_last_obj = true; - LOGINFOMOD(replication, "Snapshot is_last_obj is true"); - return 0; - } - - int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); - sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; - std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); - snp_data->blob = std::move(blob); - snp_data->is_last_obj = false; - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - kv_snapshot_data.size()); - - return 0; - } - - void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); - auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); - std::move(fut).get(); - for (auto const& iov : write_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - } - - void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { - auto s = 
std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - if (snp_data->offset == 0) { - snp_data->offset = last_data_committed_lsn + 1; - LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", - g_helper->replica_num(), snp_data->offset); - return; - } - - size_t kv_snapshot_data_size = snp_data->blob.size(); - if (kv_snapshot_data_size == 0) return; - - size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); - std::unique_lock lk(db_mtx_); - auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); - for (size_t i = 0; i < num_items; i++) { - auto key = ptr->key; - auto value = ptr->value; - LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", - g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); - - // Write to data service and inmem map. - MultiBlkId out_blkids; - if (value.data_size_ != 0) { - snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); - value.blkid_ = out_blkids; - } - last_data_committed_lsn = value.lsn_; - inmem_db_.insert_or_assign(key, value); - ++commit_count_; - ptr++; - } - - LOGINFOMOD(replication, - "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - snp_data->is_last_obj, num_items); - - // before we finish install snapshot, raft_server()->get_committed_log_idx() will always be the same. so we need - // last_data_committed_lsn to notify leader to transfer new data to follower. 
- snp_data->offset = last_data_committed_lsn + 1; - } - - bool apply_snapshot(shared< snapshot_context > context) override { - std::lock_guard< std::mutex > lock(m_snapshot_lock); - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(), - s->get_last_log_term(), s->get_last_log_idx()); - m_last_snapshot = context; - return true; - } - - shared< snapshot_context > last_snapshot() override { - std::lock_guard< std::mutex > lock(m_snapshot_lock); - if (!m_last_snapshot) return nullptr; - - auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Last snapshot term={} idx={}", g_helper->replica_num(), - s->get_last_log_term(), s->get_last_log_idx()); - return m_last_snapshot; - } - - void free_user_snp_ctx(void*& user_snp_ctx) override {} - - ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { - return blk_alloc_hints{}; - } - - void on_destroy() override { - LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(), - boost::uuids::to_string(repl_dev()->group_id())); - g_helper->unregister_listener(repl_dev()->group_id()); - } - - void db_write(uint64_t data_size, uint32_t max_size_per_iov) { - static std::atomic< uint32_t > s_uniq_num{0}; - auto req = intrusive< test_req >(new test_req()); - req->jheader.data_size = data_size; - req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num; - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - - LOGINFOMOD(replication, "[Replica={}] Db write key={} data_size={} pattern={} block_size={}", - g_helper->replica_num(), req->key_id, data_size, req->jheader.data_pattern, block_size); - - if (data_size != 0) { - req->write_sgs = - test_common::HSTestHelper::create_sgs(data_size, max_size_per_iov, 
req->jheader.data_pattern); - } - - repl_dev()->async_alloc_write(req->header_blob(), req->key_blob(), req->write_sgs, req); - } - - void validate_db_data() { - g_helper->runner().set_num_tasks(inmem_db_.size()); - - LOGINFOMOD(replication, "[{}]: Total {} keys committed, validating them", - boost::uuids::to_string(repl_dev()->group_id()), inmem_db_.size()); - auto it = inmem_db_.begin(); - g_helper->runner().set_task([this, &it]() { - Key k; - Value v; - { - std::unique_lock lk(db_mtx_); - std::tie(k, v) = *it; - ++it; - } - - if (v.data_size_ != 0) { - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); - - repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { - LOGINFOMOD(replication, "Validating key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), - v.data_pattern_); - RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, - ec.message()); - for (auto const& iov : read_sgs.iovs) { - test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, - v.data_pattern_); - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - g_helper->runner().next_task(); - }); - } else { - g_helper->runner().next_task(); - } - }); - g_helper->runner().execute().get(); - } +#include "test_common/raft_repl_test_base.hpp" - uint64_t db_commit_count() const { - std::shared_lock lk(db_mtx_); - return commit_count_; - } - - uint64_t db_size() const { - std::shared_lock lk(db_mtx_); - return inmem_db_.size(); - } - - void create_snapshot() { - auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); - ulong snapshot_idx = raft_repl_dev->raft_server()->create_snapshot(); - LOGINFO("Manually create snapshot got index {}", snapshot_idx); - } - - void truncate(int num_reserved_entries) { - auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); - 
raft_repl_dev->truncate(num_reserved_entries); - LOGINFO("Manually truncated"); - } - - void set_zombie() { zombie_ = true; } - bool is_zombie() { - // Wether a group is zombie(non recoverable) - return zombie_; - } - -private: - std::map< Key, Value > inmem_db_; - std::map< int64_t, Value > lsn_index_; - uint64_t commit_count_{0}; - // this is the last lsn for data, might not be the same with the real last committed lsn - // which should be get by raft_server()->get_committed_log_idx() - uint64_t last_data_committed_lsn{0}; - std::shared_mutex db_mtx_; - std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; - std::mutex m_snapshot_lock; - bool zombie_{false}; -}; - -class RaftReplDevTest : public testing::Test { -public: - void SetUp() override { - // By default it will create one db - for (uint32_t i{0}; i < SISL_OPTIONS["num_raft_groups"].as< uint32_t >(); ++i) { - auto db = std::make_shared< TestReplicatedDB >(); - g_helper->register_listener(db); - dbs_.emplace_back(std::move(db)); - } - } - - void TearDown() override { - for (auto const& db : dbs_) { - if (db->is_zombie()) { continue; } - run_on_leader(db, [this, db]() { - auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); - ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; - }); - } - - for (auto const& db : dbs_) { - if (db->is_zombie()) { continue; } - auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); - int i = 0; - bool force_leave = false; - do { - std::this_thread::sleep_for(std::chrono::seconds(1)); - auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service()); - raft_repl_svc.gc_repl_devs(); - LOGINFO("Waiting for repl dev to get destroyed"); - - // TODO: if leader is destroyed, but the follower does not receive the notification, it will not be - // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at - // raft_repl_dev side to hanle this case. 
this is a workaround to avoid the infinite loop for now. - if (i++ > 10 && !force_leave) { - LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); - repl_dev->force_leave(); - force_leave = true; - } - - } while (!repl_dev->is_destroyed()); - } - } - - void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) { - if (db == nullptr) { db = pick_one_db(); } - // LOGINFO("Writing on group_id={}", db->repl_dev()->group_id()); - db->db_write(data_size, max_size_per_iov); - } - - void wait_for_all_commits() { wait_for_commits(written_entries_); } - - void wait_for_commits(uint64_t exp_writes) { - uint64_t total_writes{0}; - while (true) { - total_writes = 0; - for (auto const& db : dbs_) { - total_writes += db->db_commit_count(); - } - - if (total_writes >= exp_writes) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - LOGINFO("Replica={} has received {} commits as expected", g_helper->replica_num(), total_writes); - } - - void validate_data() { - for (auto const& db : dbs_) { - db->validate_db_data(); - } - } - - shared< TestReplicatedDB > pick_one_db() { return dbs_[0]; } - - void assign_leader(uint16_t replica) { - LOGINFO("Switch the leader to replica_num = {}", replica); - if (g_helper->replica_num() == replica) { - for (auto const& db : dbs_) { - do { - auto result = db->repl_dev()->become_leader().get(); - if (result.hasError()) { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } else { - break; - } - } while (true); - } - } else { - for (auto const& db : dbs_) { - homestore::replica_id_t leader_uuid; - while (true) { - leader_uuid = db->repl_dev()->get_leader_id(); - if (!leader_uuid.is_nil() && (g_helper->member_id(leader_uuid) == replica)) { break; } - - LOGINFO("Waiting for replica={} to become leader", replica); - std::this_thread::sleep_for(std::chrono::milliseconds{500}); - } - } - } - } - - void 
run_on_leader(std::shared_ptr< TestReplicatedDB > db, auto&& lambda) { - do { - auto leader_uuid = db->repl_dev()->get_leader_id(); - - if (leader_uuid.is_nil()) { - LOGINFO("Waiting for leader to be elected for group={}", db->repl_dev()->group_id()); - std::this_thread::sleep_for(std::chrono::milliseconds{500}); - } else if (leader_uuid == g_helper->my_replica_id()) { - lambda(); - break; - } else { - break; - } - } while (true); - } - - void write_on_leader(uint32_t num_entries, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) { - do { - auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id(); - - if (leader_uuid.is_nil()) { - LOGINFO("Waiting for leader to be elected"); - std::this_thread::sleep_for(std::chrono::milliseconds{500}); - } else if (leader_uuid == g_helper->my_replica_id()) { - LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries, - boost::uuids::to_string(g_helper->my_replica_id())); - auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - g_helper->runner().set_num_tasks(num_entries); - - LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); - g_helper->runner().set_task([this, block_size, db]() { - static std::normal_distribution<> num_blks_gen{3.0, 2.0}; - this->generate_writes(std::abs(std::lround(num_blks_gen(g_re))) * block_size, block_size, db); - }); - if (wait_for_commit) { g_helper->runner().execute().get(); } - break; - } else { - LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries, - boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id())); - break; - } - } while (true); - - written_entries_ += num_entries; - if (wait_for_commit) { this->wait_for_all_commits(); } - } - - void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) { - this->run_on_leader(db, [this, db]() { - auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); - 
ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; - }); - - // Remove the db from the dbs_ list and check if count matches with repl_device - for (auto it = dbs_.begin(); it != dbs_.end(); ++it) { - if (*it == db) { - dbs_.erase(it); - break; - } - } - - if (wait_for_removal) { wait_for_listener_destroy(dbs_.size()); } - } - - void wait_for_listener_destroy(uint64_t exp_listeners) { - while (true) { - auto total_listeners = g_helper->num_listeners(); - if (total_listeners == exp_listeners) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } - - void restart_replica(uint16_t replica, uint32_t shutdown_delay_sec = 5u) { - if (g_helper->replica_num() == replica) { - LOGINFO("Restart homestore: replica_num = {}", replica); - g_helper->restart(shutdown_delay_sec); - // g_helper->sync_for_test_start(); - } else { - LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); - std::this_thread::sleep_for(std::chrono::seconds{5}); - } - } - - void shutdown_replica(uint16_t replica) { - if (g_helper->replica_num() == replica) { - LOGINFO("Shutdown homestore: replica_num = {}", replica); - g_helper->shutdown(); - } else { - LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); - std::this_thread::sleep_for(std::chrono::seconds{5}); - } - } - - void start_replica(uint16_t replica) { - if (g_helper->replica_num() == replica) { - LOGINFO("Start homestore: replica_num = {}", replica); - g_helper->start(); - } - } - - void create_snapshot() { dbs_[0]->create_snapshot(); } - void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } - -protected: - std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; - uint32_t written_entries_{0}; - -#ifdef _PRERELEASE - flip::FlipClient m_fc{iomgr_flip::instance()}; -#endif -}; +class RaftReplDevTest : public RaftReplDevTestBase {}; TEST_F(RaftReplDevTest, Write_Restart_Write) { 
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); @@ -1015,7 +415,6 @@ int main(int argc, char* argv[]) { // Snapshot and truncation tests needs num reserved to be 0 and distance 10. s.consensus.num_reserved_log_items = 0; - s.consensus.snapshot_freq_distance = 10; s.resource_limits.resource_audit_timer_ms = 0; // only reset when user specified the value for test; @@ -1033,7 +432,8 @@ int main(int argc, char* argv[]) { FLAGS_folly_global_cpu_executor_threads = 4; g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", args, orig_argv); - g_helper->setup(); + // No spare replica's are created. Test cases in this file expects fixed number of replica's. + g_helper->setup(SISL_OPTIONS["replicas"].as< uint32_t >()); auto ret = RUN_ALL_TESTS(); g_helper->teardown(); diff --git a/src/tests/test_raft_repl_dev_dynamic.cpp b/src/tests/test_raft_repl_dev_dynamic.cpp new file mode 100644 index 000000000..7bd69a13c --- /dev/null +++ b/src/tests/test_raft_repl_dev_dynamic.cpp @@ -0,0 +1,133 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include "test_common/raft_repl_test_base.hpp" + +// Dynamic tests spawn spare replica's also which can be used to add and remove from a repl dev. 
+class ReplDevDynamicTest : public RaftReplDevTestBase {}; + +TEST_F(ReplDevDynamicTest, ReplaceMember) { + // Write some IOs, replace a member, and validate data on all members except the one that is out. + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + auto db = dbs_.back(); + auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); + auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >(); + + // Replace the last member in the group with index (num_replicas - 1) with a spare + // replica with index (num_replicas). Member ids are 0,...,num_replicas-1, num_replicas,...,N + uint32_t member_out = num_replicas - 1; + uint32_t member_in = num_replicas; + + g_helper->sync_for_test_start(num_members); + if (g_helper->replica_num() < num_replicas) { + // With the existing raft repl dev group, write IOs, validate, and call replace_member on the leader. + LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num()); + this->write_on_leader(num_io_entries, true /* wait_for_commit */); + + replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in)); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } else if (g_helper->replica_num() == member_in) { + LOGINFO("Wait for commits replica={}", g_helper->replica_num()); + wait_for_commits(num_io_entries); + } + + g_helper->sync_for_verify_start(num_members); + LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num()); + if (g_helper->replica_num() != member_out) { + // Skip the member which is going to be replaced. Validate data on all other replicas. + LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); + this->validate_data(); + } else { + // The out member will have the repl dev destroyed. 
+ auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); + do { + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service()); + raft_repl_svc.gc_repl_devs(); + LOGINFO("Waiting for repl dev to get destroyed on out member replica={}", g_helper->replica_num()); + } while (!repl_dev->is_destroyed()); + LOGINFO("Repl dev destroyed on out member replica={}", g_helper->replica_num()); + } + + g_helper->sync_for_cleanup_start(num_members); + LOGINFO("ReplaceMember test done"); +} + +// TODO add more tests with leader and member restart, multiple member replace +// leader replace, commit quorum + +int main(int argc, char* argv[]) { + int parsed_argc = argc; + char** orig_argv = argv; + + // Save the args for replica use + std::vector< std::string > args; + for (int i = 0; i < argc; ++i) { + args.emplace_back(argv[i]); + } + + ::testing::InitGoogleTest(&parsed_argc, argv); + + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup, + test_repl_common_setup); + + // + // Entire test suite assumes that once a replica takes over as leader, it stays until it is explicitly yielded. + // Otherwise it is very hard to control or accurately test behavior. Hence we forcibly override the + // leadership_expiry time. + // + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + s.consensus.leadership_expiry_ms = -1; // -1 means never expires; + s.generic.repl_dev_cleanup_interval_sec = 1; + + // Disable implicit flush and timer. + s.logstore.flush_threshold_size = 0; + s.logstore.flush_timer_frequency_us = 0; + + // Snapshot and truncation tests needs num reserved to be 0 and distance 10. 
+ s.consensus.num_reserved_log_items = 0; + s.resource_limits.resource_audit_timer_ms = 0; + + // only reset when user specified the value for test; + if (SISL_OPTIONS.count("snapshot_distance")) { + s.consensus.snapshot_freq_distance = SISL_OPTIONS["snapshot_distance"].as< uint32_t >(); + } + if (SISL_OPTIONS.count("num_raft_logs_resv")) { + s.resource_limits.raft_logstore_reserve_threshold = SISL_OPTIONS["num_raft_logs_resv"].as< uint32_t >(); + } + if (SISL_OPTIONS.count("res_mgr_audit_timer_ms")) { + s.resource_limits.resource_audit_timer_ms = SISL_OPTIONS["res_mgr_audit_timer_ms"].as< uint32_t >(); + } + }); + HS_SETTINGS_FACTORY().save(); + + FLAGS_folly_global_cpu_executor_threads = 4; + g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev_dynamic", args, orig_argv); + + // We also spawn spare replicas for dynamic repl dev tests. + auto total_replicas = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + g_helper->setup(total_replicas); + + auto ret = RUN_ALL_TESTS(); + g_helper->teardown(); + + std::string str; + sisl::ObjCounterRegistry::foreach ([&str](const std::string& name, int64_t created, int64_t alive) { + fmt::format_to(std::back_inserter(str), "{}: created={} alive={}\n", name, created, alive); + }); + LOGINFO("Object Life Counter\n:{}", str); + + return ret; +} diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 9367d64f0..c26ba273d 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -136,6 +136,7 @@ class SoloReplDevTest : public testing::Test { cintrusive< repl_req_ctx >& ctx) override { LOGINFO("Received error={} on repl_dev", enum_name(error)); } + void replace_member(replica_id_t member_out, replica_id_t member_in) override {} void on_destroy() override {} };