From 98beab48188cfaf4fc65cde780990783a3855d40 Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 27 Sep 2024 06:23:59 +0000 Subject: [PATCH 01/11] Add a peer store to network actor Save peer information to network actor state Maybe connect to peer when on node annoouncement received Persist state when peer addresses changed --- src/fiber/graph.rs | 31 ----- src/fiber/network.rs | 215 +++++++++++++++++++++++++++------- src/fiber/tests/graph.rs | 46 +------- src/fiber/tests/test_utils.rs | 70 ++++------- src/fiber/types.rs | 4 + src/main.rs | 31 ++--- src/store.rs | 114 ++++++++---------- 7 files changed, 260 insertions(+), 251 deletions(-) diff --git a/src/fiber/graph.rs b/src/fiber/graph.rs index 525c2f2c..a357cce5 100644 --- a/src/fiber/graph.rs +++ b/src/fiber/graph.rs @@ -11,8 +11,6 @@ use ckb_types::packed::{OutPoint, Script}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::HashMap; -use tentacle::multiaddr::Multiaddr; -use tentacle::secio::PeerId; use thiserror::Error; use tracing::log::error; use tracing::{debug, info, warn}; @@ -138,8 +136,6 @@ pub struct NetworkGraph { // Similar to the best_height, this is the last update time of the network graph. // We assume that we have already synced the graph up to this time - ASSUME_MAX_MESSAGE_TIMESTAMP_GAP. last_update_timestamp: u64, - // when we restarting a node, we will reconnect to these peers - connected_peer_addresses: HashMap, nodes: HashMap, store: S, chain_hash: Hash256, @@ -172,7 +168,6 @@ where last_update_timestamp: 0, channels: HashMap::new(), nodes: HashMap::new(), - connected_peer_addresses: HashMap::new(), store, chain_hash: get_chain_hash(), }; @@ -212,9 +207,6 @@ where } self.nodes.insert(node.node_id, node.clone()); } - for (peer, addr) in self.store.get_connected_peer(None) { - self.connected_peer_addresses.insert(peer, addr); - } } pub fn get_best_height(&self) -> u64 { @@ -421,25 +413,6 @@ where self.chain_hash == chain_hash } - pub fn add_connected_peer(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.connected_peer_addresses - .insert(peer_id.clone(), address.clone()); - self.store.insert_connected_peer(peer_id.clone(), address); - } - - pub fn get_connected_peers(&self) -> Vec<(&PeerId, &Multiaddr)> { - self.connected_peer_addresses.iter().collect() - } - - pub fn get_peers_to_sync_network_graph(&self) -> Vec<(&PeerId, &Multiaddr)> { - self.connected_peer_addresses.iter().take(3).collect() - } - - pub fn remove_connected_peer(&mut self, peer_id: &PeerId) { - self.connected_peer_addresses.remove(peer_id); - self.store.remove_connected_peer(peer_id); - } - pub fn get_node_inbounds( &self, node_id: Pubkey, @@ -479,7 +452,6 @@ where pub fn reset(&mut self) { self.channels.clear(); self.nodes.clear(); - self.connected_peer_addresses.clear(); } pub fn init_payment_session(&self, payment_request: SendPaymentCommand) -> PaymentSession { @@ -808,9 +780,6 @@ pub trait NetworkGraphStateStore { ) -> (Vec, JsonBytes); fn insert_channel(&self, channel: ChannelInfo); fn insert_node(&self, node: NodeInfo); - fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr); - fn get_connected_peer(&self, peer_id: Option) -> Vec<(PeerId, Multiaddr)>; - fn remove_connected_peer(&self, peer_id: &PeerId); fn get_payment_session(&self, payment_hash: Hash256) -> Option; fn insert_payment_session(&self, session: PaymentSession); } diff --git a/src/fiber/network.rs b/src/fiber/network.rs index ded5b300..6d8fa3d6 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -1,5 +1,3 @@ -use crate::ckb::config::UdtCfgInfos; -use crate::fiber::serde_utils::EntityHex; use ckb_hash::blake2b_256; use ckb_jsonrpc_types::{BlockNumber, Status, TxStatus}; use ckb_types::core::TransactionView; @@ -14,8 +12,9 @@ use ractor::{ use rand::Rng; use secp256k1::{Message, Secp256k1}; use serde::{Deserialize, Serialize}; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use std::borrow::Cow; +use std::collections::hash_map::Entry; use tentacle::utils::extract_peer_id; use std::collections::{HashMap, HashSet}; @@ -67,12 +66,14 @@ use super::types::{ }; use super::{FiberConfig, ASSUME_NETWORK_ACTOR_ALIVE}; +use crate::ckb::config::UdtCfgInfos; use crate::ckb::contracts::{check_udt_script, get_udt_whitelist, is_udt_type_auto_accept}; use crate::ckb::{CkbChainMessage, FundingRequest, FundingTx, TraceTxRequest, TraceTxResponse}; use crate::fiber::channel::{ AddTlcCommand, AddTlcResponse, TxCollaborationCommand, TxUpdateCommand, }; use crate::fiber::graph::{ChannelInfo, PaymentSession}; +use crate::fiber::serde_utils::EntityHex; use crate::fiber::types::{ secp256k1_instance, FiberChannelMessage, PaymentOnionPacket, PeeledPaymentOnionPacket, TxSignatures, @@ -539,7 +540,8 @@ pub struct NetworkActor { impl NetworkActor where - S: ChannelActorStateStore + S: NetworkActorStateStore + + ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone @@ -669,10 +671,7 @@ where } }; for message in messages { - if let Err(e) = self - .process_broadcasted_message(&state.network, message) - .await - { + if let Err(e) = self.process_broadcasted_message(state, message).await { let fail_message = format!("Failed to process broadcasted message: {:?}", &e); error!("{}", &fail_message); @@ -1007,10 +1006,6 @@ where self.on_service_event(e).await; } NetworkActorEvent::PeerConnected(id, pubkey, session) => { - self.network_graph - .write() - .await - .add_connected_peer(&id, session.address.clone()); state.on_peer_connected(&id, pubkey, &session).await; // Notify outside observers. myself @@ -1023,7 +1018,6 @@ where .expect(ASSUME_NETWORK_MYSELF_ALIVE); } NetworkActorEvent::PeerDisconnected(id, session) => { - self.network_graph.write().await.remove_connected_peer(&id); state.on_peer_disconnected(&id); // Notify outside observers. myself @@ -1535,10 +1529,7 @@ where ); for message in broadcasted_message_queue { let (_peer_id, message) = message; - if let Err(e) = self - .process_broadcasted_message(&state.network, message) - .await - { + if let Err(e) = self.process_broadcasted_message(state, message).await { error!("Failed to process broadcasted message: {:?}", e); } } @@ -1753,13 +1744,12 @@ where NetworkActorCommand::BroadcastMessage(message.clone()), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); - self.process_broadcasted_message(&state.network, message) - .await + self.process_broadcasted_message(state, message).await } async fn process_broadcasted_message( &self, - network: &ActorRef, + state: &mut NetworkActorState, message: FiberBroadcastMessage, ) -> Result<(), Error> { match message { @@ -1787,16 +1777,21 @@ where // TODO: bookkeeping how many nodes we have connected to. Stop connecting once we surpass a threshold. for addr in &node_announcement.addresses { - network.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr.clone()), - ))?; + state + .network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr.clone()), + ))?; } // Add the node to the network graph. self.network_graph .write() .await - .process_node_announcement(node_announcement); + .process_node_announcement(node_announcement.clone()); + + let peer_id = node_announcement.peer_id(); + state.on_node_announcement(peer_id, node_announcement.addresses.clone()); Ok(()) } _ => { @@ -2193,6 +2188,7 @@ enum RequestState { pub struct NetworkActorState { store: S, + persistent_state: PersistentNetworkActorState, // The name of the node to be announced to the network, may be empty. node_name: Option, peer_id: PeerId, @@ -2264,6 +2260,62 @@ pub struct NetworkActorState { broadcasted_message_queue: Vec<(PeerId, FiberBroadcastMessage)>, } +#[serde_as] +#[derive(Default, Clone, Serialize, Deserialize)] +pub struct PersistentNetworkActorState { + // when we restarting a node, we will reconnect to these peers + #[serde_as(as = "Vec<(DisplayFromStr, _)>")] + peer_store: HashMap>, +} + +impl PersistentNetworkActorState { + pub fn new() -> Self { + Default::default() + } + + /// Save peer addresses to the peer store. If the peer addresses are updated, + /// return true, otherwise return false. + fn save_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { + match self.peer_store.entry(peer_id) { + Entry::Occupied(mut entry) => { + if entry.get() == &addr { + false + } else { + entry.insert(addr); + true + } + } + Entry::Vacant(entry) => { + entry.insert(addr); + true + } + } + } + + fn get_peers_to_sync_graph(&self) -> HashMap> { + const NUM_PEERS_TO_SYNC_NETWORK_GRAPH: usize = 5; + self.peer_store + .iter() + .take(NUM_PEERS_TO_SYNC_NETWORK_GRAPH) + .map(|(id, addr)| (id.clone(), addr.clone())) + .collect() + } + + fn get_peers_to_connect(&self) -> HashMap> { + const NUM_PEERS_TO_CONNECT: usize = 20; + self.peer_store + .iter() + .take(NUM_PEERS_TO_CONNECT) + .map(|(id, addr)| (id.clone(), addr.clone())) + .collect() + } +} + +pub trait NetworkActorStateStore { + fn get_network_actor_state(&self, id: &PeerId) -> Option; + fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState); +} + static CHANNEL_ACTOR_NAME_PREFIX: AtomicU64 = AtomicU64::new(0u64); // ractor requires that the actor name is unique, so we add a prefix to the actor name. @@ -2278,7 +2330,8 @@ fn generate_channel_actor_name(local_peer_id: &PeerId, remote_peer_id: &PeerId) impl NetworkActorState where - S: ChannelActorStateStore + S: NetworkActorStateStore + + ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone @@ -2943,6 +2996,52 @@ where self.maybe_tell_syncer_peer_disconnected(id); } + fn on_node_announcement(&mut self, peer_id: PeerId, addresses: Vec) { + self.maybe_connect_to_peer(&peer_id, &addresses); + self.save_peer_addresses(peer_id, addresses); + } + + fn save_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec) { + if self + .persistent_state + .save_peer_addresses(peer_id, addresses) + { + self.persist_state(); + } + } + + fn persist_state(&self) { + self.store + .insert_network_actor_state(&self.peer_id, self.persistent_state.clone()); + } + + fn maybe_connect_to_peer(&mut self, peer_id: &PeerId, addresses: &[Multiaddr]) { + // TODO: Make this configurable and add more complex connection logic, + // e.g. if a peer is banned, we should not connect to it. + const MAX_CONNECTED_PEERS: usize = 40; + if self.peer_session_map.len() >= MAX_CONNECTED_PEERS { + debug!( + "Already connected to {} peers, skipping connection to peer {:?}", + MAX_CONNECTED_PEERS, peer_id + ); + return; + } + if let Some(session) = self.get_peer_session(peer_id) { + debug!( + "Peer {:?} already connected with session id {:?}, skipping connection", + peer_id, session + ); + return; + } + for addr in addresses { + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr.clone()), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + } + async fn maybe_sync_network_graph(&mut self, peer_id: &PeerId) { if let NetworkSyncStatus::Running(state) = &mut self.sync_status { if let Some(_) = state @@ -3341,7 +3440,8 @@ pub struct NetworkActorStartArguments { #[rasync_trait] impl Actor for NetworkActor where - S: ChannelActorStateStore + S: NetworkActorStateStore + + ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone @@ -3431,16 +3531,33 @@ where let mut graph = self.network_graph.write().await; - let peers_to_sync_network_graph = graph - .get_peers_to_sync_network_graph() - .into_iter() - .map(|(a, b)| (a.clone(), b.clone())) - .collect(); + let persistent_state = self + .store + .get_network_actor_state(&my_peer_id) + .unwrap_or_default(); + let mut peers_to_connect = persistent_state.get_peers_to_connect(); + let bootnodes = config.bootnode_addrs.clone(); + for bootnode in &bootnodes { + let addr = Multiaddr::from_str(&bootnode).expect("valid bootnode"); + let peer_id = extract_peer_id(&addr).expect("valid peer id"); + // If we have already selected the boot node as a peer to connect, then + // add this address to the list of addresses to connect to. + // Otherwise, create a new list out of this address. + peers_to_connect + .entry(peer_id.clone()) + .or_insert_with(Vec::new) + .push(addr); + } + + // Don't sync graph with bootnodes, as that may overload the bootnodes. + // Instead select a few peers from the persistent state to sync graph. + let peers_to_sync_graph = persistent_state.get_peers_to_sync_graph(); + let height = graph.get_best_height(); let last_update = graph.get_last_update_timestamp(); debug!( "Trying to sync network graph with peers {:?} with height {} and last update {:?}", - &peers_to_sync_network_graph, &height, &last_update + &peers_to_sync_graph, &height, &last_update ); let chain_actor = self.chain_actor.clone(); @@ -3452,10 +3569,15 @@ where height, current_block_number, last_update, - peers_to_sync_network_graph, + peers_to_sync_graph + .into_iter() + .map(|(peer_id, addrs)| (peer_id, addrs.into_iter().next().unwrap())) + .collect(), ); + let mut state = NetworkActorState { store: self.store.clone(), + persistent_state, node_name: config.announced_node_name, peer_id: my_peer_id, announced_addrs, @@ -3494,16 +3616,12 @@ where let node_announcement = state.get_or_create_new_node_announcement_message(); graph.process_node_announcement(node_announcement); - // load the connected peers from the network graph - let peers = graph.get_connected_peers(); - // TODO: we need to bootstrap the network if no peers are connected. - if peers.is_empty() { - warn!("No connected peers found in the network graph"); - } - for (_peer_id, addr) in peers { - myself.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr.clone()), - ))?; + for (_peer_id, addrs) in peers_to_connect { + for addr in addrs { + myself.send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr), + ))?; + } } let announce_node_interval_seconds = config.announce_node_interval_seconds(); @@ -3547,6 +3665,8 @@ where if let Err(err) = state.control.close().await { error!("Failed to close tentacle service: {}", err); } + debug!("Saving network actor state for {:?}", state.peer_id); + state.persist_state(); debug!("Network service for {:?} shutdown", state.peer_id); // The event receiver may have been closed already. // We ignore the error here. @@ -3695,7 +3815,14 @@ pub(crate) fn emit_service_event( } pub async fn start_network< - S: ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone + Send + Sync + 'static, + S: NetworkActorStateStore + + ChannelActorStateStore + + NetworkGraphStateStore + + InvoiceStore + + Clone + + Send + + Sync + + 'static, >( config: FiberConfig, chain_actor: ActorRef, diff --git a/src/fiber/tests/graph.rs b/src/fiber/tests/graph.rs index ce9fd6d4..9d784dac 100644 --- a/src/fiber/tests/graph.rs +++ b/src/fiber/tests/graph.rs @@ -11,17 +11,8 @@ use ckb_types::{ prelude::Entity, }; use secp256k1::{PublicKey, SecretKey, XOnlyPublicKey}; -use tentacle::{multiaddr::Multiaddr, secio::PeerId}; -use super::test_utils::{generate_keypair, generate_pubkey}; - -fn generate_keys(num: usize) -> Vec { - let mut keys = vec![]; - for _ in 0..num { - keys.push(generate_pubkey()); - } - keys -} +use super::test_utils::generate_keypair; fn generate_key_pairs(num: usize) -> Vec<(SecretKey, PublicKey)> { let mut keys = vec![]; @@ -193,41 +184,6 @@ impl MockNetworkGraph { } } -#[test] -fn test_graph_connected_peers() { - let temp_path = tempfile::tempdir().unwrap(); - let store = Store::new(temp_path.path()); - let keys = generate_keys(1); - let public_key1 = keys[0]; - let mut network_graph = NetworkGraph::new(store, public_key1.into()); - - let peer_id = PeerId::random(); - let address: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); - network_graph.add_connected_peer(&peer_id, address.clone()); - - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 1); - assert_eq!(connected_peers[0], (&peer_id, &address)); - - network_graph.reset(); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 0); - - // load from db - network_graph.load_from_store(); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 1); - assert_eq!(connected_peers[0], (&peer_id, &address)); - - network_graph.remove_connected_peer(&peer_id); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 0); - - network_graph.load_from_store(); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 0); -} - #[test] fn test_graph_channel_info() { let mut mock_network = MockNetworkGraph::new(1); diff --git a/src/fiber/tests/test_utils.rs b/src/fiber/tests/test_utils.rs index f123d1a4..63383fed 100644 --- a/src/fiber/tests/test_utils.rs +++ b/src/fiber/tests/test_utils.rs @@ -1,6 +1,3 @@ -use crate::fiber::graph::{ChannelInfo, NetworkGraph, NodeInfo}; -use crate::fiber::types::Pubkey; -use crate::invoice::{CkbInvoice, InvoiceError, InvoiceStore}; use ckb_jsonrpc_types::JsonBytes; use ckb_types::packed::OutPoint; use ckb_types::{core::TransactionView, packed::Byte32}; @@ -17,7 +14,6 @@ use std::{ time::Duration, }; use tempfile::TempDir as OldTempDir; -use tentacle::multiaddr::Multiaddr; use tentacle::{multiaddr::MultiAddr, secio::PeerId}; use tokio::sync::RwLock as TokioRwLock; use tokio::{ @@ -30,19 +26,20 @@ use crate::{ actors::{RootActor, RootActorMessage}, ckb::tests::test_utils::{submit_tx, trace_tx, trace_tx_hash, MockChainActor}, ckb::CkbChainMessage, + fiber::channel::{ChannelActorState, ChannelActorStateStore, ChannelState}, + fiber::graph::NetworkGraphStateStore, + fiber::graph::PaymentSession, + fiber::graph::{ChannelInfo, NetworkGraph, NodeInfo}, fiber::network::NetworkActorStartArguments, + fiber::network::{NetworkActor, NetworkActorCommand, NetworkActorMessage}, + fiber::network::{NetworkActorStateStore, PersistentNetworkActorState}, + fiber::types::Hash256, + fiber::types::Pubkey, + invoice::{CkbInvoice, InvoiceError, InvoiceStore}, tasks::{new_tokio_cancellation_token, new_tokio_task_tracker}, FiberConfig, NetworkServiceEvent, }; -use crate::fiber::graph::NetworkGraphStateStore; -use crate::fiber::graph::PaymentSession; -use crate::fiber::{ - channel::{ChannelActorState, ChannelActorStateStore, ChannelState}, - types::Hash256, - NetworkActor, NetworkActorCommand, NetworkActorMessage, -}; - static RETAIN_VAR: &str = "TEST_TEMP_RETAIN"; #[derive(Debug)] @@ -465,15 +462,28 @@ impl NetworkNode { #[derive(Clone, Default)] pub struct MemoryStore { + network_actor_sate_map: Arc>>, channel_actor_state_map: Arc>>, channels_map: Arc>>, - pub nodes_map: Arc>>, - connected_peer_addresses: Arc>>, + nodes_map: Arc>>, payment_sessions: Arc>>, invoice_store: Arc>>, invoice_hash_to_preimage: Arc>>, } +impl NetworkActorStateStore for MemoryStore { + fn get_network_actor_state(&self, id: &PeerId) -> Option { + self.network_actor_sate_map.read().unwrap().get(id).cloned() + } + + fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState) { + self.network_actor_sate_map + .write() + .unwrap() + .insert(id.clone(), state); + } +} + impl NetworkGraphStateStore for MemoryStore { fn get_channels(&self, outpoint: Option) -> Vec { if let Some(outpoint) = outpoint { @@ -538,38 +548,6 @@ impl NetworkGraphStateStore for MemoryStore { .insert(node.node_id.clone(), node); } - fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr) { - self.connected_peer_addresses - .write() - .unwrap() - .insert(peer_id, multiaddr); - } - - fn get_connected_peer(&self, peer_id: Option) -> Vec<(PeerId, Multiaddr)> { - if let Some(peer_id) = peer_id { - let mut res = vec![]; - - if let Some(addr) = self.connected_peer_addresses.read().unwrap().get(&peer_id) { - res.push((peer_id, addr.clone())); - } - res - } else { - self.connected_peer_addresses - .read() - .unwrap() - .iter() - .map(|(peer_id, addr)| (peer_id.clone(), addr.clone())) - .collect() - } - } - - fn remove_connected_peer(&self, peer_id: &PeerId) { - self.connected_peer_addresses - .write() - .unwrap() - .remove(peer_id); - } - fn get_payment_session(&self, id: Hash256) -> Option { self.payment_sessions.read().unwrap().get(&id).cloned() } diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 4009e37d..8b49f39a 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -1494,6 +1494,10 @@ impl NodeAnnouncement { }; deterministically_hash(&unsigned_announcement) } + + pub fn peer_id(&self) -> PeerId { + PeerId::from_public_key(&self.node_id.into()) + } } impl From for molecule_fiber::UdtCellDep { diff --git a/src/main.rs b/src/main.rs index 94d4c2e9..1d17c837 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,24 @@ -use ckb_hash::blake2b_256; -use core::default::Default; use fnn::actors::RootActor; use fnn::cch::CchMessage; -use fnn::ckb::contracts::{get_script_by_contract, init_contracts_context, Contract}; -use fnn::ckb::CkbChainActor; -use fnn::fiber::graph::NetworkGraph; -use fnn::fiber::{channel::ChannelSubscribers, NetworkActorCommand, NetworkActorMessage}; +use fnn::ckb::{ + contracts::{get_script_by_contract, init_contracts_context, Contract}, + CkbChainActor, +}; +use fnn::fiber::{channel::ChannelSubscribers, graph::NetworkGraph}; use fnn::store::Store; use fnn::tasks::{ cancel_tasks_and_wait_for_completion, new_tokio_cancellation_token, new_tokio_task_tracker, }; use fnn::watchtower::{WatchtowerActor, WatchtowerMessage}; use fnn::{start_cch, start_network, start_rpc, Config}; -use ractor::Actor; -use secp256k1::Secp256k1; -use std::str::FromStr; + +use core::default::Default; use std::sync::Arc; use std::time::Duration; -use tentacle::multiaddr::Multiaddr; + +use ckb_hash::blake2b_256; +use ractor::Actor; +use secp256k1::Secp256k1; use tokio::sync::{mpsc, RwLock}; use tokio::{select, signal}; use tracing::{debug, error, info, info_span, trace}; @@ -85,8 +86,6 @@ pub async fn main() { const CHANNEL_SIZE: usize = 4000; let (event_sender, mut event_receiver) = mpsc::channel(CHANNEL_SIZE); - let bootnodes = fiber_config.bootnode_addrs.clone(); - let network_graph = Arc::new(RwLock::new(NetworkGraph::new( store.clone(), node_public_key.clone().into(), @@ -112,14 +111,6 @@ pub async fn main() { ) .await; - for bootnode in bootnodes { - let addr = Multiaddr::from_str(&bootnode).expect("valid bootnode"); - let command = NetworkActorCommand::ConnectPeer(addr); - network_actor - .send_message(NetworkActorMessage::new_command(command)) - .expect("ckb actor alive") - } - let watchtower_actor = Actor::spawn_linked( Some("watchtower".to_string()), WatchtowerActor::new(store.clone()), diff --git a/src/store.rs b/src/store.rs index 4ef45fc6..c465f57d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -2,6 +2,7 @@ use crate::{ fiber::{ channel::{ChannelActorState, ChannelActorStateStore, ChannelState}, graph::{ChannelInfo, NetworkGraphStateStore, NodeInfo, PaymentSession}, + network::{NetworkActorStateStore, PersistentNetworkActorState}, types::{Hash256, Pubkey}, }, invoice::{CkbInvoice, InvoiceError, InvoiceStore}, @@ -13,7 +14,7 @@ use ckb_types::prelude::Entity; use rocksdb::{prelude::*, DBIterator, Direction, IteratorMode, WriteBatch, DB}; use serde_json; use std::{path::Path, sync::Arc}; -use tentacle::{multiaddr::Multiaddr, secio::PeerId}; +use tentacle::secio::PeerId; #[derive(Clone)] pub struct Store { @@ -157,18 +158,19 @@ impl Batch { serde_json::to_vec(&node).expect("serialize NodeInfo should be OK"), ); } - KeyValue::PeerIdMultiAddr(peer_id, multiaddr) => { - let key = [&[PEER_ID_MULTIADDR_PREFIX], peer_id.as_bytes()].concat(); + KeyValue::WatchtowerChannel(channel_id, channel_data) => { + let key = [&[WATCHTOWER_CHANNEL_PREFIX], channel_id.as_ref()].concat(); self.put( key, - serde_json::to_vec(&multiaddr).expect("serialize Multiaddr should be OK"), + serde_json::to_vec(&channel_data).expect("serialize ChannelData should be OK"), ); } - KeyValue::WatchtowerChannel(channel_id, channel_data) => { - let key = [&[WATCHTOWER_CHANNEL_PREFIX], channel_id.as_ref()].concat(); + KeyValue::NetworkActorState(peer_id, persistent_network_actor_state) => { + let key = [&[PEER_ID_NETWORK_ACTOR_STATE_PREFIX], peer_id.as_bytes()].concat(); self.put( key, - serde_json::to_vec(&channel_data).expect("serialize ChannelData should be OK"), + serde_json::to_vec(&persistent_network_actor_state) + .expect("serialize PersistentNetworkActorState should be OK"), ); } } @@ -188,24 +190,26 @@ impl Batch { } /// -/// +--------------+--------------------+--------------------------+ -/// | KeyPrefix:: | Key:: | Value:: | -/// +--------------+--------------------+--------------------------+ -/// | 0 | Hash256 | ChannelActorState | -/// | 32 | Hash256 | CkbInvoice | -/// | 64 | PeerId | Hash256 | ChannelState | -/// | 96 | ChannelId | ChannelInfo | -/// | 97 | Block | Index | ChannelId | -/// | 98 | Timestamp | ChannelId | -/// | 128 | NodeId | NodeInfo | -/// | 129 | Timestamp | NodeId | -/// | 160 | PeerId | MultiAddr | -/// | 192 | Hash256 | PaymentSession | -/// | 224 | Hash256 | ChannelData | -/// +--------------+--------------------+--------------------------+ +/// +--------------+--------------------+-----------------------------+ +/// | KeyPrefix:: | Key:: | Value:: | +/// +--------------+--------------------+-----------------------------+ +/// | 0 | Hash256 | ChannelActorState | +/// | 16 | PeerId | PersistentNetworkActorState | +/// | 32 | Hash256 | CkbInvoice | +/// | 64 | PeerId | Hash256 | ChannelState | +/// | 96 | ChannelId | ChannelInfo | +/// | 97 | Block | Index | ChannelId | +/// | 98 | Timestamp | ChannelId | +/// | 128 | NodeId | NodeInfo | +/// | 129 | Timestamp | NodeId | +/// | 160 | PeerId | MultiAddr | +/// | 192 | Hash256 | PaymentSession | +/// | 224 | Hash256 | ChannelData | +/// +--------------+--------------------+-----------------------------+ /// const CHANNEL_ACTOR_STATE_PREFIX: u8 = 0; +const PEER_ID_NETWORK_ACTOR_STATE_PREFIX: u8 = 16; const CKB_INVOICE_PREFIX: u8 = 32; const CKB_INVOICE_PREIMAGE_PREFIX: u8 = 33; const PEER_ID_CHANNEL_ID_PREFIX: u8 = 64; @@ -214,7 +218,6 @@ const CHANNEL_ANNOUNCEMENT_INDEX_PREFIX: u8 = 97; const CHANNEL_UPDATE_INDEX_PREFIX: u8 = 98; pub(crate) const NODE_INFO_PREFIX: u8 = 128; const NODE_ANNOUNCEMENT_INDEX_PREFIX: u8 = 129; -const PEER_ID_MULTIADDR_PREFIX: u8 = 160; const PAYMENT_SESSION_PREFIX: u8 = 192; const WATCHTOWER_CHANNEL_PREFIX: u8 = 224; @@ -223,11 +226,33 @@ enum KeyValue { CkbInvoice(Hash256, CkbInvoice), CkbInvoicePreimage(Hash256, Hash256), PeerIdChannelId((PeerId, Hash256), ChannelState), - PeerIdMultiAddr(PeerId, Multiaddr), NodeInfo(Pubkey, NodeInfo), ChannelInfo(OutPoint, ChannelInfo), WatchtowerChannel(Hash256, ChannelData), PaymentSession(Hash256, PaymentSession), + NetworkActorState(PeerId, PersistentNetworkActorState), +} + +impl NetworkActorStateStore for Store { + fn get_network_actor_state(&self, id: &PeerId) -> Option { + let mut key = Vec::with_capacity(33); + key.push(PEER_ID_NETWORK_ACTOR_STATE_PREFIX); + key.extend_from_slice(id.as_bytes()); + let iter = self + .db + .prefix_iterator(key.as_ref()) + .find(|(col_key, _)| col_key.starts_with(&key)); + iter.map(|(_key, value)| { + serde_json::from_slice(value.as_ref()) + .expect("deserialize PersistentNetworkActorState should be OK") + }) + } + + fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState) { + let mut batch = self.batch(); + batch.put_kv(KeyValue::NetworkActorState(id.clone(), state)); + batch.commit(); + } } impl ChannelActorStateStore for Store { @@ -446,30 +471,6 @@ impl NetworkGraphStateStore for Store { (nodes, JsonBytes::from_bytes(last_key.into())) } - fn get_connected_peer(&self, peer_id: Option) -> Vec<(PeerId, Multiaddr)> { - let key = match peer_id { - Some(peer_id) => { - let mut key = Vec::with_capacity(33); - key.push(PEER_ID_MULTIADDR_PREFIX); - key.extend_from_slice(peer_id.as_bytes()); - key - } - None => vec![PEER_ID_MULTIADDR_PREFIX], - }; - let iter = self - .db - .prefix_iterator(key.as_ref()) - .take_while(|(col_key, _)| col_key.starts_with(&key)); - iter.map(|(key, value)| { - let peer_id = - PeerId::from_bytes(key[1..].into()).expect("deserialize peer id should be OK"); - let addr = - serde_json::from_slice(value.as_ref()).expect("deserialize Multiaddr should be OK"); - (peer_id, addr) - }) - .collect() - } - fn insert_channel(&self, channel: ChannelInfo) { let mut batch = self.batch(); batch.put_kv(KeyValue::ChannelInfo(channel.out_point(), channel.clone())); @@ -482,23 +483,6 @@ impl NetworkGraphStateStore for Store { batch.commit(); } - fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr) { - let mut batch = self.batch(); - batch.put_kv(KeyValue::PeerIdMultiAddr(peer_id, multiaddr)); - batch.commit(); - } - - fn remove_connected_peer(&self, peer_id: &PeerId) { - let prefix = [&[PEER_ID_MULTIADDR_PREFIX], peer_id.as_bytes()].concat(); - let iter = self - .db - .prefix_iterator(prefix.as_ref()) - .take_while(|(key, _)| key.starts_with(&prefix)); - for (key, _) in iter { - self.db.delete(key).expect("delete should be OK"); - } - } - fn get_payment_session(&self, payment_hash: Hash256) -> Option { let prefix = [&[PAYMENT_SESSION_PREFIX], payment_hash.as_ref()].concat(); self.get(prefix).map(|v| { From ddf07942b45342057b320f8e4b29e7fb6dfaf311 Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 17 Oct 2024 19:27:20 +0800 Subject: [PATCH 02/11] Do scheduled maintenance for connected peers in network --- src/fiber/network.rs | 116 +++++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 47 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 6d8fa3d6..92571842 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -99,6 +99,13 @@ const ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW: &str = const ASSUME_NETWORK_MYSELF_ALIVE: &str = "network actor myself alive"; +// This is the approximate number of peers that we need to keep connection to to make the +// network operating normally. +const NUM_PEERS_IN_CONNECTION: usize = 40; + +// The duration for which we will try to maintain the number of peers in connection. +const MAINTAINING_CONNECTED_PEERS_INTERVAL: Duration = Duration::from_secs(3600); + pub(crate) fn get_chain_hash() -> Hash256 { Default::default() } @@ -155,6 +162,7 @@ pub enum NetworkActorCommand { /// Network commands ConnectPeer(Multiaddr), DisconnectPeer(PeerId), + MaintainConnectedPeers(), // For internal use and debugging only. Most of the messages requires some // changes to local state. Even if we can send a message to a peer, some // part of the local state is not changed. @@ -1229,6 +1237,41 @@ where } } + NetworkActorCommand::MaintainConnectedPeers() => { + let num_connected_peers = state.peer_session_map.len(); + if num_connected_peers >= NUM_PEERS_IN_CONNECTION { + debug!( + "Already connected to {} peers, skipping connecting to more peers", + num_connected_peers, + ); + return Ok(()); + } + // Even though we have connected to some peers, we still try to select + // NUM_PEERS_IN_CONNECTION, because unless we're extremely lucky, we will connect + // to much less than 2 * NUM_PEERS_IN_CONNECTION peers. That should still be OK, + // because we will not be connecting to more peers in the next run. + let peers_to_connect = state + .state_to_be_persisted + .sample_n_peers_to_connect(NUM_PEERS_IN_CONNECTION); + for (peer_id, addresses) in peers_to_connect { + if let Some(session) = state.get_peer_session(&peer_id) { + debug!( + "Randomly selected peer {:?} already connected with session id {:?}, skipping connection", + peer_id, session + ); + continue; + } + for addr in addresses { + state + .network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr.clone()), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + } + } + NetworkActorCommand::OpenChannel(open_channel, reply) => { match state.create_outbound_channel(open_channel).await { Ok((_, channel_id)) => { @@ -2188,7 +2231,7 @@ enum RequestState { pub struct NetworkActorState { store: S, - persistent_state: PersistentNetworkActorState, + state_to_be_persisted: PersistentNetworkActorState, // The name of the node to be announced to the network, may be empty. node_name: Option, peer_id: PeerId, @@ -2301,11 +2344,10 @@ impl PersistentNetworkActorState { .collect() } - fn get_peers_to_connect(&self) -> HashMap> { - const NUM_PEERS_TO_CONNECT: usize = 20; + fn sample_n_peers_to_connect(&self, n: usize) -> HashMap> { self.peer_store .iter() - .take(NUM_PEERS_TO_CONNECT) + .take(n) .map(|(id, addr)| (id.clone(), addr.clone())) .collect() } @@ -2997,13 +3039,12 @@ where } fn on_node_announcement(&mut self, peer_id: PeerId, addresses: Vec) { - self.maybe_connect_to_peer(&peer_id, &addresses); self.save_peer_addresses(peer_id, addresses); } fn save_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec) { if self - .persistent_state + .state_to_be_persisted .save_peer_addresses(peer_id, addresses) { self.persist_state(); @@ -3012,34 +3053,7 @@ where fn persist_state(&self) { self.store - .insert_network_actor_state(&self.peer_id, self.persistent_state.clone()); - } - - fn maybe_connect_to_peer(&mut self, peer_id: &PeerId, addresses: &[Multiaddr]) { - // TODO: Make this configurable and add more complex connection logic, - // e.g. if a peer is banned, we should not connect to it. - const MAX_CONNECTED_PEERS: usize = 40; - if self.peer_session_map.len() >= MAX_CONNECTED_PEERS { - debug!( - "Already connected to {} peers, skipping connection to peer {:?}", - MAX_CONNECTED_PEERS, peer_id - ); - return; - } - if let Some(session) = self.get_peer_session(peer_id) { - debug!( - "Peer {:?} already connected with session id {:?}, skipping connection", - peer_id, session - ); - return; - } - for addr in addresses { - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr.clone()), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); - } + .insert_network_actor_state(&self.peer_id, self.state_to_be_persisted.clone()); } async fn maybe_sync_network_graph(&mut self, peer_id: &PeerId) { @@ -3531,11 +3545,12 @@ where let mut graph = self.network_graph.write().await; - let persistent_state = self + let mut state_to_be_persisted = self .store .get_network_actor_state(&my_peer_id) .unwrap_or_default(); - let mut peers_to_connect = persistent_state.get_peers_to_connect(); + let mut peers_to_connect = + state_to_be_persisted.sample_n_peers_to_connect(NUM_PEERS_IN_CONNECTION); let bootnodes = config.bootnode_addrs.clone(); for bootnode in &bootnodes { let addr = Multiaddr::from_str(&bootnode).expect("valid bootnode"); @@ -3544,14 +3559,23 @@ where // add this address to the list of addresses to connect to. // Otherwise, create a new list out of this address. peers_to_connect - .entry(peer_id.clone()) + .entry(peer_id) .or_insert_with(Vec::new) .push(addr); } + for (peer_id, addrs) in peers_to_connect { + state_to_be_persisted.save_peer_addresses(peer_id, addrs.clone()); + for addr in addrs { + myself.send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr), + ))?; + } + } + // Don't sync graph with bootnodes, as that may overload the bootnodes. // Instead select a few peers from the persistent state to sync graph. - let peers_to_sync_graph = persistent_state.get_peers_to_sync_graph(); + let peers_to_sync_graph = state_to_be_persisted.get_peers_to_sync_graph(); let height = graph.get_best_height(); let last_update = graph.get_last_update_timestamp(); @@ -3577,7 +3601,7 @@ where let mut state = NetworkActorState { store: self.store.clone(), - persistent_state, + state_to_be_persisted, node_name: config.announced_node_name, peer_id: my_peer_id, announced_addrs, @@ -3616,14 +3640,6 @@ where let node_announcement = state.get_or_create_new_node_announcement_message(); graph.process_node_announcement(node_announcement); - for (_peer_id, addrs) in peers_to_connect { - for addr in addrs { - myself.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr), - ))?; - } - } - let announce_node_interval_seconds = config.announce_node_interval_seconds(); if announce_node_interval_seconds > 0 { myself.send_interval(Duration::from_secs(announce_node_interval_seconds), || { @@ -3633,6 +3649,12 @@ where }); } + myself.send_interval(MAINTAINING_CONNECTED_PEERS_INTERVAL, || { + NetworkActorMessage::new_command(NetworkActorCommand::MaintainConnectedPeers()) + }); + + // Save bootnodes to the network actor state. + state.persist_state(); Ok(state) } From 034eed469e88974dd0969f3a0183806e2572bb9f Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 17 Oct 2024 20:11:36 +0800 Subject: [PATCH 03/11] Add some unit tests for persistent network state --- src/fiber/network.rs | 6 ++--- src/fiber/tests/network.rs | 54 +++++++++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 92571842..36cef3cf 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -2318,7 +2318,7 @@ impl PersistentNetworkActorState { /// Save peer addresses to the peer store. If the peer addresses are updated, /// return true, otherwise return false. - fn save_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { + pub(crate) fn save_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { match self.peer_store.entry(peer_id) { Entry::Occupied(mut entry) => { if entry.get() == &addr { @@ -2335,7 +2335,7 @@ impl PersistentNetworkActorState { } } - fn get_peers_to_sync_graph(&self) -> HashMap> { + pub(crate) fn get_peers_to_sync_graph(&self) -> HashMap> { const NUM_PEERS_TO_SYNC_NETWORK_GRAPH: usize = 5; self.peer_store .iter() @@ -2344,7 +2344,7 @@ impl PersistentNetworkActorState { .collect() } - fn sample_n_peers_to_connect(&self, n: usize) -> HashMap> { + pub(crate) fn sample_n_peers_to_connect(&self, n: usize) -> HashMap> { self.peer_store .iter() .take(n) diff --git a/src/fiber/tests/network.rs b/src/fiber/tests/network.rs index 236a0c10..0bfcb835 100644 --- a/src/fiber/tests/network.rs +++ b/src/fiber/tests/network.rs @@ -2,7 +2,8 @@ use super::test_utils::{init_tracing, NetworkNode}; use crate::{ fiber::{ graph::{ChannelInfo, NetworkGraphStateStore}, - network::get_chain_hash, + network::{get_chain_hash, NetworkActorStateStore}, + tests::test_utils::NetworkNodeConfigBuilder, types::{ ChannelAnnouncement, ChannelUpdate, FiberBroadcastMessage, FiberMessage, NodeAnnouncement, Privkey, Pubkey, @@ -21,8 +22,11 @@ use ckb_types::{ packed::OutPoint, prelude::{Builder, Entity, Pack}, }; -use std::str::FromStr; -use tentacle::{multiaddr::MultiAddr, secio::PeerId}; +use std::{borrow::Cow, str::FromStr}; +use tentacle::{ + multiaddr::{MultiAddr, Protocol}, + secio::PeerId, +}; fn get_test_priv_key() -> Privkey { Privkey::from_slice(&[42u8; 32]) @@ -37,6 +41,21 @@ fn get_test_peer_id() -> PeerId { PeerId::from_public_key(&pub_key) } +fn get_fake_peer_id_and_address() -> (PeerId, MultiAddr) { + let peer_id = PeerId::random(); + let mut address = MultiAddr::from_str(&format!( + "/ip4/{}.{}.{}.{}/tcp/{}", + rand::random::(), + rand::random::(), + rand::random::(), + rand::random::(), + rand::random::() + )) + .expect("valid multiaddr"); + address.push(Protocol::P2P(Cow::Owned(peer_id.clone().into_bytes()))); + (peer_id, address) +} + fn create_fake_channel_announcement_mesage( priv_key: Privkey, capacity: u64, @@ -520,3 +539,32 @@ async fn test_sync_node_announcement_after_restart() { let node = node2.store.get_nodes(Some(test_pub_key)); assert!(!node.is_empty()); } + +#[tokio::test] +async fn test_persisting_network_state() { + let mut node = NetworkNode::new().await; + let state = node.store.clone(); + let peer_id = node.peer_id.clone(); + node.stop().await; + assert!(state.get_network_actor_state(&peer_id).is_some()) +} + +#[tokio::test] +async fn test_persisting_bootnode() { + let (boot_peer_id, address) = get_fake_peer_id_and_address(); + let address_string = format!("{}", &address); + + let mut node = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| config.bootnode_addrs = vec![address_string]) + .build(), + ) + .await; + let state = node.store.clone(); + let peer_id = node.peer_id.clone(); + node.stop().await; + + let state = state.get_network_actor_state(&peer_id).unwrap(); + let peers = state.sample_n_peers_to_connect(1); + assert_eq!(peers.get(&boot_peer_id), Some(&vec![address])); +} From 41e099a9dd78e1a5dd9d60d77789f6eb4e1796e2 Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 18 Oct 2024 10:53:36 +0800 Subject: [PATCH 04/11] Only connect to peers on maintaining connections --- src/fiber/network.rs | 76 +++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 36cef3cf..184dc90c 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -99,12 +99,12 @@ const ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW: &str = const ASSUME_NETWORK_MYSELF_ALIVE: &str = "network actor myself alive"; -// This is the approximate number of peers that we need to keep connection to to make the +// This is the default approximate number of peers that we need to keep connection to to make the // network operating normally. -const NUM_PEERS_IN_CONNECTION: usize = 40; +const NUM_PEER_CONNECTIONS: usize = 40; // The duration for which we will try to maintain the number of peers in connection. -const MAINTAINING_CONNECTED_PEERS_INTERVAL: Duration = Duration::from_secs(3600); +const MAINTAINING_CONNECTIONS_INTERVAL: Duration = Duration::from_secs(3600); pub(crate) fn get_chain_hash() -> Hash256 { Default::default() @@ -162,7 +162,8 @@ pub enum NetworkActorCommand { /// Network commands ConnectPeer(Multiaddr), DisconnectPeer(PeerId), - MaintainConnectedPeers(), + // We need to maintain a certain number of peers connections to keep the network running. + MaintainConnections(usize), // For internal use and debugging only. Most of the messages requires some // changes to local state. Even if we can send a message to a peer, some // part of the local state is not changed. @@ -1237,22 +1238,18 @@ where } } - NetworkActorCommand::MaintainConnectedPeers() => { + NetworkActorCommand::MaintainConnections(num_peers) => { let num_connected_peers = state.peer_session_map.len(); - if num_connected_peers >= NUM_PEERS_IN_CONNECTION { + if num_connected_peers >= num_peers { debug!( "Already connected to {} peers, skipping connecting to more peers", num_connected_peers, ); return Ok(()); } - // Even though we have connected to some peers, we still try to select - // NUM_PEERS_IN_CONNECTION, because unless we're extremely lucky, we will connect - // to much less than 2 * NUM_PEERS_IN_CONNECTION peers. That should still be OK, - // because we will not be connecting to more peers in the next run. let peers_to_connect = state .state_to_be_persisted - .sample_n_peers_to_connect(NUM_PEERS_IN_CONNECTION); + .sample_n_peers_to_connect(num_peers - num_connected_peers); for (peer_id, addresses) in peers_to_connect { if let Some(session) = state.get_peer_session(&peer_id) { debug!( @@ -2316,8 +2313,27 @@ impl PersistentNetworkActorState { Default::default() } + /// Save a single peer address to the peer store. If this address for the peer does not exist, + /// then return false, otherwise return true. + pub(crate) fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> bool { + match self.peer_store.entry(peer_id) { + Entry::Occupied(mut entry) => { + if entry.get().contains(&addr) { + false + } else { + entry.get_mut().push(addr); + true + } + } + Entry::Vacant(entry) => { + entry.insert(vec![addr]); + true + } + } + } + /// Save peer addresses to the peer store. If the peer addresses are updated, - /// return true, otherwise return false. + /// return true, otherwise return false. This method will NOT keep the old addresses. pub(crate) fn save_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { match self.peer_store.entry(peer_id) { Entry::Occupied(mut entry) => { @@ -3549,28 +3565,11 @@ where .store .get_network_actor_state(&my_peer_id) .unwrap_or_default(); - let mut peers_to_connect = - state_to_be_persisted.sample_n_peers_to_connect(NUM_PEERS_IN_CONNECTION); - let bootnodes = config.bootnode_addrs.clone(); - for bootnode in &bootnodes { - let addr = Multiaddr::from_str(&bootnode).expect("valid bootnode"); - let peer_id = extract_peer_id(&addr).expect("valid peer id"); - // If we have already selected the boot node as a peer to connect, then - // add this address to the list of addresses to connect to. - // Otherwise, create a new list out of this address. - peers_to_connect - .entry(peer_id) - .or_insert_with(Vec::new) - .push(addr); - } - for (peer_id, addrs) in peers_to_connect { - state_to_be_persisted.save_peer_addresses(peer_id, addrs.clone()); - for addr in addrs { - myself.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr), - ))?; - } + for bootnode in &config.bootnode_addrs { + let addr = Multiaddr::from_str(bootnode.as_str()).expect("valid bootnode"); + let peer_id = extract_peer_id(&addr).expect("valid peer id"); + state_to_be_persisted.add_peer_address(peer_id, addr); } // Don't sync graph with bootnodes, as that may overload the bootnodes. @@ -3649,12 +3648,15 @@ where }); } - myself.send_interval(MAINTAINING_CONNECTED_PEERS_INTERVAL, || { - NetworkActorMessage::new_command(NetworkActorCommand::MaintainConnectedPeers()) - }); - // Save bootnodes to the network actor state. state.persist_state(); + + myself.send_interval(MAINTAINING_CONNECTIONS_INTERVAL, || { + NetworkActorMessage::new_command(NetworkActorCommand::MaintainConnections( + NUM_PEER_CONNECTIONS, + )) + }); + Ok(state) } From adeea7ee0f7fc57efc8e3766f30b4aa43db8619f Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 18 Oct 2024 11:17:40 +0800 Subject: [PATCH 05/11] Update the comment of getting peers to sync graph --- src/fiber/network.rs | 31 ++++++++----------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 184dc90c..7356ee9f 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -2085,6 +2085,11 @@ struct NetworkSyncState { // The timestamp we started syncing. starting_time: u64, // All the pinned peers that we are going to sync with. + // TODO: the intention of passing a few peer addresses to the sync status was to let the user + // select a few peers to sync network graph (these peers may have faster connection to the node). + // After some refactoring, the code below is a little bit clouded. We are currently only connecting + // to random peers. If this functionality is desired, we should make a config option for it. + // Otherwise, remove this completely. pinned_syncing_peers: Vec<(PeerId, Multiaddr)>, active_syncers: HashMap>, // Number of peers with whom we succeeded to sync. @@ -2185,13 +2190,13 @@ impl NetworkSyncStatus { starting_height: u64, ending_height: u64, starting_time: u64, - syncing_peers: Vec<(PeerId, Multiaddr)>, + pinned_syncing_peers: Vec<(PeerId, Multiaddr)>, ) -> Self { let state = NetworkSyncState { starting_height, ending_height, starting_time, - pinned_syncing_peers: syncing_peers, + pinned_syncing_peers, active_syncers: Default::default(), succeeded: 0, failed: 0, @@ -2351,15 +2356,6 @@ impl PersistentNetworkActorState { } } - pub(crate) fn get_peers_to_sync_graph(&self) -> HashMap> { - const NUM_PEERS_TO_SYNC_NETWORK_GRAPH: usize = 5; - self.peer_store - .iter() - .take(NUM_PEERS_TO_SYNC_NETWORK_GRAPH) - .map(|(id, addr)| (id.clone(), addr.clone())) - .collect() - } - pub(crate) fn sample_n_peers_to_connect(&self, n: usize) -> HashMap> { self.peer_store .iter() @@ -3572,16 +3568,8 @@ where state_to_be_persisted.add_peer_address(peer_id, addr); } - // Don't sync graph with bootnodes, as that may overload the bootnodes. - // Instead select a few peers from the persistent state to sync graph. - let peers_to_sync_graph = state_to_be_persisted.get_peers_to_sync_graph(); - let height = graph.get_best_height(); let last_update = graph.get_last_update_timestamp(); - debug!( - "Trying to sync network graph with peers {:?} with height {} and last update {:?}", - &peers_to_sync_graph, &height, &last_update - ); let chain_actor = self.chain_actor.clone(); let current_block_number = call!(chain_actor, CkbChainMessage::GetCurrentBlockNumber, ()) @@ -3592,10 +3580,7 @@ where height, current_block_number, last_update, - peers_to_sync_graph - .into_iter() - .map(|(peer_id, addrs)| (peer_id, addrs.into_iter().next().unwrap())) - .collect(), + vec![], ); let mut state = NetworkActorState { From d7ca6f36810cd93a1731aa3b9b570b985742384b Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 18 Oct 2024 11:37:16 +0800 Subject: [PATCH 06/11] Add test for bootnode connection --- src/fiber/tests/network.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/fiber/tests/network.rs b/src/fiber/tests/network.rs index 0bfcb835..52040d01 100644 --- a/src/fiber/tests/network.rs +++ b/src/fiber/tests/network.rs @@ -568,3 +568,22 @@ async fn test_persisting_bootnode() { let peers = state.sample_n_peers_to_connect(1); assert_eq!(peers.get(&boot_peer_id), Some(&vec![address])); } + +#[tokio::test] +async fn test_connecting_to_bootnode() { + let boot_node = NetworkNode::new().await; + let boot_node_address = format!("{}", boot_node.get_node_address()); + let boot_node_id = &boot_node.peer_id; + + let mut node = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| config.bootnode_addrs = vec![boot_node_address]) + .build(), + ) + .await; + + node.expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == boot_node_id), + ) + .await; +} From 7d84f35d975a535ec9e7afbdfe4bc5e5f10dac30 Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 18 Oct 2024 11:37:54 +0800 Subject: [PATCH 07/11] Fire MaintainConnection immediately --- src/fiber/network.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 7356ee9f..30ce7991 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -3636,13 +3636,25 @@ where // Save bootnodes to the network actor state. state.persist_state(); + Ok(state) + } + + async fn post_start( + &self, + myself: ActorRef, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + myself + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::MaintainConnections(NUM_PEER_CONNECTIONS), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); myself.send_interval(MAINTAINING_CONNECTIONS_INTERVAL, || { NetworkActorMessage::new_command(NetworkActorCommand::MaintainConnections( NUM_PEER_CONNECTIONS, )) }); - - Ok(state) + Ok(()) } async fn handle( From 9725d761b6d4a28225339c9ea11b7f48489607eb Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 18 Oct 2024 19:51:53 +0800 Subject: [PATCH 08/11] Add command to save peer address to peer store --- src/fiber/network.rs | 136 +++++++++++++++++++++++++++++-------- src/fiber/tests/network.rs | 28 ++++++++ src/rpc/peer.rs | 12 +++- 3 files changed, 147 insertions(+), 29 deletions(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 30ce7991..e31ee8b9 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use std::borrow::Cow; use std::collections::hash_map::Entry; +use std::hash::RandomState; use tentacle::utils::extract_peer_id; use std::collections::{HashMap, HashSet}; @@ -160,8 +161,12 @@ pub struct NodeInfoResponse { #[derive(Debug)] pub enum NetworkActorCommand { /// Network commands + // Connect to a peer, and optionally also save the peer to the peer store. ConnectPeer(Multiaddr), DisconnectPeer(PeerId), + // Save the address of a peer to the peer store, the address here must be a valid + // multiaddr with the peer id. + SavePeerAddress(Multiaddr), // We need to maintain a certain number of peers connections to keep the network running. MaintainConnections(usize), // For internal use and debugging only. Most of the messages requires some @@ -1199,7 +1204,6 @@ where state: &mut NetworkActorState, command: NetworkActorCommand, ) -> crate::Result<()> { - debug!("Handling command: {:?}", command); match command { NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId { peer_id, message }) => { state.send_message_to_peer(&peer_id, message).await?; @@ -1238,7 +1242,19 @@ where } } + NetworkActorCommand::SavePeerAddress(addr) => match extract_peer_id(&addr) { + Some(peer) => { + debug!("Saved peer id {:?} with address {:?}", &peer, &addr); + state.save_peer_address(peer, addr); + } + None => { + error!("Failed to save address to peer store: unable to extract peer id from address {:?}", &addr); + } + }, + NetworkActorCommand::MaintainConnections(num_peers) => { + debug!("Maintaining connections to {} peers", num_peers); + let num_connected_peers = state.peer_session_map.len(); if num_connected_peers >= num_peers { debug!( @@ -1250,6 +1266,10 @@ where let peers_to_connect = state .state_to_be_persisted .sample_n_peers_to_connect(num_peers - num_connected_peers); + debug!( + "Randomly selected peers to connect: {:?}", + &peers_to_connect + ); for (peer_id, addresses) in peers_to_connect { if let Some(session) = state.get_peer_session(&peer_id) { debug!( @@ -1815,15 +1835,6 @@ where &node_announcement ); - // TODO: bookkeeping how many nodes we have connected to. Stop connecting once we surpass a threshold. - for addr in &node_announcement.addresses { - state - .network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr.clone()), - ))?; - } - // Add the node to the network graph. self.network_graph .write() @@ -1831,7 +1842,10 @@ where .process_node_announcement(node_announcement.clone()); let peer_id = node_announcement.peer_id(); - state.on_node_announcement(peer_id, node_announcement.addresses.clone()); + state.save_announced_peer_addresses( + peer_id, + node_announcement.addresses.clone(), + ); Ok(()) } _ => { @@ -2308,9 +2322,14 @@ pub struct NetworkActorState { #[serde_as] #[derive(Default, Clone, Serialize, Deserialize)] pub struct PersistentNetworkActorState { - // when we restarting a node, we will reconnect to these peers + // These addresses are announced by the peer itself to the network. + // When a new NodeAnnouncement message is received, we will overwrite the old addresses. + #[serde_as(as = "Vec<(DisplayFromStr, _)>")] + announced_peer_addresses: HashMap>, + // These addresses are saved by the user (e.g. the user sends a ConnectPeer rpc to the node), + // we will then save these addresses to the peer store. #[serde_as(as = "Vec<(DisplayFromStr, _)>")] - peer_store: HashMap>, + saved_peer_addresses: HashMap>, } impl PersistentNetworkActorState { @@ -2318,10 +2337,26 @@ impl PersistentNetworkActorState { Default::default() } + fn get_peer_addresses(&self, peer_id: &PeerId) -> HashSet { + let empty = vec![]; + self.announced_peer_addresses + .get(peer_id) + .unwrap_or(&empty) + .iter() + .chain( + self.saved_peer_addresses + .get(peer_id) + .unwrap_or(&empty) + .iter(), + ) + .map(|addr| addr.clone()) + .collect::>() + } + /// Save a single peer address to the peer store. If this address for the peer does not exist, /// then return false, otherwise return true. - pub(crate) fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> bool { - match self.peer_store.entry(peer_id) { + fn save_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> bool { + match self.saved_peer_addresses.entry(peer_id) { Entry::Occupied(mut entry) => { if entry.get().contains(&addr) { false @@ -2337,10 +2372,10 @@ impl PersistentNetworkActorState { } } - /// Save peer addresses to the peer store. If the peer addresses are updated, - /// return true, otherwise return false. This method will NOT keep the old addresses. - pub(crate) fn save_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { - match self.peer_store.entry(peer_id) { + /// Save announced peer addresses to the peer store. If the peer addresses are updated, + /// return true, otherwise return false. This method will NOT keep the old announced addresses. + fn save_announced_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { + match self.announced_peer_addresses.entry(peer_id) { Entry::Occupied(mut entry) => { if entry.get() == &addr { false @@ -2357,10 +2392,22 @@ impl PersistentNetworkActorState { } pub(crate) fn sample_n_peers_to_connect(&self, n: usize) -> HashMap> { - self.peer_store - .iter() + let nodes = self + .saved_peer_addresses + .keys() + .into_iter() + .chain(self.saved_peer_addresses.keys().into_iter()) + .collect::>(); + + nodes + .into_iter() .take(n) - .map(|(id, addr)| (id.clone(), addr.clone())) + .map(|peer_id| { + ( + peer_id.clone(), + self.get_peer_addresses(peer_id).into_iter().collect(), + ) + }) .collect() } } @@ -3050,14 +3097,30 @@ where self.maybe_tell_syncer_peer_disconnected(id); } - fn on_node_announcement(&mut self, peer_id: PeerId, addresses: Vec) { - self.save_peer_addresses(peer_id, addresses); + pub(crate) fn get_peer_addresses(&self, peer_id: &PeerId) -> HashSet { + self.state_to_be_persisted.get_peer_addresses(peer_id) + } + + pub(crate) fn save_peer_address(&mut self, peer_id: PeerId, address: Multiaddr) -> bool { + if self + .state_to_be_persisted + .save_peer_address(peer_id, address) + { + self.persist_state(); + true + } else { + false + } } - fn save_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec) { + pub(crate) fn save_announced_peer_addresses( + &mut self, + peer_id: PeerId, + addresses: Vec, + ) { if self .state_to_be_persisted - .save_peer_addresses(peer_id, addresses) + .save_announced_peer_addresses(peer_id, addresses) { self.persist_state(); } @@ -3565,7 +3628,7 @@ where for bootnode in &config.bootnode_addrs { let addr = Multiaddr::from_str(bootnode.as_str()).expect("valid bootnode"); let peer_id = extract_peer_id(&addr).expect("valid peer id"); - state_to_be_persisted.add_peer_address(peer_id, addr); + state_to_be_persisted.save_peer_address(peer_id, addr); } let height = graph.get_best_height(); @@ -3642,8 +3705,25 @@ where async fn post_start( &self, myself: ActorRef, - _state: &mut Self::State, + state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { + debug!("Trying to connect to peers with mutual channels"); + for (peer_id, channel_id, channel_state) in self.store.get_channel_states(None) { + let addresses = state.get_peer_addresses(&peer_id); + + debug!( + "Reconnecting channel {:x} peers {:?} in state {:?}", + &channel_id, &peer_id, &channel_state + ); + for addr in addresses { + myself + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + } + myself .send_message(NetworkActorMessage::new_command( NetworkActorCommand::MaintainConnections(NUM_PEER_CONNECTIONS), diff --git a/src/fiber/tests/network.rs b/src/fiber/tests/network.rs index 52040d01..e7952a85 100644 --- a/src/fiber/tests/network.rs +++ b/src/fiber/tests/network.rs @@ -587,3 +587,31 @@ async fn test_connecting_to_bootnode() { ) .await; } + +#[tokio::test] +async fn test_saving_and_connecting_to_node() { + init_tracing(); + + let node1 = NetworkNode::new().await; + let node1_address = node1.get_node_address().clone(); + let node1_id = &node1.peer_id; + + let mut node2 = NetworkNode::new().await; + + node2 + .network_actor + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SavePeerAddress(node1_address), + )) + .expect("send message to network actor"); + + // Wait for the above message to be processed. + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + node2.restart().await; + + node2.expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == node1_id), + ) + .await; +} diff --git a/src/rpc/peer.rs b/src/rpc/peer.rs index f1ffa4be..b5a7b480 100644 --- a/src/rpc/peer.rs +++ b/src/rpc/peer.rs @@ -9,9 +9,10 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use tentacle::{multiaddr::MultiAddr, secio::PeerId}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct ConnectPeerParams { address: MultiAddr, + save: bool, } #[serde_as] @@ -45,6 +46,15 @@ impl PeerRpcServer for PeerRpcServerImpl { async fn connect_peer(&self, params: ConnectPeerParams) -> Result<(), ErrorObjectOwned> { let message = NetworkActorMessage::Command(NetworkActorCommand::ConnectPeer(params.address.clone())); + if params.save { + crate::handle_actor_cast!( + self.actor, + NetworkActorMessage::Command(NetworkActorCommand::ConnectPeer( + params.address.clone() + )), + params.clone() + )?; + } crate::handle_actor_cast!(self.actor, message, params) } From ad5675d191cc4d2e024b5b9094d527ee5e6ccffd Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 18 Oct 2024 20:57:36 +0800 Subject: [PATCH 09/11] Add test for reconnect peer on restart --- src/fiber/tests/channel.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index 68f7be5f..ee495709 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -1233,3 +1233,20 @@ async fn test_commitment_tx_capacity() { output_capacity as u128 ); } + +#[tokio::test] +async fn test_connect_to_peers_with_mutual_channel_on_restart() { + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (mut node_a, node_b, _new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, true) + .await; + + node_a.restart().await; + + node_a.expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == &node_b.peer_id), + ) + .await; +} From 21b4ed545936768cc6fe35385f5ce8b292bdf1b0 Mon Sep 17 00:00:00 2001 From: YI Date: Mon, 21 Oct 2024 18:35:50 +0800 Subject: [PATCH 10/11] Make the parameter save to connect_peer optional --- src/rpc/README.md | 1 + src/rpc/peer.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rpc/README.md b/src/rpc/README.md index b939ecef..2d154b8b 100644 --- a/src/rpc/README.md +++ b/src/rpc/README.md @@ -241,6 +241,7 @@ Attempts to connect to a peer. ###### Params * `address` - The address of the peer to connect to +* `save` - Whether to save the peer address, an optional parameter (default value true) ###### Returns diff --git a/src/rpc/peer.rs b/src/rpc/peer.rs index b5a7b480..7022ccd8 100644 --- a/src/rpc/peer.rs +++ b/src/rpc/peer.rs @@ -12,7 +12,7 @@ use tentacle::{multiaddr::MultiAddr, secio::PeerId}; #[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct ConnectPeerParams { address: MultiAddr, - save: bool, + save: Option, } #[serde_as] @@ -46,10 +46,10 @@ impl PeerRpcServer for PeerRpcServerImpl { async fn connect_peer(&self, params: ConnectPeerParams) -> Result<(), ErrorObjectOwned> { let message = NetworkActorMessage::Command(NetworkActorCommand::ConnectPeer(params.address.clone())); - if params.save { + if params.save.unwrap_or(true) { crate::handle_actor_cast!( self.actor, - NetworkActorMessage::Command(NetworkActorCommand::ConnectPeer( + NetworkActorMessage::Command(NetworkActorCommand::SavePeerAddress( params.address.clone() )), params.clone() From a7aac1adac718d0eb404eacfd45ca6e0023e8aa2 Mon Sep 17 00:00:00 2001 From: YI Date: Mon, 21 Oct 2024 20:19:24 +0800 Subject: [PATCH 11/11] Add test for persisting announced node --- src/fiber/network.rs | 2 +- src/fiber/tests/network.rs | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/fiber/network.rs b/src/fiber/network.rs index e31ee8b9..3467266e 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -2396,7 +2396,7 @@ impl PersistentNetworkActorState { .saved_peer_addresses .keys() .into_iter() - .chain(self.saved_peer_addresses.keys().into_iter()) + .chain(self.announced_peer_addresses.keys().into_iter()) .collect::>(); nodes diff --git a/src/fiber/tests/network.rs b/src/fiber/tests/network.rs index e7952a85..6b7cfe46 100644 --- a/src/fiber/tests/network.rs +++ b/src/fiber/tests/network.rs @@ -569,6 +569,33 @@ async fn test_persisting_bootnode() { assert_eq!(peers.get(&boot_peer_id), Some(&vec![address])); } +#[tokio::test] +async fn test_persisting_announced_nodes() { + let mut node = new_synced_node("test").await; + + let announcement = create_fake_node_announcement_mesage_version1(); + let node_pk = announcement.node_id; + let peer_id = node_pk.tentacle_peer_id(); + + node.network_actor + .send_message(NetworkActorMessage::Event(NetworkActorEvent::PeerMessage( + peer_id.clone(), + FiberMessage::BroadcastMessage(FiberBroadcastMessage::NodeAnnouncement( + create_fake_node_announcement_mesage_version1(), + )), + ))) + .expect("send message to network actor"); + + // Wait for the above message to be processed. + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + node.stop().await; + let state = node.store.clone(); + let state = state.get_network_actor_state(&node.peer_id).unwrap(); + let peers = state.sample_n_peers_to_connect(1); + assert!(peers.get(&peer_id).is_some()); +} + #[tokio::test] async fn test_connecting_to_bootnode() { let boot_node = NetworkNode::new().await;