From b452d311398518b9591fceb66c94d22648d140bd Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Mon, 23 Sep 2024 00:03:32 +0800 Subject: [PATCH] Fix register_consumer Signed-off-by: Xiaoxi Chen --- conanfile.py | 2 +- .../replication/service/raft_repl_service.cpp | 41 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/conanfile.py b/conanfile.py index be8689f9b..524cd6a1d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.61" + version = "6.4.62" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index 30f762fc8..bb9ebd0ee 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -433,44 +433,45 @@ void RaftReplService::flush_durable_commit_lsn() { } } - ///////////////////// RaftReplService CP Callbacks ///////////////////////////// -int ReplSvcCPContext::add_repl_dev_ctx(cshared dev, cshared dev_ctx) { +int ReplSvcCPContext::add_repl_dev_ctx(cshared< ReplDev > dev, cshared< ReplDevCPContext > dev_ctx) { m_cp_ctx_map.emplace(dev, dev_ctx); return 0; } -cshared ReplSvcCPContext::get_repl_dev_ctx(cshared dev) { +cshared< ReplDevCPContext > ReplSvcCPContext::get_repl_dev_ctx(cshared< ReplDev > dev) { if (m_cp_ctx_map.count(dev) == 0) { // it is possible if a repl dev added during the cp flush - return std::make_shared(); + return std::make_shared< ReplDevCPContext >(); } return m_cp_ctx_map[dev]; } std::unique_ptr< CPContext > RaftReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { - // Add cp info from all devices to current cp. - // We dont need taking cp_guard as cp_mgr already taken it in do_trigger_cp_flush - auto cur_cp_ctx = s_cast< ReplSvcCPContext* >(cur_cp->context(cp_consumer_t::REPLICATION_SVC)); - repl_service().iterate_repl_devs([cur_cp, cur_cp_ctx](cshared< ReplDev >& repl_dev) { - // we need collecting the LSN of each repl dev and put it into current CP. - // There is no dirty buffers accumulated to new_cp yet, as the cp_mgr ensure replication_svc - // is the first one being called during cp switchover. - auto dev_ctx = std::static_pointer_cast< RaftReplDev >(repl_dev)->get_cp_ctx(cur_cp); - cur_cp_ctx->add_repl_dev_ctx(repl_dev, std::move(dev_ctx)); - }); - //create new ctx + // checking if cur_cp == nullptr as on_switchover_cp will be called when registering the cp handler + if (cur_cp != nullptr) { + // Add cp info from all devices to current cp. + // We dont need taking cp_guard as cp_mgr already taken it in do_trigger_cp_flush + auto cur_cp_ctx = s_cast< ReplSvcCPContext* >(cur_cp->context(cp_consumer_t::REPLICATION_SVC)); + repl_service().iterate_repl_devs([cur_cp, cur_cp_ctx](cshared< ReplDev >& repl_dev) { + // we need collecting the LSN of each repl dev and put it into current CP. + // There is no dirty buffers accumulated to new_cp yet, as the cp_mgr ensure replication_svc + // is the first one being called during cp switchover. + auto dev_ctx = std::static_pointer_cast< RaftReplDev >(repl_dev)->get_cp_ctx(cur_cp); + cur_cp_ctx->add_repl_dev_ctx(repl_dev, std::move(dev_ctx)); + }); + } + // create new ctx auto ctx = std::make_unique< ReplSvcCPContext >(new_cp); return ctx; } folly::Future< bool > RaftReplServiceCPHandler::cp_flush(CP* cp) { auto cp_ctx = s_cast< ReplSvcCPContext* >(cp->context(cp_consumer_t::REPLICATION_SVC)); - repl_service().iterate_repl_devs( - [cp, cp_ctx](cshared< ReplDev >& repl_dev) { - auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev); - std::static_pointer_cast< RaftReplDev >(repl_dev)->cp_flush(cp, dev_ctx); - }); + repl_service().iterate_repl_devs([cp, cp_ctx](cshared< ReplDev >& repl_dev) { + auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev); + std::static_pointer_cast< RaftReplDev >(repl_dev)->cp_flush(cp, dev_ctx); + }); return folly::makeFuture< bool >(true); }