Change replace_member API signature.
Add replica_member_info with name, priority and id.
Use replica_member_info for the replace_member API and listener callbacks.
sanebay committed Oct 23, 2024
1 parent 634047c commit 6954fdd
Showing 12 changed files with 90 additions and 77 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.67"
version = "6.5.1"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
7 changes: 7 additions & 0 deletions src/include/homestore/replication/repl_decls.h
@@ -78,6 +78,13 @@ struct peer_info {
uint64_t last_succ_resp_us_;
};

struct replica_member_info {
static constexpr uint64_t max_name_len = 128;
replica_id_t id;
char name[max_name_len];
int32_t priority{0};
};

} // namespace homestore

// hash function definitions
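Note: replica_member_info is a fixed-size struct, which is what allows it to be embedded directly in journal headers later in this commit. A minimal sketch of filling one in (the name, UUID string and priority below are illustrative placeholders, not from this commit; priority semantics are assumed to mirror raft member priority):

#include <cstring>
#include <boost/uuid/string_generator.hpp>

homestore::replica_member_info member_in{};
member_in.id = boost::uuids::string_generator{}("01234567-89ab-cdef-0123-456789abcdef"); // placeholder UUID
std::strncpy(member_in.name, "node-7", homestore::replica_member_info::max_name_len - 1); // fixed 128-byte buffer
member_in.priority = 1; // assumed raft election priority hint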
2 changes: 1 addition & 1 deletion src/include/homestore/replication/repl_dev.h
@@ -348,7 +348,7 @@ class ReplDevListener {
virtual void on_destroy() = 0;

/// @brief Called when replace member is performed.
virtual void replace_member(replica_id_t member_out, replica_id_t member_in) = 0;
virtual void on_replace_member(const replica_member_info& member_out, const replica_member_info& member_in) = 0;

/// @brief Called when the snapshot is being created by nuraft
virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;
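For downstream implementers, the override changes shape accordingly. A minimal sketch, assuming a hypothetical listener class (only the affected method is shown; the remaining pure-virtual methods of ReplDevListener still need overriding):

class MyListener : public homestore::ReplDevListener {
public:
    void on_replace_member(const homestore::replica_member_info& member_out,
                           const homestore::replica_member_info& member_in) override {
        // Both sides now carry id, name and priority instead of a bare replica_id_t.
        LOGINFO("member name={} replaced by name={} priority={}", member_out.name, member_in.name,
                member_in.priority);
    }
    // ... remaining ReplDevListener overrides omitted for brevity ...
};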
3 changes: 2 additions & 1 deletion src/include/homestore/replication_service.hpp
@@ -41,7 +41,8 @@ class ReplicationService {
/// @return A Future which gets called after schedule to release (before garbage collection is kicked in)
virtual folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) = 0;

virtual AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
virtual AsyncReplResult<> replace_member(group_id_t group_id, const replica_member_info& member_out,
const replica_member_info& member_in,
uint32_t commit_quorum = 0) const = 0;

/// @brief Get the repl dev for a given group id if it is already created or opened
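Call sites change accordingly. A hedged usage sketch of the new service-level signature (the service reference, group id and UUIDs are placeholders; the futures style mirrors what this commit uses elsewhere):

homestore::replica_member_info out{}, in{};
out.id = out_uuid; // UUID of the member being evicted (obtained elsewhere)
in.id = in_uuid;   // UUID of the replacement member
std::strncpy(in.name, "replacement-node", homestore::replica_member_info::max_name_len - 1);
svc.replace_member(group_id, out, in, 0 /* commit_quorum */)
    .via(&folly::InlineExecutor::instance())
    .thenValue([](auto&& result) {
        if (result.hasError()) { /* inspect the ReplServiceError and retry or report */ }
    });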
81 changes: 41 additions & 40 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -74,7 +74,7 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id();
}
m_rd_sb.write();
bind_data_service();
bind_data_service();
}

RD_LOG(INFO,
@@ -90,27 +90,30 @@ bool RaftReplDev::bind_data_service() {
RD_LOG(INFO, "Starting data channel, group_id={}, replica_id={}", group_id_str(), my_replica_id_str());
bool success = false;
#ifdef _PRERELEASE
success = m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
if (iomgr_flip::instance()->delay_flip("slow_down_data_channel", [this, rpc_data]() mutable {
RD_LOGI("Resuming after slow down data channel flip");
success =
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
if (iomgr_flip::instance()->delay_flip("slow_down_data_channel", [this, rpc_data]() mutable {
RD_LOGI("Resuming after slow down data channel flip");
on_push_data_received(rpc_data);
})) {
RD_LOGI("Slow down data channel flip is enabled, scheduling to call later");
} else {
on_push_data_received(rpc_data);
})) {
RD_LOGI("Slow down data channel flip is enabled, scheduling to call later");
} else {
on_push_data_received(rpc_data);
}
});
}
});
#else
success = m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1));
success =
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1));
#endif
if (!success) {
RD_LOGE("Failed to bind data service request for PUSH_DATA");
return false;
return false;
}
success = m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1));
success =
m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1));
if (!success) {
RD_LOGE("Failed to bind data service request for FETCH_DATA");
return false;
return false;
}
return true;
}
@@ -127,18 +130,18 @@ bool RaftReplDev::join_group() {
return true;
}

AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid,
uint32_t commit_quorum) {
AsyncReplResult<> RaftReplDev::replace_member(const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum) {
LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(),
boost::uuids::to_string(member_out_uuid), boost::uuids::to_string(member_in_uuid));
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));

if (commit_quorum >= 1) {
// Two members are down and the leader can't form the quorum. Reduce the quorum size.
reset_quorum_size(commit_quorum);
}

// Step 1: Check if leader itself is requested to move out.
if (m_my_repl_id == member_out_uuid && m_my_repl_id == get_leader_id()) {
if (m_my_repl_id == member_out.id && m_my_repl_id == get_leader_id()) {
// If leader is the member requested to move out, then give up leadership and return error.
// Client will retry replace_member request to the new leader.
raft_server()->yield_leadership(true /* immediate */, -1 /* successor */);
@@ -148,9 +151,9 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
}

// Step 2. Add the new member.
return m_msg_mgr.add_member(m_group_id, member_in_uuid)
return m_msg_mgr.add_member(m_group_id, member_in.id)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_in_uuid, member_out_uuid, commit_quorum](auto&& e) -> AsyncReplResult<> {
.thenValue([this, member_in, member_out, commit_quorum](auto&& e) -> AsyncReplResult<> {
// TODO: Currently we ignore the cancellation; fix nuraft_mesg to not timeout
// when adding a member. Member is added to cluster config until member syncs fully
// with at least stop gap. This will take a lot of time for block or
@@ -168,18 +171,17 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
return make_async_error<>(RaftReplService::to_repl_error(e.error()));
}
}
auto member_out = boost::uuids::to_string(member_out_uuid);
auto member_in = boost::uuids::to_string(member_in_uuid);

RD_LOGI("Replace member added member={} to group_id={}", member_in, group_id_str());
RD_LOGI("Replace member added member={} to group_id={}", boost::uuids::to_string(member_in.id),
group_id_str());

// Step 3. Append log entry to mark the old member is out and new member is added.
auto rreq = repl_req_ptr_t(new repl_req_ctx{});
replace_members_ctx members;
std::copy(member_in_uuid.begin(), member_in_uuid.end(), members.in_replica_id.begin());
std::copy(member_out_uuid.begin(), member_out_uuid.end(), members.out_replica_id.begin());
sisl::blob header(r_cast< uint8_t* >(&members),
members.in_replica_id.size() + members.out_replica_id.size());
members.replica_out = member_out;
members.replica_in = member_in;

sisl::blob header(r_cast< uint8_t* >(&members), sizeof(replace_members_ctx));
rreq->init(
repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0);
@@ -196,7 +198,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
// Step 4. Remove the old member. Even if the old member is temporarily
// down and recovers, nuraft_mesg sees the member removal in the cluster log
// entry and calls exit_group() and leave().
return m_msg_mgr.rem_member(m_group_id, member_out_uuid)
return m_msg_mgr.rem_member(m_group_id, member_out.id)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_out, commit_quorum](auto&& e) -> AsyncReplResult<> {
if (e.hasError()) {
@@ -212,7 +214,8 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
return make_async_error<>(ReplServiceError::RETRY_REQUEST);
}
} else {
RD_LOGI("Replace member removed member={} from group_id={}", member_out, group_id_str());
RD_LOGI("Replace member removed member={} from group_id={}",
boost::uuids::to_string(member_out.id), group_id_str());
}

// Revert the quorum size back to 0.
@@ -957,13 +960,11 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)

void RaftReplDev::replace_member(repl_req_ptr_t rreq) {
auto members = r_cast< const replace_members_ctx* >(rreq->header().cbytes());
replica_id_t member_in, member_out;
std::copy(members->out_replica_id.begin(), members->out_replica_id.end(), member_out.begin());
std::copy(members->in_replica_id.begin(), members->in_replica_id.end(), member_in.begin());
RD_LOGI("Raft repl replace_member member_out={} member_in={}", boost::uuids::to_string(member_out),
boost::uuids::to_string(member_in));

m_listener->replace_member(member_out, member_in);
RD_LOGI("Raft repl replace_member commit member_out={} member_in={}",
boost::uuids::to_string(members->replica_out.id), boost::uuids::to_string(members->replica_in.id));

m_listener->on_replace_member(members->replica_out, members->replica_in);
}

static bool blob_equals(sisl::blob const& a, sisl::blob const& b) {
@@ -1224,7 +1225,7 @@ void RaftReplDev::flush_durable_commit_lsn() {
}

/////////////////////////////////// Private methods ////////////////////////////////////
void RaftReplDev::cp_flush(CP* cp, cshared<ReplDevCPContext> ctx) {
void RaftReplDev::cp_flush(CP* cp, cshared< ReplDevCPContext > ctx) {
auto const lsn = ctx->cp_lsn;
auto const clsn = ctx->compacted_to_lsn;
auto const dsn = ctx->last_applied_dsn;
@@ -1247,14 +1248,14 @@ void RaftReplDev::cp_flush(CP* cp, cshared<ReplDevCPContext> ctx) {
cp->to_string());
}

cshared<ReplDevCPContext> RaftReplDev::get_cp_ctx(CP* cp) {
cshared< ReplDevCPContext > RaftReplDev::get_cp_ctx(CP* cp) {
auto const cp_lsn = m_commit_upto_lsn.load();
auto const clsn = m_compact_lsn.load();
auto const dsn = m_next_dsn.load();

RD_LOGD("getting cp_ctx for raft repl dev {}, cp_lsn={}, clsn={}, next_dsn={}, cp string:{}",
(void *)this, cp_lsn, clsn, dsn, cp->to_string());
auto dev_ctx = std::make_shared<ReplDevCPContext>();
RD_LOGD("getting cp_ctx for raft repl dev {}, cp_lsn={}, clsn={}, next_dsn={}, cp string:{}", (void*)this, cp_lsn,
clsn, dsn, cp->to_string());
auto dev_ctx = std::make_shared< ReplDevCPContext >();
dev_ctx->cp_lsn = cp_lsn;
dev_ctx->compacted_to_lsn = clsn;
dev_ctx->last_applied_dsn = dsn;
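The HS_CTRL_REPLACE journal entry now carries the whole replace_members_ctx as one raw blob, which is what lets the commit path above cast the header bytes straight back into the struct instead of copying UUID arrays field by field. A condensed sketch of the round trip, assuming the struct stays trivially copyable:

// Proposer side (inside replace_member): serialize both members into the log-entry header.
replace_members_ctx members;
members.replica_out = member_out;
members.replica_in = member_in;
sisl::blob header(r_cast< uint8_t* >(&members), sizeof(replace_members_ctx));

// Commit side (inside replace_member(repl_req_ptr_t)): reinterpret the same bytes.
auto ctx = r_cast< const replace_members_ctx* >(rreq->header().cbytes());
m_listener->on_replace_member(ctx->replica_out, ctx->replica_in);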
11 changes: 6 additions & 5 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -36,8 +36,8 @@ using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

struct replace_members_ctx {
std::array< uint8_t, 16 > out_replica_id;
std::array< uint8_t, 16 > in_replica_id;
replica_member_info replica_out;
replica_member_info replica_in;
};

class RaftReplDevMetrics : public sisl::MetricsGroup {
@@ -162,7 +162,8 @@ class RaftReplDev : public ReplDev,

bool bind_data_service();
bool join_group();
AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum);
AsyncReplResult<> replace_member(const replica_member_info& member_out, const replica_member_info& member_in,
uint32_t commit_quorum);
folly::SemiFuture< ReplServiceError > destroy_group();

//////////////// All ReplDev overrides/implementation ///////////////////////
@@ -199,8 +200,8 @@ class RaftReplDev : public ReplDev,
sisl::blob const& key, uint32_t data_size, bool is_data_channel);
folly::Future< folly::Unit > notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs);
void check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreqs);
void cp_flush(CP* cp, cshared<ReplDevCPContext> ctx);
cshared<ReplDevCPContext> get_cp_ctx(CP* cp);
void cp_flush(CP* cp, cshared< ReplDevCPContext > ctx);
cshared< ReplDevCPContext > get_cp_ctx(CP* cp);
void cp_cleanup(CP* cp);
void become_ready();

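Because replace_members_ctx is serialized as raw bytes into the journal header (see raft_repl_dev.cpp above), a defensive compile-time check would be reasonable here. This assertion is a suggestion, not part of the commit (requires <type_traits>):

static_assert(std::is_trivially_copyable_v< replace_members_ctx >,
              "replace_members_ctx is written into the journal header as raw bytes");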
4 changes: 2 additions & 2 deletions src/lib/replication/service/generic_repl_svc.cpp
@@ -147,8 +147,8 @@ void SoloReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
}
}

AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum) const {
AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum) const {
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
}

4 changes: 2 additions & 2 deletions src/lib/replication/service/generic_repl_svc.h
@@ -73,8 +73,8 @@ class SoloReplService : public GenericReplService {
std::set< replica_id_t > const& members) override;
folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) override;
void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) const override;
AsyncReplResult<> replace_member(group_id_t group_id, const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum = 0) const override;
};

class SoloReplServiceCPHandler : public CPCallbacks {
25 changes: 12 additions & 13 deletions src/lib/replication/service/raft_repl_service.cpp
@@ -85,12 +85,11 @@ void RaftReplService::start() {
LOGINFO("Starting RaftReplService with server_uuid={} port={}", boost::uuids::to_string(params.server_uuid_),
params.mesg_port_);

//check if ssl cert files are provided, if yes, monitor the changes
// check if ssl cert files are provided, if yes, monitor the changes
if (!params.ssl_key_.empty() && !params.ssl_cert_.empty()) {
ioenvironment.with_file_watcher();
monitor_cert_changes();
}


// Step 2: Register all RAFT parameters. At the end of this step, raft is ready to be created/join group
auto r_params = nuraft::raft_params()
@@ -158,7 +157,7 @@ void RaftReplService::start() {
auto rdev = std::dynamic_pointer_cast< RaftReplDev >(it->second);
rdev->wait_for_logstore_ready();
if (!rdev->join_group()) {
HS_REL_ASSERT(false, "FAILED TO JOIN GROUP, PANIC HERE");
HS_REL_ASSERT(false, "FAILED TO JOIN GROUP, PANIC HERE");
it = m_rd_map.erase(it);
} else {
++it;
@@ -191,19 +190,19 @@ void RaftReplService::monitor_cert_changes() {
restart_svc.detach();
};

//monitor ssl cert file
// monitor ssl cert file
if (!fw->register_listener(ioenvironment.get_ssl_cert(), "hs_ssl_cert_watcher", cert_change_cb)) {
LOGERROR("Failed to register listner, {} to watch file {}, Not monitoring cert files",
"hs_ssl_cert_watcher", ioenvironment.get_ssl_cert());
LOGERROR("Failed to register listner, {} to watch file {}, Not monitoring cert files", "hs_ssl_cert_watcher",
ioenvironment.get_ssl_cert());
}
//monitor ssl key file
// monitor ssl key file
if (!fw->register_listener(ioenvironment.get_ssl_key(), "hs_ssl_key_watcher", cert_change_cb)) {
LOGERROR("Failed to register listner, {} to watch file {}, Not monitoring cert files",
"hs_ssl_key_watcher", ioenvironment.get_ssl_key());
LOGERROR("Failed to register listner, {} to watch file {}, Not monitoring cert files", "hs_ssl_key_watcher",
ioenvironment.get_ssl_key());
}
}

void RaftReplService::restart_raft_svc(const std::string filepath, const bool deleted){
void RaftReplService::restart_raft_svc(const std::string filepath, const bool deleted) {
if (deleted && !wait_for_cert(filepath)) {
LOGINFO("file {} deleted, ", filepath)
// wait for the deleted file to be added again
@@ -215,7 +214,7 @@ void RaftReplService::restart_raft_svc(const std::string filepath, const bool de
}

bool RaftReplService::wait_for_cert(const std::string& filepath) {
auto attempts = cert_change_timeout/cert_check_sleep;
auto attempts = cert_change_timeout / cert_check_sleep;
for (auto i = attempts; i > 0; --i) {
if (std::filesystem::exists(filepath)) { return true; }
std::this_thread::sleep_for(cert_check_sleep);
@@ -394,8 +393,8 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
add_repl_dev(group_id, rdev);
}

AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum) const {
AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum) const {
auto rdev_result = get_repl_dev(group_id);
if (!rdev_result) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); }

15 changes: 8 additions & 7 deletions src/lib/replication/service/raft_repl_service.h
@@ -51,7 +51,7 @@ class RaftReplService : public GenericReplService,
iomgr::timer_handle_t m_flush_durable_commit_timer_hdl;
iomgr::io_fiber_t m_reaper_fiber;
std::mutex raft_restart_mutex;

public:
RaftReplService(cshared< ReplApplication >& repl_app);

@@ -73,8 +73,8 @@ class RaftReplService : public GenericReplService,
std::set< replica_id_t > const& members) override;
folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) override;
void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) const override;
AsyncReplResult<> replace_member(group_id_t group_id, const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum = 0) const override;

private:
RaftReplDev* raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie);
Expand All @@ -98,12 +98,13 @@ struct ReplDevCPContext;

class ReplSvcCPContext : public CPContext {
std::shared_mutex m_cp_map_mtx;
std::map< ReplDev*, cshared<ReplDevCPContext> > m_cp_ctx_map;
std::map< ReplDev*, cshared< ReplDevCPContext > > m_cp_ctx_map;

public:
ReplSvcCPContext(CP* cp) : CPContext(cp){};
ReplSvcCPContext(CP* cp) : CPContext(cp) {};
virtual ~ReplSvcCPContext() = default;
int add_repl_dev_ctx(ReplDev* dev, cshared<ReplDevCPContext> dev_ctx);
cshared<ReplDevCPContext> get_repl_dev_ctx(ReplDev* dev);
int add_repl_dev_ctx(ReplDev* dev, cshared< ReplDevCPContext > dev_ctx);
cshared< ReplDevCPContext > get_repl_dev_ctx(ReplDev* dev);
};

class RaftReplServiceCPHandler : public CPCallbacks {
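For context, the ReplSvcCPContext above aggregates one ReplDevCPContext per device under a shared_mutex; this commit only reformats the declarations. A plausible sketch of the two accessors, assuming standard std::shared_mutex semantics (exclusive lock to insert, shared lock to read) — not the actual implementation:

int ReplSvcCPContext::add_repl_dev_ctx(ReplDev* dev, cshared< ReplDevCPContext > dev_ctx) {
    std::unique_lock lg(m_cp_map_mtx); // writers take the mutex exclusively
    m_cp_ctx_map.emplace(dev, dev_ctx);
    return 0;
}

cshared< ReplDevCPContext > ReplSvcCPContext::get_repl_dev_ctx(ReplDev* dev) {
    std::shared_lock lg(m_cp_map_mtx); // readers may proceed concurrently
    auto it = m_cp_ctx_map.find(dev);
    return (it == m_cp_ctx_map.end()) ? nullptr : it->second;
}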