Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replace member changes. Persist membership details. #216

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.4"
version = "2.1.5"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
14 changes: 11 additions & 3 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <boost/uuid/uuid_io.hpp>
#include <sisl/utility/enum.hpp>
#include <sisl/logging/logging.h>

#include "common.hpp"

Expand All @@ -14,9 +15,15 @@ ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADE
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST);

struct PGMember {
// Max length is based on homestore::replica_member_info::max_name_len - 1. Last byte is null terminated.
static constexpr uint64_t max_name_len = 127;
explicit PGMember(peer_id_t _id) : id(_id) {}
PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) {}
PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {}
PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) {
RELEASE_ASSERT(name.size() <= max_name_len, "Name exceeds max length");
}
PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {
RELEASE_ASSERT(name.size() <= max_name_len, "Name exceeds max length");
}
peer_id_t id;
std::string name;
int32_t priority{0}; // <0 (Arbiter), ==0 (Follower), >0 (F|Leader)
Expand Down Expand Up @@ -78,7 +85,8 @@ struct PGStats {
class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member, u_int32_t commit_quorum) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
u_int32_t commit_quorum = 0) = 0;

/**
* Retrieves the statistics for a specific PG (Placement Group) identified by its ID.
Expand Down
6 changes: 3 additions & 3 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class HomeObjectImpl : public HomeObject,

virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) = 0;
virtual PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) = 0;
PGMember const& new_member, uint32_t commit_quorum) = 0;
virtual bool _get_stats(pg_id_t id, PGStats& stats) const = 0;
virtual void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const = 0;

Expand Down Expand Up @@ -144,8 +144,8 @@ class HomeObjectImpl : public HomeObject,

/// PgManager
PGManager::NullAsyncResult create_pg(PGInfo&& pg_info) final;
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member, u_int32_t commit_quorum) final;
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
u_int32_t commit_quorum) final;
// see api comments in base class;
bool get_stats(pg_id_t id, PGStats& stats) const final;
void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final;
Expand Down
9 changes: 5 additions & 4 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis

// Write to index table with key {shard id, blob id } and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}", blob_id, lsn,
exist_already, status, pbas.to_string());
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}",
msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string());
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status));
Expand Down Expand Up @@ -254,7 +254,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,

auto r = get_blob_from_index_table(index_table, shard.id, blob_id);
if (!r) {
BLOGW(shard.id, blob_id, "Blob not found in index");
BLOGE(shard.id, blob_id, "Blob not found in index during get blob");
return folly::makeUnexpected(r.error());
}

Expand Down Expand Up @@ -342,7 +342,8 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
LOGW("Received a blob_put on an unknown shard, underlying engine will retry this later");
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

Expand Down
14 changes: 12 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;

PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) override;

bool _get_stats(pg_id_t id, PGStats& stats) const override;
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;
Expand Down Expand Up @@ -378,6 +378,16 @@ class HSHomeObject : public HomeObjectImpl {
void on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

/**
* @brief Function invoked when a member is replaced by a new member
*
* @param group_id The group id of replication device.
* @param member_out Member which is removed from group
* @param member_in Member which is added to group
* */
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down
69 changes: 66 additions & 3 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,72 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
if (ctx) ctx->promise_.setValue(folly::Unit());
}

PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
/**
 * @brief Asks the replication service to replace one member of a PG with another.
 *
 * Resolves the PG to its replication group while holding a shared lock on the PG map, then forwards the
 * request to the replication service and translates any replication error through toPgError().
 *
 * @param pg_id         PG whose membership is to change.
 * @param old_member_id Id of the member leaving the group.
 * @param new_member    Member joining the group; its id, priority and name are passed on.
 * @param commit_quorum Forwarded unchanged to the replication service. When it is 0 the request is only
 *                      accepted on the leader.
 * @return INVALID_ARG when the new member's name is longer than PGMember::max_name_len, UNKNOWN_PG when
 *         pg_id is not in the PG map, NOT_LEADER when this replica is not the leader and commit_quorum is
 *         0, otherwise the (asynchronous) outcome of the replication service call.
 */
PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t pg_id, peer_id_t const& old_member_id,
                                                         PGMember const& new_member, uint32_t commit_quorum) {
    // PGMember::name is a public field, so the length assert in the PGMember constructors can be bypassed by
    // assigning the name afterwards. The name is copied into a fixed-size buffer below; refuse anything
    // that would not fit instead of writing past the end of that buffer.
    if (new_member.name.size() > PGMember::max_name_len) {
        LOGE("PG replace member rejected, name of member_in={} exceeds max length",
             boost::uuids::to_string(new_member.id));
        return folly::makeUnexpected(PGError::INVALID_ARG);
    }

    group_id_t group_id;
    {
        // The lock is only needed for the lookup; it is released before the replication call is issued.
        auto lg = std::shared_lock(_pg_lock);
        auto iter = _pg_map.find(pg_id);
        if (iter == _pg_map.end()) return folly::makeUnexpected(PGError::UNKNOWN_PG);
        auto& repl_dev = pg_repl_dev(*iter->second);

        if (!repl_dev.is_leader() && commit_quorum == 0) {
            // Only leader can replace a member
            return folly::makeUnexpected(PGError::NOT_LEADER);
        }
        group_id = repl_dev.group_id();
    }

    LOGI("PG replace member initated member_out={} member_in={}", boost::uuids::to_string(old_member_id),
         boost::uuids::to_string(new_member.id));

    homestore::replica_member_info in_replica, out_replica;
    out_replica.id = old_member_id;
    in_replica.id = new_member.id;
    in_replica.priority = new_member.priority;
    // The length was validated above. This assumes replica_member_info::name holds at least
    // PGMember::max_name_len + 1 bytes, as the comment on PGMember::max_name_len states — TODO confirm.
    auto const name_len = new_member.name.size();
    std::strncpy(in_replica.name, new_member.name.data(), name_len);
    in_replica.name[name_len] = '\0';

    return hs_repl_service()
        .replace_member(group_id, out_replica, in_replica, commit_quorum)
        .via(executor_)
        .thenValue([this](auto&& v) -> PGManager::NullAsyncResult {
            if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
            return folly::Unit();
        });
}

/**
 * @brief Applies a member replacement to the PG that belongs to the given replication group.
 *
 * Scans the PG map for the PG whose repl dev reports group_id, swaps member_out for member_in in that PG's
 * in-memory member set, rewrites the member table of the PG superblock from the set and writes the
 * superblock. When no PG matches the group an error is logged and nothing is changed.
 *
 * @param group_id   Group id of the replication device.
 * @param member_out Member which is removed from the group.
 * @param member_in  Member which is added to the group.
 */
void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const replica_member_info& member_out,
                                        const replica_member_info& member_in) {
    // NOTE(review): the member set and superblock are modified while holding only a shared lock on
    // _pg_lock — confirm nothing else reads or writes them concurrently.
    auto lg = std::shared_lock(_pg_lock);
    for (const auto& iter : _pg_map) {
        auto& pg = iter.second;
        if (pg_repl_dev(*pg).group_id() != group_id) { continue; }

        // Remove the old member and add the new member
        auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
        pg->pg_info_.members.erase(PGMember(member_out.id));
        pg->pg_info_.members.emplace(PGMember(member_in.id, member_in.name, member_in.priority));

        // NOTE(review): a reviewer observed that HomeObject's pg_members currently looks identical to
        // homestore::replica_member_info, doubted the two can diverge (replica_member_info is the only
        // PG information a follower receives), and suggested consolidating them — non-blocking, for
        // discussion; not verified here.
        uint32_t i{0};
        for (auto const& m : pg->pg_info_.members) {
            auto& sb_member = hs_pg->pg_sb_->members[i];
            sb_member.id = m.id;
            // Every slot is rewritten, so a slot may previously have held a longer name. strncpy pads the
            // rest of the first max_name_len - 1 bytes with zeros, which clears any such leftover, and the
            // last byte is forced to NUL so the stored name is always terminated (over-long names are
            // truncated). Assumes the name buffer is max_name_len bytes, as the original copy bound
            // implied — TODO confirm.
            std::strncpy(sb_member.name, m.name.c_str(), pg_members::max_name_len - 1);
            sb_member.name[pg_members::max_name_len - 1] = '\0';
            sb_member.priority = m.priority;
            ++i;
        }

        // Update the latest membership info to pg superblk.
        hs_pg->pg_sb_.write();
        LOGI("PG replace member done member_out={} member_in={}", boost::uuids::to_string(member_out.id),
             boost::uuids::to_string(member_in.id));
        return;
    }

    LOGE("PG replace member failed member_out={} member_in={}", boost::uuids::to_string(member_out.id),
         boost::uuids::to_string(member_in.id));
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
}

if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(shard_info), blkids.chunk_num()));
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num()));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
chunk_selector_->select_specific_chunk(blkids.chunk_num());
Expand All @@ -344,6 +344,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
}
Expand All @@ -368,6 +369,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
} else
LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id);
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
LOGI("Commit done for SEAL_SHARD_MSG for shard {}", shard_info.id);
break;
}
default:
Expand Down
5 changes: 5 additions & 0 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t
return homestore::blk_alloc_hints();
}

/// Relays a member replacement to the home object, tagged with the group id of this state machine's
/// repl dev so the home object can find the PG it applies to.
void ReplicationStateMachine::on_replace_member(const homestore::replica_member_info& member_out,
                                                const homestore::replica_member_info& member_in) {
    auto const owning_group = repl_dev()->group_id();
    home_object_->on_pg_replace_member(owning_group, member_out, member_in);
}

void ReplicationStateMachine::on_destroy() {
// TODO:: add the logic to handle destroy
LOGI("replica destroyed");
Expand Down
5 changes: 2 additions & 3 deletions src/lib/homestore_backend/replication_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
uint32_t data_size) override;

/// @brief Called when replication module is replacing an existing member with a new member
void replace_member(homestore::replica_id_t member_out, homestore::replica_id_t member_in) override {
// TODO
}
void on_replace_member(const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in) override;

/// @brief Called when the replica is being destroyed by nuraft;
void on_destroy() override;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/memory_backend/mem_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class MemoryHomeObject : public HomeObjectImpl {

// PGManager
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
uint32_t commit_quorum) override;

bool _get_stats(pg_id_t id, PGStats& stats) const override;
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;
Expand Down
10 changes: 8 additions & 2 deletions src/lib/memory_backend/mem_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ PGManager::NullAsyncResult MemoryHomeObject::_create_pg(PGInfo&& pg_info, std::s
}

PGManager::NullAsyncResult MemoryHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
PGMember const& new_member, uint32_t commit_quorum) {
auto lg = std::shared_lock(_pg_lock);
auto it = _pg_map.find(id);
if (_pg_map.end() == it) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNKNOWN_PG));
}
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
}

Expand All @@ -25,7 +30,8 @@ bool MemoryHomeObject::_get_stats(pg_id_t id, PGStats& stats) const {
stats.open_shards =
std::count_if(pg->shards_.begin(), pg->shards_.end(), [](auto const& s) { return s->is_open(); });
for (auto const& m : pg->pg_info_.members) {
stats.members.emplace_back(std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */));
stats.members.emplace_back(
std::make_tuple(m.id, m.name, 0 /* last commit lsn */, 0 /* last succ response us */));
}

return true;
Expand Down
12 changes: 4 additions & 8 deletions src/lib/pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,15 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) {
}

PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member, u_int32_t commit_quorum) {
LOGI("[pg={}] replace member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id));
PGMember const& new_member, uint32_t commit_quorum) {
LOGI("[pg={}] replace member [{}] with [{}] quorum [{}]", id, to_string(old_member), to_string(new_member.id),
commit_quorum);
if (old_member == new_member.id) {
LOGW("rejecting identical replacement SvcId [{}]!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

if (old_member == our_uuid()) {
LOGW("refusing to remove ourself {}!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

return _replace_member(id, old_member, new_member);
return _replace_member(id, old_member, new_member, commit_quorum);
}

bool HomeObjectImpl::get_stats(pg_id_t id, PGStats& stats) const { return _get_stats(id, stats); }
Expand Down
1 change: 1 addition & 0 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ folly::Future< ShardManager::Result< ShardInfo > > HomeObjectImpl::_get_shard(sh
return _defer().thenValue([this, id](auto) -> ShardManager::Result< ShardInfo > {
auto lg = std::shared_lock(_shard_lock);
if (auto it = _shard_map.find(id); _shard_map.end() != it) return (*it->second)->info;
LOGE("Couldnt find shard id in shard map {}", id);
return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
});
}
Expand Down
17 changes: 6 additions & 11 deletions src/lib/tests/PGManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,17 @@ TEST_F(TestFixture, Migrate) {
PGMember{boost::uuids::random_generator()()}, 0)
.get()
.error(),
PGError::UNSUPPORTED_OP);
EXPECT_EQ(
homeobj_->pg_manager()
->replace_member(_pg_id, boost::uuids::random_generator()(), PGMember{boost::uuids::random_generator()()}, 0)
.get()
.error(),
PGError::UNSUPPORTED_OP);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}, 0).get().error(),
PGError::INVALID_ARG);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer2}, 0).get().error(),
PGError::UNKNOWN_PG);
EXPECT_EQ(homeobj_->pg_manager()->replace_member(_pg_id, _peer1, PGMember{_peer1}).get().error(),
PGError::INVALID_ARG);
// TODO enable after HO test framework is enabled
#if 0
EXPECT_EQ(homeobj_->pg_manager()
->replace_member(_pg_id, _peer1, PGMember{boost::uuids::random_generator()()}, 0)
.get()
.error(),
PGError::INVALID_ARG);
EXPECT_FALSE(
homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}, 0).get());
homeobj_->pg_manager()->replace_member(_pg_id, _peer2, PGMember{boost::uuids::random_generator()()}).get());
#endif
}
Loading