Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransactionsHandle propagation commands should not adhere to caching #12079

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 68 additions & 16 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ where
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
PropagateKind::Basic,
);

// notify pool so events get fired
Expand All @@ -431,6 +432,7 @@ where
fn propagate_transactions(
&mut self,
to_propagate: Vec<PropagateTransaction>,
propagate_kind: PropagateKind,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
if self.network.tx_gossip_disabled() {
Expand All @@ -454,9 +456,12 @@ where
// peer.
for tx in &to_propagate {
// Only proceed if the transaction is not in the peer's list of seen transactions
if !peer.seen_transactions.contains(&tx.hash()) {
if propagate_kind.is_forced() {
// add transaction to the list of hashes to propagate
builder.push(tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Include the transaction if the peer hasn't seen it
builder.push(tx);
}
}

Expand All @@ -476,8 +481,13 @@ where

for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

// Update cache if respecting cache
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
}

trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
Expand All @@ -488,10 +498,17 @@ where

// send full transactions, if any
if let Some(new_full_transactions) = full {
// Record propagated full transactions
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

// Update cache if respecting cache
if !propagate_kind.is_forced() {
for tx in &new_full_transactions {
// Mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}
}

trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
Expand All @@ -514,6 +531,7 @@ where
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
propagate_kind: PropagateKind,
) -> Option<PropagatedTransactions> {
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");

Expand All @@ -527,7 +545,11 @@ where

// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
if propagate_kind.is_forced() {
// Always include the transaction, regardless of cache
full_transactions.push(&tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
}
}
Expand All @@ -543,9 +565,15 @@ where
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

// mark transaction as seen by peer
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
}

// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
}
Expand All @@ -554,9 +582,15 @@ where
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

// mark transaction as seen by peer
if !propagate_kind.is_forced() {
for tx in &new_full_transactions {
peer.seen_transactions.insert(tx.hash());
}
}

// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
}
Expand All @@ -570,7 +604,12 @@ where
/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
fn propagate_hashes_to(&mut self, hashes: Vec<TxHash>, peer_id: PeerId) {
fn propagate_hashes_to(
&mut self,
hashes: Vec<TxHash>,
peer_id: PeerId,
propagate_kind: PropagateKind,
) {
trace!(target: "net::tx", "Start propagating transactions as hashes");

// This fetches a transactions from the pool, including the blob transactions, which are
Expand All @@ -590,7 +629,11 @@ where
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);

for tx in to_propagate {
if !peer.seen_transactions.insert(tx.hash()) {
if propagate_kind.is_forced() {
// Always include the transaction
hashes.push(&tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Include if the peer hasn't seen it
hashes.push(&tx);
}
}
Expand All @@ -606,6 +649,13 @@ where
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
}

// Update the cache if respecting cache
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
}

trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");

// send hashes of transactions
Expand Down Expand Up @@ -880,14 +930,16 @@ where
self.on_new_pending_transactions(vec![hash])
}
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer)
self.propagate_hashes_to(hashes, peer, PropagateKind::Forced)
}
TransactionsCommand::GetActivePeers(tx) => {
let peers = self.peers.keys().copied().collect::<HashSet<_>>();
tx.send(peers).ok();
}
TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, peer) {
if let Some(propagated) =
self.propagate_full_transactions_to_peer(txs, peer, PropagateKind::Forced)
{
self.pool.on_propagated(propagated);
}
}
Expand Down Expand Up @@ -2388,7 +2440,7 @@ mod tests {
let eip4844_tx = Arc::new(factory.create_eip4844());
propagate.push(PropagateTransaction::new(eip4844_tx.clone()));

let propagated = tx_manager.propagate_transactions(propagate.clone());
let propagated = tx_manager.propagate_transactions(propagate.clone(), PropagateKind::Basic);
assert_eq!(propagated.0.len(), 2);
let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
Expand All @@ -2404,7 +2456,7 @@ mod tests {
peer.seen_transactions.contains(eip4844_tx.transaction.hash());

// propagate again
let propagated = tx_manager.propagate_transactions(propagate);
let propagated = tx_manager.propagate_transactions(propagate, PropagateKind::Basic);
assert!(propagated.0.is_empty());
}
}
22 changes: 12 additions & 10 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,18 +561,28 @@ pub enum PropagateKind {
Full(PeerId),
/// Only the Hash was propagated to the peer.
Hash(PeerId),
/// Default propagation mode, filters out txs that we already sent or received
Basic,
/// Always propagate, even if we already sent or received the txs.
Forced,
}

// === impl PropagateKind ===

impl PropagateKind {
/// Returns the peer the transaction was sent to
pub const fn peer(&self) -> &PeerId {
pub const fn peer(&self) -> Option<&PeerId> {
match self {
Self::Full(peer) | Self::Hash(peer) => peer,
Self::Full(peer) | Self::Hash(peer) => Some(peer),
Self::Basic | Self::Forced => None,
}
}

/// Returns `true` if the propagation kind is `Forced`.
pub const fn is_forced(self) -> bool {
matches!(self, Self::Forced)
}

/// Returns true if the transaction was sent as a full transaction
pub const fn is_full(&self) -> bool {
matches!(self, Self::Full(_))
Expand All @@ -584,14 +594,6 @@ impl PropagateKind {
}
}

impl From<PropagateKind> for PeerId {
fn from(value: PropagateKind) -> Self {
match value {
PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
}
}
}

/// Represents a new transaction
#[derive(Debug)]
pub struct NewTransactionEvent<T: PoolTransaction> {
Expand Down
Loading