Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
dist1ll committed Oct 25, 2024
1 parent 124ab18 commit 57e8d68
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 76 deletions.
10 changes: 3 additions & 7 deletions rs/artifact_pool/src/bin/consensus_pool_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,13 @@ fn export(path: &str, matches: &clap::ArgMatches) {
}
}
"Certification" => {
for x in certification_pool
.persistent_pool
.certifications()
.get_all()
{
for x in certification_pool.validated.certifications().get_all() {
println!("{}", to_string(&CertificationMessage::Certification(x)));
}
}
"CertificationShare" => {
for x in certification_pool
.persistent_pool
.validated
.certification_shares()
.get_all()
{
Expand Down Expand Up @@ -250,7 +246,7 @@ fn import(path: &str) {
});
consensus_pool.validated.mutate(ops);
} else if let Ok(msg) = from_str(&s) {
certification_pool.persistent_pool.insert(msg)
certification_pool.validated.insert(msg)
} else {
panic!("Failed to parse JSON: {}", s);
}
Expand Down
47 changes: 21 additions & 26 deletions rs/artifact_pool/src/certification_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct CertificationPoolImpl {
unvalidated_cert_index: HeightIndex<CertificationMessageHash>,
unvalidated: BTreeMap<CertificationMessageHash, CertificationMessage>,

pub persistent_pool: Box<dyn MutablePoolSection + Send + Sync>,
pub validated: Box<dyn MutablePoolSection + Send + Sync>,

unvalidated_pool_metrics: PoolMetrics,
validated_pool_metrics: PoolMetrics,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl CertificationPoolImpl {
unvalidated_share_index: HeightIndex::default(),
unvalidated_cert_index: HeightIndex::default(),
unvalidated: BTreeMap::default(),
persistent_pool,
validated: persistent_pool,
invalidated_artifacts: metrics_registry.int_counter(
"certification_invalidated_artifacts",
"The number of invalidated certification artifacts",
Expand All @@ -102,12 +102,12 @@ impl CertificationPoolImpl {
}

fn validated_certifications(&self) -> Box<dyn Iterator<Item = Certification> + '_> {
self.persistent_pool.certifications().get_all()
self.validated.certifications().get_all()
}

fn insert_validated_certification(&self, certification: Certification) {
if let Some(existing_certification) = self
.persistent_pool
.validated
.certifications()
.get_by_height(certification.height)
.next()
Expand All @@ -116,7 +116,7 @@ impl CertificationPoolImpl {
panic!("Certifications are not expected to be added more than once per height.");
}
} else {
self.persistent_pool
self.validated
.insert(CertificationMessage::Certification(certification))
}
}
Expand Down Expand Up @@ -147,11 +147,11 @@ impl CertificationPoolImpl {
self.validated_pool_metrics
.pool_artifacts
.with_label_values(&[CERTIFICATION_ARTIFACT_TYPE])
.set(self.persistent_pool.certifications().size() as i64);
.set(self.validated.certifications().size() as i64);
self.validated_pool_metrics
.pool_artifacts
.with_label_values(&[CERTIFICATION_SHARE_ARTIFACT_TYPE])
.set(self.persistent_pool.certification_shares().size() as i64);
.set(self.validated.certification_shares().size() as i64);

// Unvalidated artifacts metrics
self.unvalidated_pool_metrics
Expand Down Expand Up @@ -216,7 +216,7 @@ impl MutablePool<CertificationMessage> for CertificationPoolImpl {
.received_artifact_bytes
.with_label_values(&[msg.label()])
.observe(std::mem::size_of_val(&msg) as f64);
self.persistent_pool.insert(msg);
self.validated.insert(msg);
}

ChangeAction::MoveToValidated(msg) => {
Expand All @@ -237,7 +237,7 @@ impl MutablePool<CertificationMessage> for CertificationPoolImpl {

match msg {
CertificationMessage::CertificationShare(share) => {
self.persistent_pool
self.validated
.insert(CertificationMessage::CertificationShare(share));
}
CertificationMessage::Certification(cert) => {
Expand All @@ -253,7 +253,7 @@ impl MutablePool<CertificationMessage> for CertificationPoolImpl {
ChangeAction::RemoveAllBelow(height) => {
self.remove_all_unvalidated_below(height);
transmits.extend(
self.persistent_pool
self.validated
.purge_below(height)
.drain(..)
.map(ArtifactTransmit::Abort),
Expand Down Expand Up @@ -299,23 +299,18 @@ pub trait MutablePoolSection {

impl CertificationPool for CertificationPoolImpl {
fn certification_at_height(&self, height: Height) -> Option<Certification> {
self.persistent_pool
.certifications()
.get_by_height(height)
.next()
self.validated.certifications().get_by_height(height).next()
}

fn shares_at_height(
&self,
height: Height,
) -> Box<dyn Iterator<Item = CertificationShare> + '_> {
self.persistent_pool
.certification_shares()
.get_by_height(height)
self.validated.certification_shares().get_by_height(height)
}

fn validated_shares(&self) -> Box<dyn Iterator<Item = CertificationShare> + '_> {
self.persistent_pool.certification_shares().get_all()
self.validated.certification_shares().get_all()
}

fn unvalidated_shares_at_height(
Expand Down Expand Up @@ -394,8 +389,8 @@ impl ValidatedPoolReader<CertificationMessage> for CertificationPoolImpl {
}

fn get_all_for_broadcast(&self) -> Box<dyn Iterator<Item = CertificationMessage> + '_> {
let certification_range = self.persistent_pool.certifications().height_range();
let share_range = self.persistent_pool.certification_shares().height_range();
let certification_range = self.validated.certifications().height_range();
let share_range = self.validated.certification_shares().height_range();

let ranges = [certification_range.as_ref(), share_range.as_ref()]
.into_iter()
Expand All @@ -408,11 +403,11 @@ impl ValidatedPoolReader<CertificationMessage> for CertificationPoolImpl {
// For all heights above the minimum, return the validated certification of the subnet,
// or the share signed by this node if we don't have the aggregate.
let iterator = (min.get()..=max.get()).map(Height::from).flat_map(|h| {
let mut certifications = self.persistent_pool.certifications().get_by_height(h);
let mut certifications = self.validated.certifications().get_by_height(h);
if let Some(certification) = certifications.next() {
vec![CertificationMessage::Certification(certification)]
} else {
self.persistent_pool
self.validated
.certification_shares()
.get_by_height(h)
.filter(|share| share.signed.signature.signer == self.node_id)
Expand Down Expand Up @@ -798,11 +793,11 @@ mod tests {
let cert_msg = fake_cert(8);

assert!(pool
.persistent_pool
.validated
.get(&CertificationMessageId::from(&share_msg))
.is_none());
assert!(pool
.persistent_pool
.validated
.get(&CertificationMessageId::from(&cert_msg))
.is_none());

Expand All @@ -822,13 +817,13 @@ mod tests {
);
assert_eq!(
share_msg,
pool.persistent_pool
pool.validated
.get(&CertificationMessageId::from(&share_msg))
.unwrap()
);
assert_eq!(
cert_msg,
pool.persistent_pool
pool.validated
.get(&CertificationMessageId::from(&cert_msg))
.unwrap()
);
Expand Down
2 changes: 1 addition & 1 deletion rs/consensus/src/certification/certifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ mod tests {

for height in 1..=4 {
cert_pool
.persistent_pool
.validated
.insert(CertificationMessage::Certification(Certification {
height: Height::from(height),
signed: Signed {
Expand Down
63 changes: 25 additions & 38 deletions rs/recovery/src/steps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Step for MergeCertificationPoolsStep {
"Moving certifications of all nodes to new pool."
);
pools.iter().for_each(|(ip, p)| {
p.persistent_pool.certifications().get_all().for_each(|c| {
p.validated.certifications().get_all().for_each(|c| {
if let Some(cert) = new_pool.certification_at_height(c.height) {
if cert != c {
warn!(
Expand All @@ -176,13 +176,13 @@ impl Step for MergeCertificationPoolsStep {
"Height {}: inserting certification from node {ip}", c.height
);
new_pool
.persistent_pool
.validated
.insert(CertificationMessage::Certification(c))
}
})
});

let max_full_cert = new_pool.persistent_pool.certifications().get_highest().ok();
let max_full_cert = new_pool.validated.certifications().get_highest().ok();

if let Some(cert) = max_full_cert.as_ref() {
info!(
Expand All @@ -196,17 +196,12 @@ impl Step for MergeCertificationPoolsStep {
// Analyze and move shares
let max_cert_share = pools
.values()
.flat_map(|p| {
p.persistent_pool
.certification_shares()
.get_highest_iter()
.next()
})
.flat_map(|p| p.validated.certification_shares().get_highest_iter().next())
.max_by_key(|c| c.height);

let min_share_height = pools
.values()
.flat_map(|p| p.persistent_pool.certification_shares().height_range())
.flat_map(|p| p.validated.certification_shares().height_range())
.map(|range| range.min.get())
.min();

Expand Down Expand Up @@ -247,7 +242,7 @@ impl Step for MergeCertificationPoolsStep {
"Inserting share from node {ip}: {:?}", s.signed
);
new_pool
.persistent_pool
.validated
.insert(CertificationMessage::CertificationShare(s))
});
}
Expand Down Expand Up @@ -1121,25 +1116,25 @@ mod tests {
// only one of them should be kept after the merge.
let cert1 = make_certification(1, vec![1, 2, 3]);
let cert1_2 = make_certification(1, vec![4, 5, 6]);
pool1.persistent_pool.insert(cert1);
pool2.persistent_pool.insert(cert1_2);
pool1.validated.insert(cert1);
pool2.validated.insert(cert1_2);

// Add the same certification for height 2 to both pools,
// it should only exists in the merged pool once.
let cert2 = make_certification(2, vec![1, 2, 3]);
pool1.persistent_pool.insert(cert2.clone());
pool2.persistent_pool.insert(cert2);
pool1.validated.insert(cert2.clone());
pool2.validated.insert(cert2);

// Add two more certifications for heights 3 and 4, one to each pool.
let cert3 = make_certification(3, vec![1, 2, 3]);
let cert4 = make_certification(4, vec![1, 2, 3]);
pool1.persistent_pool.insert(cert4);
pool2.persistent_pool.insert(cert3);
pool1.validated.insert(cert4);
pool2.validated.insert(cert3);

// Add a share at height 3 to one pool. It should not be added to the
// merged pool as it is lower than the highest full certification (4).
let share3 = make_share(3, vec![1], 1);
pool1.persistent_pool.insert(share3);
pool1.validated.insert(share3);

step.exec().expect("Failed to execute step.");

Expand All @@ -1151,19 +1146,15 @@ mod tests {
);

assert_eq!(
new_pool.persistent_pool.certifications().get_all().count(),
new_pool.validated.certifications().get_all().count(),
4 // One for each height 1-4
);
assert_eq!(
new_pool
.persistent_pool
.certification_shares()
.get_all()
.count(),
new_pool.validated.certification_shares().get_all().count(),
0
);
let range = new_pool
.persistent_pool
.validated
.certifications()
.height_range()
.expect("no height range");
Expand Down Expand Up @@ -1193,14 +1184,14 @@ mod tests {
let share6 = make_share(6, vec![6], 1);
let share6_2 = make_share(6, vec![6, 2], 2);

pool1.persistent_pool.insert(cert4);
pool1.persistent_pool.insert(share3);
pool1.persistent_pool.insert(share4);
pool1.persistent_pool.insert(share5.clone());
pool1.persistent_pool.insert(share6_2);
pool1.validated.insert(cert4);
pool1.validated.insert(share3);
pool1.validated.insert(share4);
pool1.validated.insert(share5.clone());
pool1.validated.insert(share6_2);

pool2.persistent_pool.insert(share5);
pool2.persistent_pool.insert(share6);
pool2.validated.insert(share5);
pool2.validated.insert(share6);

step.exec().expect("Failed to execute step.");

Expand All @@ -1212,15 +1203,11 @@ mod tests {
);

assert_eq!(
new_pool
.persistent_pool
.certification_shares()
.get_all()
.count(),
new_pool.validated.certification_shares().get_all().count(),
3 // share5, share6, share6_2
);
let range = new_pool
.persistent_pool
.validated
.certification_shares()
.height_range()
.expect("no height range");
Expand Down
6 changes: 2 additions & 4 deletions rs/replay/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ fn find_malicious_nodes(
) -> HashSet<NodeId> {
let mut malicious = HashSet::new();
if let Some(range) = certification_pool
.persistent_pool
.validated
.certification_shares()
.height_range()
{
Expand Down Expand Up @@ -1494,9 +1494,7 @@ mod tests {
make_share(3, vec![3], 7),
];

shares
.into_iter()
.for_each(|s| pool.persistent_pool.insert(s));
shares.into_iter().for_each(|s| pool.validated.insert(s));

let malicious = find_malicious_nodes(&pool, Height::new(0), &verify);
assert_eq!(malicious.len(), 1);
Expand Down

0 comments on commit 57e8d68

Please sign in to comment.