From 57e8d68fe4ee1d52545cc6c3a30ee033cf853a1f Mon Sep 17 00:00:00 2001 From: Adrian Alic Date: Fri, 25 Oct 2024 13:06:42 +0000 Subject: [PATCH] . --- .../src/bin/consensus_pool_util.rs | 10 +-- rs/artifact_pool/src/certification_pool.rs | 47 +++++++------- rs/consensus/src/certification/certifier.rs | 2 +- rs/recovery/src/steps.rs | 63 ++++++++----------- rs/replay/src/player.rs | 6 +- 5 files changed, 52 insertions(+), 76 deletions(-) diff --git a/rs/artifact_pool/src/bin/consensus_pool_util.rs b/rs/artifact_pool/src/bin/consensus_pool_util.rs index ef6c9938458..9719bd8ea87 100644 --- a/rs/artifact_pool/src/bin/consensus_pool_util.rs +++ b/rs/artifact_pool/src/bin/consensus_pool_util.rs @@ -211,17 +211,13 @@ fn export(path: &str, matches: &clap::ArgMatches) { } } "Certification" => { - for x in certification_pool - .persistent_pool - .certifications() - .get_all() - { + for x in certification_pool.validated.certifications().get_all() { println!("{}", to_string(&CertificationMessage::Certification(x))); } } "CertificationShare" => { for x in certification_pool - .persistent_pool + .validated .certification_shares() .get_all() { @@ -250,7 +246,7 @@ fn import(path: &str) { }); consensus_pool.validated.mutate(ops); } else if let Ok(msg) = from_str(&s) { - certification_pool.persistent_pool.insert(msg) + certification_pool.validated.insert(msg) } else { panic!("Failed to parse JSON: {}", s); } diff --git a/rs/artifact_pool/src/certification_pool.rs b/rs/artifact_pool/src/certification_pool.rs index 1fe8f56c2ab..e38188da129 100644 --- a/rs/artifact_pool/src/certification_pool.rs +++ b/rs/artifact_pool/src/certification_pool.rs @@ -36,7 +36,7 @@ pub struct CertificationPoolImpl { unvalidated_cert_index: HeightIndex, unvalidated: BTreeMap, - pub persistent_pool: Box, + pub validated: Box, unvalidated_pool_metrics: PoolMetrics, validated_pool_metrics: PoolMetrics, @@ -82,7 +82,7 @@ impl CertificationPoolImpl { unvalidated_share_index: HeightIndex::default(), unvalidated_cert_index: HeightIndex::default(), unvalidated: BTreeMap::default(), - persistent_pool, + validated: persistent_pool, invalidated_artifacts: metrics_registry.int_counter( "certification_invalidated_artifacts", "The number of invalidated certification artifacts", @@ -102,12 +102,12 @@ impl CertificationPoolImpl { } fn validated_certifications(&self) -> Box + '_> { - self.persistent_pool.certifications().get_all() + self.validated.certifications().get_all() } fn insert_validated_certification(&self, certification: Certification) { if let Some(existing_certification) = self - .persistent_pool + .validated .certifications() .get_by_height(certification.height) .next() @@ -116,7 +116,7 @@ impl CertificationPoolImpl { panic!("Certifications are not expected to be added more than once per height."); } } else { - self.persistent_pool + self.validated .insert(CertificationMessage::Certification(certification)) } } @@ -147,11 +147,11 @@ impl CertificationPoolImpl { self.validated_pool_metrics .pool_artifacts .with_label_values(&[CERTIFICATION_ARTIFACT_TYPE]) - .set(self.persistent_pool.certifications().size() as i64); + .set(self.validated.certifications().size() as i64); self.validated_pool_metrics .pool_artifacts .with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE]) - .set(self.persistent_pool.certification_shares().size() as i64); + .set(self.validated.certification_shares().size() as i64); // Unvalidated artifacts metrics self.unvalidated_pool_metrics @@ -216,7 +216,7 @@ impl MutablePool for CertificationPoolImpl { .received_artifact_bytes .with_label_values(&[msg.label()]) .observe(std::mem::size_of_val(&msg) as f64); - self.persistent_pool.insert(msg); + self.validated.insert(msg); } ChangeAction::MoveToValidated(msg) => { @@ -237,7 +237,7 @@ impl MutablePool for CertificationPoolImpl { match msg { CertificationMessage::CertificationShare(share) => { - self.persistent_pool + self.validated .insert(CertificationMessage::CertificationShare(share)); } CertificationMessage::Certification(cert) => { @@ -253,7 +253,7 @@ impl MutablePool for CertificationPoolImpl { ChangeAction::RemoveAllBelow(height) => { self.remove_all_unvalidated_below(height); transmits.extend( - self.persistent_pool + self.validated .purge_below(height) .drain(..) .map(ArtifactTransmit::Abort), @@ -299,23 +299,18 @@ pub trait MutablePoolSection { impl CertificationPool for CertificationPoolImpl { fn certification_at_height(&self, height: Height) -> Option { - self.persistent_pool - .certifications() - .get_by_height(height) - .next() + self.validated.certifications().get_by_height(height).next() } fn shares_at_height( &self, height: Height, ) -> Box + '_> { - self.persistent_pool - .certification_shares() - .get_by_height(height) + self.validated.certification_shares().get_by_height(height) } fn validated_shares(&self) -> Box + '_> { - self.persistent_pool.certification_shares().get_all() + self.validated.certification_shares().get_all() } fn unvalidated_shares_at_height( @@ -394,8 +389,8 @@ impl ValidatedPoolReader for CertificationPoolImpl { } fn get_all_for_broadcast(&self) -> Box + '_> { - let certification_range = self.persistent_pool.certifications().height_range(); - let share_range = self.persistent_pool.certification_shares().height_range(); + let certification_range = self.validated.certifications().height_range(); + let share_range = self.validated.certification_shares().height_range(); let ranges = [certification_range.as_ref(), share_range.as_ref()] .into_iter() @@ -408,11 +403,11 @@ impl ValidatedPoolReader for CertificationPoolImpl { // For all heights above the minimum, return the validated certification of the subnet, // or the share signed by this node if we don't have the aggregate. let iterator = (min.get()..=max.get()).map(Height::from).flat_map(|h| { - let mut certifications = self.persistent_pool.certifications().get_by_height(h); + let mut certifications = self.validated.certifications().get_by_height(h); if let Some(certification) = certifications.next() { vec![CertificationMessage::Certification(certification)] } else { - self.persistent_pool + self.validated .certification_shares() .get_by_height(h) .filter(|share| share.signed.signature.signer == self.node_id) @@ -798,11 +793,11 @@ mod tests { let cert_msg = fake_cert(8); assert!(pool - .persistent_pool + .validated .get(&CertificationMessageId::from(&share_msg)) .is_none()); assert!(pool - .persistent_pool + .validated .get(&CertificationMessageId::from(&cert_msg)) .is_none()); @@ -822,13 +817,13 @@ mod tests { ); assert_eq!( share_msg, - pool.persistent_pool + pool.validated .get(&CertificationMessageId::from(&share_msg)) .unwrap() ); assert_eq!( cert_msg, - pool.persistent_pool + pool.validated .get(&CertificationMessageId::from(&cert_msg)) .unwrap() ); diff --git a/rs/consensus/src/certification/certifier.rs b/rs/consensus/src/certification/certifier.rs index b879722c171..0950eca6afd 100644 --- a/rs/consensus/src/certification/certifier.rs +++ b/rs/consensus/src/certification/certifier.rs @@ -1354,7 +1354,7 @@ mod tests { for height in 1..=4 { cert_pool - .persistent_pool + .validated .insert(CertificationMessage::Certification(Certification { height: Height::from(height), signed: Signed { diff --git a/rs/recovery/src/steps.rs b/rs/recovery/src/steps.rs index fd07a9d844d..effcb5a3fcd 100644 --- a/rs/recovery/src/steps.rs +++ b/rs/recovery/src/steps.rs @@ -160,7 +160,7 @@ impl Step for MergeCertificationPoolsStep { "Moving certifications of all nodes to new pool." ); pools.iter().for_each(|(ip, p)| { - p.persistent_pool.certifications().get_all().for_each(|c| { + p.validated.certifications().get_all().for_each(|c| { if let Some(cert) = new_pool.certification_at_height(c.height) { if cert != c { warn!( @@ -176,13 +176,13 @@ impl Step for MergeCertificationPoolsStep { "Height {}: inserting certification from node {ip}", c.height ); new_pool - .persistent_pool + .validated .insert(CertificationMessage::Certification(c)) } }) }); - let max_full_cert = new_pool.persistent_pool.certifications().get_highest().ok(); + let max_full_cert = new_pool.validated.certifications().get_highest().ok(); if let Some(cert) = max_full_cert.as_ref() { info!( @@ -196,17 +196,12 @@ impl Step for MergeCertificationPoolsStep { // Analyze and move shares let max_cert_share = pools .values() - .flat_map(|p| { - p.persistent_pool - .certification_shares() - .get_highest_iter() - .next() - }) + .flat_map(|p| p.validated.certification_shares().get_highest_iter().next()) .max_by_key(|c| c.height); let min_share_height = pools .values() - .flat_map(|p| p.persistent_pool.certification_shares().height_range()) + .flat_map(|p| p.validated.certification_shares().height_range()) .map(|range| range.min.get()) .min(); @@ -247,7 +242,7 @@ impl Step for MergeCertificationPoolsStep { "Inserting share from node {ip}: {:?}", s.signed ); new_pool - .persistent_pool + .validated .insert(CertificationMessage::CertificationShare(s)) }); } @@ -1121,25 +1116,25 @@ mod tests { // only one of them should be kept after the merge. let cert1 = make_certification(1, vec![1, 2, 3]); let cert1_2 = make_certification(1, vec![4, 5, 6]); - pool1.persistent_pool.insert(cert1); - pool2.persistent_pool.insert(cert1_2); + pool1.validated.insert(cert1); + pool2.validated.insert(cert1_2); // Add the same certification for height 2 to both pools, // it should only exists in the merged pool once. let cert2 = make_certification(2, vec![1, 2, 3]); - pool1.persistent_pool.insert(cert2.clone()); - pool2.persistent_pool.insert(cert2); + pool1.validated.insert(cert2.clone()); + pool2.validated.insert(cert2); // Add two more certifications for heights 3 and 4, one to each pool. let cert3 = make_certification(3, vec![1, 2, 3]); let cert4 = make_certification(4, vec![1, 2, 3]); - pool1.persistent_pool.insert(cert4); - pool2.persistent_pool.insert(cert3); + pool1.validated.insert(cert4); + pool2.validated.insert(cert3); // Add a share at height 3 to one pool. It should not be added to the // merged pool as it is lower than the highest full certification (4). let share3 = make_share(3, vec![1], 1); - pool1.persistent_pool.insert(share3); + pool1.validated.insert(share3); step.exec().expect("Failed to execute step."); @@ -1151,19 +1146,15 @@ mod tests { ); assert_eq!( - new_pool.persistent_pool.certifications().get_all().count(), + new_pool.validated.certifications().get_all().count(), 4 // One for each height 1-4 ); assert_eq!( - new_pool - .persistent_pool - .certification_shares() - .get_all() - .count(), + new_pool.validated.certification_shares().get_all().count(), 0 ); let range = new_pool - .persistent_pool + .validated .certifications() .height_range() .expect("no height range"); @@ -1193,14 +1184,14 @@ mod tests { let share6 = make_share(6, vec![6], 1); let share6_2 = make_share(6, vec![6, 2], 2); - pool1.persistent_pool.insert(cert4); - pool1.persistent_pool.insert(share3); - pool1.persistent_pool.insert(share4); - pool1.persistent_pool.insert(share5.clone()); - pool1.persistent_pool.insert(share6_2); + pool1.validated.insert(cert4); + pool1.validated.insert(share3); + pool1.validated.insert(share4); + pool1.validated.insert(share5.clone()); + pool1.validated.insert(share6_2); - pool2.persistent_pool.insert(share5); - pool2.persistent_pool.insert(share6); + pool2.validated.insert(share5); + pool2.validated.insert(share6); step.exec().expect("Failed to execute step."); @@ -1212,15 +1203,11 @@ mod tests { ); assert_eq!( - new_pool - .persistent_pool - .certification_shares() - .get_all() - .count(), + new_pool.validated.certification_shares().get_all().count(), 3 // share5, share6, share6_2 ); let range = new_pool - .persistent_pool + .validated .certification_shares() .height_range() .expect("no height range"); diff --git a/rs/replay/src/player.rs b/rs/replay/src/player.rs index 1499c9ba73c..717496376ef 100644 --- a/rs/replay/src/player.rs +++ b/rs/replay/src/player.rs @@ -1238,7 +1238,7 @@ fn find_malicious_nodes( ) -> HashSet { let mut malicious = HashSet::new(); if let Some(range) = certification_pool - .persistent_pool + .validated .certification_shares() .height_range() { @@ -1494,9 +1494,7 @@ mod tests { make_share(3, vec![3], 7), ]; - shares - .into_iter() - .for_each(|s| pool.persistent_pool.insert(s)); + shares.into_iter().for_each(|s| pool.validated.insert(s)); let malicious = find_malicious_nodes(&pool, Height::new(0), &verify); assert_eq!(malicious.len(), 1);