Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransactionsHandle propagation commands should not adhere to caching #12079

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 91 additions & 16 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,29 @@ where
}
}

/// Represents the different modes of transaction propagation.
///
/// This enum is used to determine how transactions are propagated to peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum PropgateKind {
/// Default propagation mode.
///
/// Transactions are only sent to peers that haven't seen them yet.
Basic,
/// Forced propagation mode.
///
/// Transactions are sent to all peers regardless of whether they have been sent or received
/// before.
Forced,
}

impl PropgateKind {
/// Returns `true` if the propagation kind is `Forced`.
pub const fn is_forced(self) -> bool {
matches!(self, Self::Forced)
}
}

impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
Expand Down Expand Up @@ -416,6 +439,7 @@ where
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
PropgateKind::Basic,
);

// notify pool so events get fired
Expand All @@ -431,6 +455,7 @@ where
fn propagate_transactions(
&mut self,
to_propagate: Vec<PropagateTransaction>,
propagate_kind: PropgateKind,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
if self.network.tx_gossip_disabled() {
Expand All @@ -454,9 +479,12 @@ where
// peer.
for tx in &to_propagate {
// Only proceed if the transaction is not in the peer's list of seen transactions
if !peer.seen_transactions.contains(&tx.hash()) {
if propagate_kind.is_forced() {
// add transaction to the list of hashes to propagate
builder.push(tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Include the transaction if the peer hasn't seen it
builder.push(tx);
}
}

Expand All @@ -476,8 +504,13 @@ where

for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

// Update cache if respecting cache
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
}

trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
Expand All @@ -488,10 +521,17 @@ where

// send full transactions, if any
if let Some(new_full_transactions) = full {
// Record propagated full transactions
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

// Update cache if respecting cache
if !propagate_kind.is_forced() {
for tx in &new_full_transactions {
// Mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}
}

trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
Expand All @@ -514,6 +554,7 @@ where
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
propagate_kind: PropgateKind,
) -> Option<PropagatedTransactions> {
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");

Expand All @@ -527,7 +568,11 @@ where

// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
if propagate_kind.is_forced() {
// Always include the transaction, regardless of cache
full_transactions.push(&tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
}
}
Expand All @@ -543,9 +588,15 @@ where
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

// mark transaction as seen by peer
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
}

// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
}
Expand All @@ -554,9 +605,15 @@ where
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

// mark transaction as seen by peer
if !propagate_kind.is_forced() {
for tx in &new_full_transactions {
peer.seen_transactions.insert(tx.hash());
}
}

// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
}
Expand All @@ -570,7 +627,12 @@ where
/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
fn propagate_hashes_to(&mut self, hashes: Vec<TxHash>, peer_id: PeerId) {
fn propagate_hashes_to(
&mut self,
hashes: Vec<TxHash>,
peer_id: PeerId,
propagate_kind: PropgateKind,
) {
trace!(target: "net::tx", "Start propagating transactions as hashes");

// This fetches a transactions from the pool, including the blob transactions, which are
Expand All @@ -590,7 +652,11 @@ where
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);

for tx in to_propagate {
if !peer.seen_transactions.insert(tx.hash()) {
if propagate_kind.is_forced() {
// Always include the transaction
hashes.push(&tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Include if the peer hasn't seen it
hashes.push(&tx);
}
}
Expand All @@ -606,6 +672,13 @@ where
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
}

// Update the cache if respecting cache
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
}

trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");

// send hashes of transactions
Expand Down Expand Up @@ -880,14 +953,16 @@ where
self.on_new_pending_transactions(vec![hash])
}
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer)
self.propagate_hashes_to(hashes, peer, PropgateKind::Forced)
}
TransactionsCommand::GetActivePeers(tx) => {
let peers = self.peers.keys().copied().collect::<HashSet<_>>();
tx.send(peers).ok();
}
TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, peer) {
if let Some(propagated) =
self.propagate_full_transactions_to_peer(txs, peer, PropgateKind::Forced)
{
self.pool.on_propagated(propagated);
}
}
Expand Down Expand Up @@ -2388,7 +2463,7 @@ mod tests {
let eip4844_tx = Arc::new(factory.create_eip4844());
propagate.push(PropagateTransaction::new(eip4844_tx.clone()));

let propagated = tx_manager.propagate_transactions(propagate.clone());
let propagated = tx_manager.propagate_transactions(propagate.clone(), PropgateKind::Basic);
assert_eq!(propagated.0.len(), 2);
let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
Expand All @@ -2404,7 +2479,7 @@ mod tests {
peer.seen_transactions.contains(eip4844_tx.transaction.hash());

// propagate again
let propagated = tx_manager.propagate_transactions(propagate);
let propagated = tx_manager.propagate_transactions(propagate, PropgateKind::Basic);
assert!(propagated.0.is_empty());
}
}
Loading