Skip to content

Commit

Permalink
Fix register_consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxi Chen <xiaoxchen@ebay.com>
  • Loading branch information
xiaoxichen committed Sep 22, 2024
1 parent e803760 commit b452d31
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.61"
version = "6.4.62"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
41 changes: 21 additions & 20 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,44 +433,45 @@ void RaftReplService::flush_durable_commit_lsn() {
}
}


///////////////////// RaftReplService CP Callbacks /////////////////////////////
int ReplSvcCPContext::add_repl_dev_ctx(cshared<ReplDev > dev, cshared<ReplDevCPContext> dev_ctx) {
int ReplSvcCPContext::add_repl_dev_ctx(cshared< ReplDev > dev, cshared< ReplDevCPContext > dev_ctx) {
m_cp_ctx_map.emplace(dev, dev_ctx);
return 0;
}

cshared<ReplDevCPContext> ReplSvcCPContext::get_repl_dev_ctx(cshared<ReplDev > dev) {
cshared< ReplDevCPContext > ReplSvcCPContext::get_repl_dev_ctx(cshared< ReplDev > dev) {
if (m_cp_ctx_map.count(dev) == 0) {
// it is possible if a repl dev added during the cp flush
return std::make_shared<ReplDevCPContext>();
return std::make_shared< ReplDevCPContext >();
}
return m_cp_ctx_map[dev];
}

std::unique_ptr< CPContext > RaftReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) {
// Add cp info from all devices to current cp.
// We dont need taking cp_guard as cp_mgr already taken it in do_trigger_cp_flush
auto cur_cp_ctx = s_cast< ReplSvcCPContext* >(cur_cp->context(cp_consumer_t::REPLICATION_SVC));
repl_service().iterate_repl_devs([cur_cp, cur_cp_ctx](cshared< ReplDev >& repl_dev) {
// we need collecting the LSN of each repl dev and put it into current CP.
// There is no dirty buffers accumulated to new_cp yet, as the cp_mgr ensure replication_svc
// is the first one being called during cp switchover.
auto dev_ctx = std::static_pointer_cast< RaftReplDev >(repl_dev)->get_cp_ctx(cur_cp);
cur_cp_ctx->add_repl_dev_ctx(repl_dev, std::move(dev_ctx));
});
//create new ctx
// checking if cur_cp == nullptr as on_switchover_cp will be called when registering the cp handler
if (cur_cp != nullptr) {
// Add cp info from all devices to current cp.
// We dont need taking cp_guard as cp_mgr already taken it in do_trigger_cp_flush
auto cur_cp_ctx = s_cast< ReplSvcCPContext* >(cur_cp->context(cp_consumer_t::REPLICATION_SVC));
repl_service().iterate_repl_devs([cur_cp, cur_cp_ctx](cshared< ReplDev >& repl_dev) {
// we need collecting the LSN of each repl dev and put it into current CP.
// There is no dirty buffers accumulated to new_cp yet, as the cp_mgr ensure replication_svc
// is the first one being called during cp switchover.
auto dev_ctx = std::static_pointer_cast< RaftReplDev >(repl_dev)->get_cp_ctx(cur_cp);
cur_cp_ctx->add_repl_dev_ctx(repl_dev, std::move(dev_ctx));
});
}
// create new ctx
auto ctx = std::make_unique< ReplSvcCPContext >(new_cp);
return ctx;
}

folly::Future< bool > RaftReplServiceCPHandler::cp_flush(CP* cp) {
auto cp_ctx = s_cast< ReplSvcCPContext* >(cp->context(cp_consumer_t::REPLICATION_SVC));
repl_service().iterate_repl_devs(
[cp, cp_ctx](cshared< ReplDev >& repl_dev) {
auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev);
std::static_pointer_cast< RaftReplDev >(repl_dev)->cp_flush(cp, dev_ctx);
});
repl_service().iterate_repl_devs([cp, cp_ctx](cshared< ReplDev >& repl_dev) {
auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev);
std::static_pointer_cast< RaftReplDev >(repl_dev)->cp_flush(cp, dev_ctx);
});
return folly::makeFuture< bool >(true);
}

Expand Down

0 comments on commit b452d31

Please sign in to comment.