Skip to content

Commit

Permalink
Adding more test coverage for the Raft driver
Browse files Browse the repository at this point in the history
  • Loading branch information
dzmitry-huba committed Sep 25, 2023
1 parent 96d645c commit badf55d
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 13 deletions.
14 changes: 12 additions & 2 deletions trusted/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,15 @@ pub trait Raft {
#[derive(Default)]
pub struct RaftSimple<S: Store + RaftStorage> {
raft_node: Option<Box<RaftNode<S>>>,
committed_voters: Vec<u64>,
}

impl<S: Store + RaftStorage> RaftSimple<S> {
pub fn new() -> RaftSimple<S> {
RaftSimple { raft_node: None }
RaftSimple {
raft_node: None,
committed_voters: Vec::new(),
}
}

fn mut_raft_node(&mut self) -> &mut RaftNode<S> {
Expand Down Expand Up @@ -156,6 +160,7 @@ impl<S: Store + RaftStorage> Raft for RaftSimple<S> {
state.leader_node_id = raft_soft_state.leader_id;
state.leader_term = raft_hard_state.term;
state.has_pending_change = self.raft_node().raft.has_pending_conf();
state.committed_cluster_config = self.committed_voters.clone();
}

state
Expand All @@ -181,6 +186,7 @@ impl<S: Store + RaftStorage> Raft for RaftSimple<S> {
);

store.apply_snapshot(snapshot)?;
self.committed_voters = vec![node_id];
}

self.raft_node = Some(Box::new(RawNode::new(&config, store, logger)?));
Expand Down Expand Up @@ -212,7 +218,11 @@ impl<S: Store + RaftStorage> Raft for RaftSimple<S> {
&mut self,
config_change: &RaftConfigChange,
) -> Result<RaftConfigState, RaftError> {
self.mut_raft_node().apply_conf_change(config_change)
let config_state = self.mut_raft_node().apply_conf_change(config_change);
if let Ok(config_state) = &config_state {
self.committed_voters = config_state.voters.clone();
}
config_state
}

fn has_ready(&self) -> bool {
Expand Down
180 changes: 169 additions & 11 deletions trusted/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ impl<R: Raft<S = S>, S: Store + RaftStorage, A: Actor> Driver<R, S, A> {
})?;

self.tick_instant = self.instant;
// No need to initially report the state of the cluster, only after the changes.
self.prev_raft_state = self.raft.state();

Ok(())
}
Expand Down Expand Up @@ -522,18 +524,15 @@ impl<R: Raft<S = S>, S: Store + RaftStorage, A: Actor> Driver<R, S, A> {
}

fn reset_leader_state(&mut self) {
self.raft_state = RaftState::new();
self.prev_raft_state = RaftState::new();
}

fn collect_config_state(&mut self, config_state: RaftConfigState) {
self.raft_state.committed_cluster_config = config_state.voters.clone();
self.raft_progress.config_state = config_state;
}

fn stash_leader_state(&mut self) {
let mut raft_state = self.raft.state();
raft_state.committed_cluster_config = self.raft_state.committed_cluster_config.clone();
self.raft_state = raft_state;
self.raft_state = self.raft.state();

if self.prev_raft_state == self.raft_state {
return;
Expand Down Expand Up @@ -829,6 +828,7 @@ mod test {
use super::*;
use mock::{MockActor, MockAttestation, MockHost, MockRaft, MockStore};
use model::ActorError;
use raft::{eraftpb::ConfChange as RaftConfigChange, Ready as RaftReady};

fn create_actor_config() -> Vec<u8> {
Vec::new()
Expand Down Expand Up @@ -892,15 +892,58 @@ mod test {
envelope_out::Msg::ExecuteProposal(ExecuteProposalResponse { result_contents })
}

fn create_change_cluster_request(
node_id: u64,
change_type: ChangeClusterType,
) -> MessageEnvelope {
let envelope = EnvelopeIn {
msg: Some(envelope_in::Msg::ChangeCluster(ChangeClusterRequest {
change_id: 1,
node_id,
change_type: change_type.into(),
})),
};
envelope.encode_to_vec()
}

fn create_change_cluster_response(change_status: ChangeClusterStatus) -> envelope_out::Msg {
envelope_out::Msg::ChangeCluster(ChangeClusterResponse {
change_id: 1,
change_status: change_status.into(),
})
}

fn create_check_cluster_request() -> MessageEnvelope {
let envelope = EnvelopeIn {
msg: Some(envelope_in::Msg::CheckCluster(CheckClusterRequest {})),
};
envelope.encode_to_vec()
}

fn create_check_cluster_response(raft_state: &RaftState) -> envelope_out::Msg {
envelope_out::Msg::CheckCluster(CheckClusterResponse {
leader_node_id: raft_state.leader_node_id,
leader_term: raft_state.leader_term,
cluster_node_ids: raft_state.committed_cluster_config.clone(),
has_pending_changes: raft_state.has_pending_change,
})
}

fn create_send_messages_matcher(
expected: Vec<envelope_out::Msg>,
) -> impl Fn(&[MessageEnvelope]) -> bool {
move |envelopes: &[MessageEnvelope]| {
let actual: Vec<envelope_out::Msg> = envelopes
.iter()
.map(|m| EnvelopeOut::decode(m.as_ref()).unwrap().msg.unwrap())
.filter(|m| {
if let envelope_out::Msg::Log(_) = m {
return false;
};
true
})
.collect();
expected.iter().all(|e| actual.contains(e))
expected.iter().all(|e| actual.contains(e)) && expected.len() == actual.len()
}
}

Expand Down Expand Up @@ -984,9 +1027,9 @@ mod test {

fn expect_init(
mut self,
init_handler: impl Fn(u64, bool, MockStore, &Logger) -> Result<(), RaftError> + 'static,
handler: impl Fn(u64, bool, MockStore, &Logger) -> Result<(), RaftError> + 'static,
) -> RaftBuilder {
self.mock_raft.expect_init().return_once_st(init_handler);
self.mock_raft.expect_init().return_once_st(handler);
self.mock_raft.expect_initialized().returning(|| true);

self
Expand All @@ -1011,6 +1054,24 @@ mod test {
self
}

fn expect_make_config_change_proposal(
mut self,
config_change: RaftConfigChange,
handler: impl Fn(RaftConfigChange) -> Result<(), RaftError> + 'static,
) -> RaftBuilder {
self.mock_raft
.expect_make_config_change_proposal()
.with(eq(config_change))
.returning_st(handler);

self
}

fn _expect_ready(mut self, ready: RaftReady) -> RaftBuilder {
self.mock_raft.expect_get_ready().return_once(move || ready);
self
}

fn take(&mut self) -> (MockStore, MockRaft<MockStore>) {
(
mem::take(&mut self.mock_store),
Expand Down Expand Up @@ -1087,7 +1148,7 @@ mod test {
}

#[test]
fn test_driver_start_node() {
fn test_driver_start_node_request() {
let (node_id, instant, driver_config) = create_default_parameters();

let mut mock_host = MockHostBuilder::new()
Expand Down Expand Up @@ -1121,7 +1182,7 @@ mod test {
}

#[test]
fn test_driver_stop_node() {
fn test_driver_stop_node_request() {
let (node_id, instant, driver_config) = create_default_parameters();

let mut mock_host = MockHostBuilder::new()
Expand Down Expand Up @@ -1162,7 +1223,7 @@ mod test {
}

#[test]
fn test_driver_execute_proposal() {
fn test_driver_execute_proposal_request() {
let (node_id, instant, driver_config) = create_default_parameters();
let proposal_contents = vec![1, 2, 3];

Expand Down Expand Up @@ -1246,4 +1307,101 @@ mod test {
)
);
}

#[test]
fn test_driver_change_cluster_request() {
let (node_id, instant, driver_config) = create_default_parameters();
let peer_id = 2;

let mut mock_host = MockHostBuilder::new()
.expect_public_signing_key(vec![])
.expect_send_messages(vec![create_start_node_response(node_id)])
.expect_send_messages(vec![create_change_cluster_response(
ChangeClusterStatus::ChangeStatusPending,
)])
.take();

let raft_builder = RaftBuilder::new()
.expect_leader(false)
.expect_init(|_, _, _, _| Ok(()))
.expect_has_ready(false)
.expect_should_snapshot(false)
.expect_state(&create_default_raft_state(node_id))
.expect_make_config_change_proposal(
create_raft_config_change(peer_id, RaftConfigChangeType::AddNode),
|_| Ok(()),
);

let mut driver = DriverBuilder::new(driver_config.clone())
.expect_on_init(|_| Ok(()))
.take(raft_builder);

assert_eq!(
Ok(()),
driver.receive_message(
&mut mock_host,
instant,
Some(create_start_node_request(true, node_id)),
)
);

assert_eq!(
Ok(()),
driver.receive_message(
&mut mock_host,
instant + 10,
Some(create_change_cluster_request(
peer_id,
ChangeClusterType::ChangeTypeAddNode
)),
)
);
}

#[test]
fn test_driver_check_cluster_request() {
let (node_id, instant, driver_config) = create_default_parameters();
let peer_id = 2;

let raft_state = create_default_raft_state(node_id);

let mut mock_host = MockHostBuilder::new()
.expect_public_signing_key(vec![])
.expect_send_messages(vec![create_start_node_response(node_id)])
.expect_send_messages(vec![create_check_cluster_response(&raft_state)])
.take();

let raft_builder = RaftBuilder::new()
.expect_leader(false)
.expect_init(|_, _, _, _| Ok(()))
.expect_has_ready(false)
.expect_should_snapshot(false)
.expect_state(&raft_state)
.expect_make_config_change_proposal(
create_raft_config_change(peer_id, RaftConfigChangeType::AddNode),
|_| Ok(()),
);

let mut driver = DriverBuilder::new(driver_config.clone())
.expect_on_init(|_| Ok(()))
.take(raft_builder);

assert_eq!(
Ok(()),
driver.receive_message(
&mut mock_host,
instant,
Some(create_start_node_request(true, node_id)),
)
);

assert_eq!(
Ok(()),
driver.receive_message(
&mut mock_host,
instant + 10,
Some(create_check_cluster_request()),
)
);
}
}

0 comments on commit badf55d

Please sign in to comment.