Skip to content

Commit

Permalink
fix repl dev ut
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Aug 28, 2024
1 parent 17521c5 commit a42a684
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 23 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.52"
version = "6.4.53"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
5 changes: 4 additions & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ReplDev;
class ReplDevListener;
struct repl_req_ctx;
using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
using repl_req_ptr_t = boost::intrusive_ptr< repl_req_ctx >;

VENUM(repl_req_state_t, uint32_t,
Expand Down Expand Up @@ -56,7 +57,9 @@ struct repl_key {
};

bool operator==(repl_key const& other) const = default;
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
std::string to_string() const {
return fmt::format("server={}, term={}, dsn={}, hash={}", server_id, term, dsn, Hasher()(*this));
}
};

using repl_snapshot = nuraft::snapshot;
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/log_store/home_raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
namespace homestore {

using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;

class HomeRaftLogStore : public nuraft::log_store {
public:
Expand Down
3 changes: 2 additions & 1 deletion src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ uint8_t* repl_req_ctx::raw_journal_buf() { return std::get< std::unique_ptr< uin

void repl_req_ctx::set_lsn(int64_t lsn) {
DEBUG_ASSERT((m_lsn == -1) || (m_lsn == lsn),
"Changing lsn for request={} on the fly can cause race condition, not expected", to_string());
"Changing lsn for request={} on the fly can cause race condition, not expected. lsn {}, m_lsn {}",
to_string(), lsn, m_lsn);
m_lsn = lsn;
LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string());
}
Expand Down
8 changes: 7 additions & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
// TODO :: we need to indentify whether this log entry should be appended to log store.
// 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we
// should skip it.
// 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been
// committed, we should skip it.
// here, we only check the first condition for now. revisit here if we need to check the second
if (req == nullptr || req->lsn() != -1) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
Expand Down
9 changes: 9 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {
#pragma pack()

using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;

ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

Expand Down Expand Up @@ -229,6 +230,14 @@ class RaftReplDev : public ReplDev,
*/
void on_restart();

/**
* \brief This method is called to force leave the group without waiting for committing the destroy message.
* it is used when the repl_dev is a stale member of a destroyed group. this stable member does not receive the
* destroy message. but the group is already destroyed, so no leader will send this message again to this stale
* member. we need to force leave the group to avoid the stale member to be a part of the group.
*/
void force_leave() { leave(); }

protected:
//////////////// All nuraft::state_mgr overrides ///////////////////////
nuraft::ptr< nuraft::cluster_config > load_config() override;
Expand Down
5 changes: 5 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
return m_success_ptr;
}

void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) {
RD_LOGD("Raft channel: Commit new cluster conf , log_idx = {}", log_idx);
// TODO:add more logic here if necessary
}

void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
for (auto [key, rreq] : m_lsn_req_map) {
cb(key, rreq);
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class RaftStateMachine : public nuraft::state_machine {
uint64_t last_commit_index() override;
raft_buf_ptr_t pre_commit_ext(const nuraft::state_machine::ext_op_params& params) override;
raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override;
void commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) override;
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
void become_ready();

Expand Down
3 changes: 2 additions & 1 deletion src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ void RaftReplService::start_reaper_thread() {
m_reaper_fiber = iomanager.iofiber_self();

// Schedule the rdev garbage collector timer
LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds",
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer(
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */,
nullptr, [this](void*) {
Expand Down
36 changes: 18 additions & 18 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::unique_lock lk(db_mtx_);
inmem_db_.insert_or_assign(k, v);
lsn_index_.emplace(lsn, v);
last_committed_lsn = lsn;
++commit_count_;
}

Expand Down Expand Up @@ -231,13 +230,11 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();

auto raft_server = std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server();
if (snp_data->offset == 0) {
// For obj_id 0 we sent back the last committed lsn.
snp_data->offset = last_committed_lsn;
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj);
snp_data->offset = raft_server->get_committed_log_idx();
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}",
g_helper->replica_num(), snp_data->offset);
return;
}

Expand All @@ -260,7 +257,6 @@ class TestReplicatedDB : public homestore::ReplDevListener {
value.blkid_ = out_blkids;
}
inmem_db_.insert_or_assign(key, value);
last_committed_lsn = value.lsn_;
++commit_count_;
ptr++;
}
Expand All @@ -269,7 +265,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
"[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj, num_items);
snp_data->offset = last_committed_lsn + 1;
snp_data->offset = raft_server->get_committed_log_idx() + 1;
}

bool apply_snapshot(shared< snapshot_context > context) override {
Expand Down Expand Up @@ -392,7 +388,6 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::map< int64_t, Value > lsn_index_;
uint64_t commit_count_{0};
std::shared_mutex db_mtx_;
uint64_t last_committed_lsn{0};
std::shared_ptr< snapshot_context > m_last_snapshot{nullptr};
std::mutex m_snapshot_lock;
bool zombie_{false};
Expand Down Expand Up @@ -421,11 +416,23 @@ class RaftReplDevTest : public testing::Test {
for (auto const& db : dbs_) {
if (db->is_zombie()) { continue; }
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
int i = 0;
bool force_leave = false;
do {
std::this_thread::sleep_for(std::chrono::seconds(1));
auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
raft_repl_svc.gc_repl_devs();
LOGINFO("Waiting for repl dev to get destroyed");

// TODO: if leader is destroyed, but the follower does not receive the notification, it will not be
// destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at
// raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now.
if (i++ > 10 && !force_leave) {
LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave");
repl_dev->force_leave();
force_leave = true;
}

} while (!repl_dev->is_destroyed());
}
}
Expand Down Expand Up @@ -919,16 +926,9 @@ TEST_F(RaftReplDevTest, BaselineTest) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

#ifdef _PRERELEASE
// If debug build we set flip to force truncate.
if (g_helper->replica_num() == 0) {
LOGINFO("Set force home logstore truncate");
g_helper->set_basic_flip("force_home_raft_log_truncate");
}
#endif

// Write some entries on leader.
uint64_t entries_per_attempt = 50;

LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

Expand Down

0 comments on commit a42a684

Please sign in to comment.