Skip to content

Commit

Permalink
some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Oct 9, 2024
1 parent 5f96843 commit 8522882
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
70 changes: 34 additions & 36 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1836,6 +1836,7 @@ where
core: Core<C>,
response_policy: Option<ResponsePolicy>,
) -> OperationResult {
trace!("execute_on_multiple_nodes");
let connections_container = core.conn_lock.read().await;
if connections_container.is_empty() {
return OperationResult::Err((
Expand Down Expand Up @@ -2119,20 +2120,22 @@ where
}
ConnectionCheck::RandomConnection => {
let read_guard = core.conn_lock.read().await;
let random_conns_option = read_guard.random_connections(1, ConnectionType::User);
if let Some(mut random_connections) = random_conns_option {
let (random_address, random_conn_future) =
random_connections.next().ok_or(RedisError::from((
read_guard
.random_connections(1, ConnectionType::User)
.and_then(|mut random_connections| {
random_connections.next().map(
|(random_address, random_conn_future)| async move {
(random_address, random_conn_future.await)
},
)
})
.ok_or_else(|| {
RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
} else {
return Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)));
}
))
})?
.await
}
};

Expand All @@ -2151,23 +2154,21 @@ where
RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
Ok(_) => {
trace!("Recovered!");
(Some(ConnectionState::PollComplete), Poll::Ready(Ok(())))
(ConnectionState::PollComplete, Poll::Ready(Ok(())))
}
Err(err) => {
trace!("Recover slots failed!");

let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Some(ConnectionState::Recover(RecoverFuture::Reconnect(
Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
self.inner.clone(),
)),
ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)))
} else {
Some(ConnectionState::Recover(RecoverFuture::RecoverSlots(
Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
)),
),
)))
};
(next_state, Poll::Ready(Err(err)))
Expand All @@ -2176,12 +2177,10 @@ where
RecoverFuture::Reconnect(ref mut future) => {
ready!(future.as_mut().poll(cx));
trace!("Reconnected connections");
(Some(ConnectionState::PollComplete), Poll::Ready(Ok(())))
(ConnectionState::PollComplete, Poll::Ready(Ok(())))
}
};
if let Some(state) = next_state {
self.state = state;
}
self.state = next_state;
poll
}

Expand Down Expand Up @@ -2476,19 +2475,18 @@ async fn calculate_topology_from_random_nodes<'a, C>(
where
C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
let requested_nodes = match read_guard
.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement)
let requested_nodes = if let Some(random_conns) =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement)
{
Some(random_conns) => random_conns,
None => {
return (
Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No available connections to refresh slots from",
))),
vec![],
)
}
random_conns
} else {
return (
Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No available connections to refresh slots from",
))),
vec![],
);
};
let topology_join_results =
futures::future::join_all(requested_nodes.map(|(addr, conn)| async move {
Expand Down
4 changes: 1 addition & 3 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,6 @@ mod cluster_async {
respond_startup_two_nodes(name, cmd)?;
let i = requests.fetch_add(1, atomic::Ordering::SeqCst);
match i {
// Respond that the key exists on a node that does not yet have a connection:
0 => Err(Err(RedisError::from((ErrorKind::IoError, "io-error")))),
_ => {
panic!("Expected not to be retried!")
Expand Down Expand Up @@ -3294,9 +3293,8 @@ mod cluster_async {
}

if use_sharded {
let mut cmd = redis::cmd("SPUBLISH");
// validate SPUBLISH
let result = cmd
let result = redis::cmd("SPUBLISH")
.arg("test_channel_?")
.arg("test_message")
.query_async(&mut publishing_con)
Expand Down

0 comments on commit 8522882

Please sign in to comment.