Skip to content

Commit

Permalink
Deduping Raft heartbeat messages to same peer sent within a single me…
Browse files Browse the repository at this point in the history
…ssage processing cycle
  • Loading branch information
dzmitry-huba committed Sep 13, 2023
1 parent 6051e31 commit 379b77b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
19 changes: 17 additions & 2 deletions trusted/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ use core::cell::RefCell;
use core::cell::RefMut;
use core::cmp;
use core::mem;
use hashbrown::HashMap;
use platform::{Application, Host, MessageEnvelope, PalError};
use prost::Message;
use raft::SoftState;
use raft::{
eraftpb::ConfChangeType as RaftConfigChangeType, eraftpb::ConfState as RaftConfState,
eraftpb::Entry as RaftEntry, eraftpb::EntryType as RaftEntryType, eraftpb::HardState,
eraftpb::Message as RaftMessage, eraftpb::Snapshot as RaftSnapshot, storage::MemStorage,
Config as RaftConfig, Error as RaftError, RawNode, StateRole as RaftStateRole,
eraftpb::Message as RaftMessage, eraftpb::MessageType as RaftMessageType,
eraftpb::Snapshot as RaftSnapshot, storage::MemStorage, Config as RaftConfig,
Error as RaftError, RawNode, StateRole as RaftStateRole,
};
use slog::{debug, error, info, o, warn, Logger};
type RaftNode = RawNode<MemStorage>;
Expand Down Expand Up @@ -174,6 +176,7 @@ pub struct Driver {
driver_config: DriverConfig,
driver_state: DriverState,
messages: Vec<EnvelopeOut>,
heatbeat_messages: HashMap<u64, RaftMessage>,
raft_node_id: u64,
instant: u64,
tick_instant: u64,
Expand All @@ -193,6 +196,7 @@ impl Driver {
driver_config,
driver_state: DriverState::Created,
messages: Vec::new(),
heatbeat_messages: HashMap::new(),
raft_node_id: 0,
instant: 0,
tick_instant: 0,
Expand Down Expand Up @@ -420,6 +424,16 @@ impl Driver {

fn send_raft_messages(&mut self, raft_messages: Vec<RaftMessage>) -> Result<(), PalError> {
for raft_message in raft_messages {
// Try to dedup numerous heatbeat messages to the same recipient.
if raft_message.msg_type == RaftMessageType::MsgHeartbeat.into() {
if self.heatbeat_messages.contains_key(&raft_message.to)
&& self.heatbeat_messages[&raft_message.to] == raft_message
{
continue;
}
self.heatbeat_messages
.insert(raft_message.to, raft_message.clone());
}
// Buffer message to be sent out.
self.stash_message(envelope_out::Msg::DeliverMessage(DeliverMessage {
recipient_node_id: raft_message.to,
Expand Down Expand Up @@ -754,6 +768,7 @@ impl Driver {

self.stash_log_entries();

self.heatbeat_messages.clear();
// Take messages to be sent out.
Ok(mem::take(&mut self.messages))
}
Expand Down
1 change: 1 addition & 0 deletions trusted/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
extern crate alloc;
#[cfg(feature = "std")]
extern crate core;
extern crate hashbrown;
extern crate prost;
extern crate raft;
extern crate slog;
Expand Down

0 comments on commit 379b77b

Please sign in to comment.