diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 439f92bada9a..88822d6f286e 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -324,6 +324,29 @@ where } } +/// Represents the different modes of transaction propagation. +/// +/// This enum is used to determine how transactions are propagated to peers in the network. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum PropgateKind { + /// Default propagation mode. + /// + /// Transactions are only sent to peers that haven't seen them yet. + Basic, + /// Forced propagation mode. + /// + /// Transactions are sent to all peers regardless of whether they have been sent or received + /// before. + Forced, +} + +impl PropgateKind { + /// Returns `true` if the propagation kind is `Forced`. + pub const fn is_forced(self) -> bool { + matches!(self, Self::Forced) + } +} + impl TransactionsManager where Pool: TransactionPool + 'static, @@ -416,6 +439,7 @@ where fn propagate_all(&mut self, hashes: Vec) { let propagated = self.propagate_transactions( self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(), + PropgateKind::Basic, ); // notify pool so events get fired @@ -431,6 +455,7 @@ where fn propagate_transactions( &mut self, to_propagate: Vec, + propagate_kind: PropgateKind, ) -> PropagatedTransactions { let mut propagated = PropagatedTransactions::default(); if self.network.tx_gossip_disabled() { @@ -454,9 +479,12 @@ where // peer. for tx in &to_propagate { // Only proceed if the transaction is not in the peer's list of seen transactions - if !peer.seen_transactions.contains(&tx.hash()) { + if propagate_kind.is_forced() { // add transaction to the list of hashes to propagate builder.push(tx); + } else if !peer.seen_transactions.contains(&tx.hash()) { + // Include the transaction if the peer hasn't seen it + builder.push(tx); } } @@ -476,8 +504,13 @@ where for hash in new_pooled_hashes.iter_hashes().copied() { propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id)); - // mark transaction as seen by peer - peer.seen_transactions.insert(hash); + } + + // Update cache if respecting cache + if !propagate_kind.is_forced() { + for hash in new_pooled_hashes.iter_hashes().copied() { + peer.seen_transactions.insert(hash); + } } trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer"); @@ -488,10 +521,17 @@ where // send full transactions, if any if let Some(new_full_transactions) = full { + // Record propagated full transactions for tx in &new_full_transactions { propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id)); - // mark transaction as seen by peer - peer.seen_transactions.insert(tx.hash()); + } + + // Update cache if respecting cache + if !propagate_kind.is_forced() { + for tx in &new_full_transactions { + // Mark transaction as seen by peer + peer.seen_transactions.insert(tx.hash()); + } } trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer"); @@ -514,6 +554,7 @@ where &mut self, txs: Vec, peer_id: PeerId, + propagate_kind: PropgateKind, ) -> Option { trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer"); @@ -527,7 +568,11 @@ where // Iterate through the transactions to propagate and fill the hashes and full transaction for tx in to_propagate { - if !peer.seen_transactions.contains(&tx.hash()) { + if propagate_kind.is_forced() { + // Always include the transaction, regardless of cache + full_transactions.push(&tx); + } else if !peer.seen_transactions.contains(&tx.hash()) { + // Only include if the peer hasn't seen the transaction full_transactions.push(&tx); } } @@ -543,9 +588,15 @@ where if let Some(new_pooled_hashes) = pooled { for hash in new_pooled_hashes.iter_hashes().copied() { propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); - // mark transaction as seen by peer - peer.seen_transactions.insert(hash); } + + // mark transaction as seen by peer + if !propagate_kind.is_forced() { + for hash in new_pooled_hashes.iter_hashes().copied() { + peer.seen_transactions.insert(hash); + } + } + // send hashes of transactions self.network.send_transactions_hashes(peer_id, new_pooled_hashes); } @@ -554,9 +605,15 @@ where if let Some(new_full_transactions) = full { for tx in &new_full_transactions { propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id)); - // mark transaction as seen by peer - peer.seen_transactions.insert(tx.hash()); } + + // mark transaction as seen by peer + if !propagate_kind.is_forced() { + for tx in &new_full_transactions { + peer.seen_transactions.insert(tx.hash()); + } + } + // send full transactions self.network.send_transactions(peer_id, new_full_transactions); } @@ -570,7 +627,12 @@ where /// Propagate the transaction hashes to the given peer /// /// Note: This will only send the hashes for transactions that exist in the pool. - fn propagate_hashes_to(&mut self, hashes: Vec, peer_id: PeerId) { + fn propagate_hashes_to( + &mut self, + hashes: Vec, + peer_id: PeerId, + propagate_kind: PropgateKind, + ) { trace!(target: "net::tx", "Start propagating transactions as hashes"); // This fetches a transactions from the pool, including the blob transactions, which are @@ -590,7 +652,11 @@ where let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); for tx in to_propagate { - if !peer.seen_transactions.insert(tx.hash()) { + if propagate_kind.is_forced() { + // Always include the transaction + hashes.push(&tx); + } else if !peer.seen_transactions.contains(&tx.hash()) { + // Include if the peer hasn't seen it hashes.push(&tx); } } @@ -606,6 +672,13 @@ where propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); } + // Update the cache if respecting cache + if !propagate_kind.is_forced() { + for hash in new_pooled_hashes.iter_hashes().copied() { + peer.seen_transactions.insert(hash); + } + } + trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer"); // send hashes of transactions @@ -880,14 +953,16 @@ where self.on_new_pending_transactions(vec![hash]) } TransactionsCommand::PropagateHashesTo(hashes, peer) => { - self.propagate_hashes_to(hashes, peer) + self.propagate_hashes_to(hashes, peer, PropgateKind::Forced) } TransactionsCommand::GetActivePeers(tx) => { let peers = self.peers.keys().copied().collect::>(); tx.send(peers).ok(); } TransactionsCommand::PropagateTransactionsTo(txs, peer) => { - if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, peer) { + if let Some(propagated) = + self.propagate_full_transactions_to_peer(txs, peer, PropgateKind::Forced) + { self.pool.on_propagated(propagated); } } @@ -2388,7 +2463,7 @@ mod tests { let eip4844_tx = Arc::new(factory.create_eip4844()); propagate.push(PropagateTransaction::new(eip4844_tx.clone())); - let propagated = tx_manager.propagate_transactions(propagate.clone()); + let propagated = tx_manager.propagate_transactions(propagate.clone(), PropgateKind::Basic); assert_eq!(propagated.0.len(), 2); let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap(); assert_eq!(prop_txs.len(), 1); @@ -2404,7 +2479,7 @@ mod tests { peer.seen_transactions.contains(eip4844_tx.transaction.hash()); // propagate again - let propagated = tx_manager.propagate_transactions(propagate); + let propagated = tx_manager.propagate_transactions(propagate, PropgateKind::Basic); assert!(propagated.0.is_empty()); } }