Skip to content

Commit

Permalink
Refactored the connections map from HashMap to DashMap (#180)
Browse files Browse the repository at this point in the history
* Refactored the connections map from HashMap to DashMap to minimize write lock contention on the connection container

* Replaced the connection map key from ArcStr to String
  • Loading branch information
barshaul authored Aug 7, 2024
1 parent e6635ac commit 374c7bb
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 78 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ crc16 = { version = "0.4", optional = true }
rand = { version = "0.8", optional = true }
derivative = { version = "2.2.0", optional = true }

# Only needed for async cluster
dashmap = { version = "6.0", optional = true }

# Only needed for async_std support
async-std = { version = "1.8.0", optional = true }
async-trait = { version = "0.1.24", optional = true }
Expand Down Expand Up @@ -124,7 +127,7 @@ tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"]
tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"]
connection-manager = ["futures", "aio", "tokio-retry"]
streams = []
cluster-async = ["cluster", "futures", "futures-util"]
cluster-async = ["cluster", "futures", "futures-util", "dashmap"]
keep-alive = ["socket2"]
sentinel = ["rand"]
tcp_nodelay = []
Expand Down
96 changes: 51 additions & 45 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::cluster_async::ConnectionFuture;
use arcstr::ArcStr;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::collections::HashMap;
use std::net::IpAddr;

use crate::cluster_routing::{Route, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;

/// A struct that encapsulates a network connection along with its associated IP address.
#[derive(Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -86,11 +84,12 @@ pub(crate) enum ConnectionType {
PreferManagement,
}

pub(crate) struct ConnectionsMap<Connection>(pub(crate) HashMap<ArcStr, ClusterNode<Connection>>);
pub(crate) struct ConnectionsMap<Connection>(pub(crate) DashMap<String, ClusterNode<Connection>>);

impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (address, node) in self.0.iter() {
for item in self.0.iter() {
let (address, node) = (item.key(), item.value());
match node.user_connection.ip {
Some(ip) => writeln!(f, "{address} - {ip}")?,
None => writeln!(f, "{address}")?,
Expand All @@ -101,7 +100,7 @@ impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
}

pub(crate) struct ConnectionsContainer<Connection> {
connection_map: HashMap<ArcStr, ClusterNode<Connection>>,
connection_map: DashMap<String, ClusterNode<Connection>>,
pub(crate) slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
Expand All @@ -118,7 +117,7 @@ impl<Connection> Default for ConnectionsContainer<Connection> {
}
}

pub(crate) type ConnectionAndAddress<Connection> = (ArcStr, Connection);
pub(crate) type ConnectionAndAddress<Connection> = (String, Connection);

impl<Connection> ConnectionsContainer<Connection>
where
Expand All @@ -139,7 +138,7 @@ where
}

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &ArcStr) -> bool {
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some()
&& self
.slot_map
Expand Down Expand Up @@ -213,9 +212,10 @@ where
pub(crate) fn all_node_connections(
&self,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
self.connection_map
.iter()
.map(move |(address, node)| (address.clone(), node.user_connection.conn.clone()))
self.connection_map.iter().map(move |item| {
let (node, address) = (item.key(), item.value());
(node.clone(), address.user_connection.conn.clone())
})
}

pub(crate) fn all_primary_connections(
Expand All @@ -228,16 +228,19 @@ where
}

pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {
self.connection_map.get(address).cloned()
self.connection_map
.get(address)
.map(|item| item.value().clone())
}

pub(crate) fn connection_for_address(
&self,
address: &str,
) -> Option<ConnectionAndAddress<Connection>> {
self.connection_map
.get_key_value(address)
.map(|(address, conn)| (address.clone(), conn.user_connection.conn.clone()))
self.connection_map.get(address).map(|item| {
let (address, conn) = (item.key(), item.value());
(address.clone(), conn.user_connection.conn.clone())
})
}

pub(crate) fn random_connections(
Expand All @@ -249,24 +252,27 @@ where
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |(address, node)| {
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
}

pub(crate) fn replace_or_add_connection_for_address(
&mut self,
address: impl Into<ArcStr>,
&self,
address: impl Into<String>,
node: ClusterNode<Connection>,
) -> ArcStr {
) -> String {
let address = address.into();
self.connection_map.insert(address.clone(), node);
address
}

pub(crate) fn remove_node(&mut self, address: &ArcStr) -> Option<ClusterNode<Connection>> {
self.connection_map.remove(address)
pub(crate) fn remove_node(&self, address: &String) -> Option<ClusterNode<Connection>> {
self.connection_map
.remove(address)
.map(|(_key, value)| value)
}

pub(crate) fn len(&self) -> usize {
Expand Down Expand Up @@ -302,13 +308,13 @@ mod tests {
}
}
}
fn remove_nodes(container: &mut ConnectionsContainer<usize>, addresss: &[&str]) {
for address in addresss {
fn remove_nodes(container: &ConnectionsContainer<usize>, addresses: &[&str]) {
for address in addresses {
container.remove_node(&(*address).into());
}
}

fn remove_all_connections(container: &mut ConnectionsContainer<usize>) {
fn remove_all_connections(container: &ConnectionsContainer<usize>) {
remove_nodes(
container,
&[
Expand Down Expand Up @@ -366,7 +372,7 @@ mod tests {
],
ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy.
);
let mut connection_map = HashMap::new();
let connection_map = DashMap::new();
connection_map.insert(
"primary1".into(),
create_cluster_node(1, use_management_connections),
Expand Down Expand Up @@ -514,7 +520,7 @@ mod tests {

#[test]
fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() {
let mut container = create_container();
let container = create_container();
container.remove_node(&"replica3-2".into());

assert_eq!(
Expand All @@ -540,8 +546,8 @@ mod tests {

#[test]
fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() {
let mut container = create_container();
remove_nodes(&mut container, &["replica2-1", "replica3-1", "replica3-2"]);
let container = create_container();
remove_nodes(&container, &["replica2-1", "replica3-1", "replica3-2"]);

assert_eq!(
2,
Expand Down Expand Up @@ -593,15 +599,15 @@ mod tests {

#[test]
fn get_connection_by_address_returns_none_if_connection_was_removed() {
let mut container = create_container();
let container = create_container();
container.remove_node(&"primary1".into());

assert!(container.connection_for_address("primary1").is_none());
}

#[test]
fn get_connection_by_address_returns_added_connection() {
let mut container = create_container();
let container = create_container();
let address = container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
Expand Down Expand Up @@ -630,8 +636,8 @@ mod tests {

#[test]
fn get_random_connections_returns_none_if_all_connections_were_removed() {
let mut container = create_container();
remove_all_connections(&mut container);
let container = create_container();
remove_all_connections(&container);

assert_eq!(
0,
Expand All @@ -643,8 +649,8 @@ mod tests {

#[test]
fn get_random_connections_returns_added_connection() {
let mut container = create_container();
remove_all_connections(&mut container);
let container = create_container();
remove_all_connections(&container);
let address = container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
Expand Down Expand Up @@ -694,7 +700,7 @@ mod tests {

#[test]
fn get_all_user_connections_returns_added_connection() {
let mut container = create_container();
let container = create_container();
container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
Expand All @@ -711,7 +717,7 @@ mod tests {

#[test]
fn get_all_user_connections_does_not_return_removed_connection() {
let mut container = create_container();
let container = create_container();
container.remove_node(&"primary1".into());

let mut connections: Vec<_> = container
Expand All @@ -738,7 +744,7 @@ mod tests {

#[test]
fn get_all_primaries_does_not_return_removed_connection() {
let mut container = create_container();
let container = create_container();
container.remove_node(&"primary1".into());

let mut connections: Vec<_> = container
Expand All @@ -752,7 +758,7 @@ mod tests {

#[test]
fn len_is_adjusted_on_removals_and_additions() {
let mut container = create_container();
let container = create_container();

assert_eq!(container.len(), 6);

Expand All @@ -769,7 +775,7 @@ mod tests {
#[test]
fn len_is_not_adjusted_on_removals_of_nonexisting_connections_or_additions_of_existing_connections(
) {
let mut container = create_container();
let container = create_container();

assert_eq!(container.len(), 6);

Expand All @@ -785,7 +791,7 @@ mod tests {

#[test]
fn remove_node_returns_connection_if_it_exists() {
let mut container = create_container();
let container = create_container();

let connection = container.remove_node(&"primary1".into());
assert_eq!(connection, Some(ClusterNode::new_only_with_user_conn(1)));
Expand All @@ -796,7 +802,7 @@ mod tests {

#[test]
fn test_is_empty() {
let mut container = create_container();
let container = create_container();

assert!(!container.is_empty());
container.remove_node(&"primary1".into());
Expand Down Expand Up @@ -829,7 +835,7 @@ mod tests {

#[test]
fn is_primary_returns_false_for_removed_node() {
let mut container = create_container();
let container = create_container();
let address = "primary1".into();
container.remove_node(&address);

Expand Down
Loading

0 comments on commit 374c7bb

Please sign in to comment.