Feature/deadlock detection #299

Open · wants to merge 19 commits into base: develop

Changes from 1 commit

38 changes: 15 additions & 23 deletions libmuscle/cpp/src/libmuscle/communicator.cpp
@@ -122,10 +122,10 @@ std::tuple<Message, double> Communicator::receive_message(
Port & port = (port_name == "muscle_settings_in") ?
port_manager_.muscle_settings_in() : port_manager_.get_port(port_name);

std::string port_and_slot = port_name;
if (slot.is_set())
logger_.debug("Waiting for message on ", port_name, "[", slot.get(), "]");
else
logger_.debug("Waiting for message on ", port_name);
port_and_slot = port_name + "[" + std::to_string(slot.get()) + "]";
logger_.debug("Waiting for message on ", port_and_slot);
std::vector<int> slot_list;
if (slot.is_set())
slot_list.emplace_back(slot.get());
@@ -146,7 +146,7 @@ std::tuple<Message, double> Communicator::receive_message(
manager_, peer_instance, port_name, slot, receive_timeout_);
ReceiveTimeoutHandler *timeout_handler = receive_timeout_ < 0 ? nullptr : &handler;
auto msg_and_profile = try_receive_(
client, recv_endpoint.ref(), snd_endpoint.kernel, timeout_handler);
client, recv_endpoint.ref(), snd_endpoint.kernel, port_and_slot, timeout_handler);
auto & msg = std::get<0>(msg_and_profile);

ProfileEvent recv_decode_event(
@@ -210,37 +210,24 @@ std::tuple<Message, double> Communicator::receive_message(
if (expected_message_number != mpp_message.message_number) {
if (expected_message_number - 1 == mpp_message.message_number and
port.is_resuming(slot)) {
if (slot.is_set())
logger_.debug("Discarding received message on ", port_name,
"[", slot.get(), "]: resuming from weakly",
" consistent snapshot");
else
logger_.debug("Discarding received message on ", port_name,
": resuming from weakly consistent snapshot");
logger_.debug("Discarding received message on ", port_and_slot,
": resuming from weakly consistent snapshot");
port.set_resumed(slot);
return receive_message(port_name, slot, default_msg);
}
std::ostringstream oss;
oss << "Received message on " << port_name;
if (slot.is_set())
oss << "[" << slot.get() << "]";
oss << "Received message on " << port_and_slot;
oss << " with unexpected message number " << mpp_message.message_number;
oss << ". Was expecting " << expected_message_number;
oss << ". Are you resuming from an inconsistent snapshot?";
throw std::runtime_error(oss.str());
}
port.increment_num_messages(slot);

if (slot.is_set())
logger_.debug("Received message on ", port_name, "[", slot.get(), "]");
else
logger_.debug("Received message on ", port_name);
logger_.debug("Received message on ", port_and_slot);

if (is_close_port(message.data())) {
if (slot.is_set())
logger_.debug("Port ", port_name, "[", slot.get(), "] is now closed");
else
logger_.debug("Port ", port_name, " is now closed");
logger_.debug("Port ", port_and_slot, " is now closed");
}
return std::make_tuple(message, mpp_message.saved_until);
}
@@ -297,9 +284,14 @@ Endpoint Communicator::get_endpoint_(

std::tuple<std::vector<char>, mcp::ProfileData> Communicator::try_receive_(
MPPClient & client, Reference const & receiver, Reference const & peer,
ReceiveTimeoutHandler *timeout_handler) {
std::string const & port_and_slot, ReceiveTimeoutHandler *timeout_handler) {
try {
return client.receive(receiver, timeout_handler);
} catch(Deadlock const & err) {
throw std::runtime_error(
"Deadlock detected when when receiving a message on '" +
Contributor: when when!
port_and_slot +
"'. See manager logs for more detail.");
} catch(std::runtime_error const & err) {
throw std::runtime_error(
"Error while receiving a message: connection with peer '" +
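
Note (not part of the diff): the changes above repeatedly replace per-branch logging with a single port_and_slot string, i.e. the port name with an optional "[slot]" suffix. A minimal Python sketch of that formatting, using a hypothetical helper name:

from typing import Optional

def format_port_and_slot(port_name: str, slot: Optional[int] = None) -> str:
    """Format a port reference the way the C++ port_and_slot variable does."""
    if slot is not None:
        return f"{port_name}[{slot}]"
    return port_name

assert format_port_and_slot("state_in") == "state_in"
assert format_port_and_slot("state_in", 3) == "state_in[3]"
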
3 changes: 2 additions & 1 deletion libmuscle/cpp/src/libmuscle/communicator.hpp
@@ -156,7 +156,8 @@ class Communicator {

std::tuple<std::vector<char>, mcp::ProfileData> try_receive_(
MPPClient & client, ymmsl::Reference const & receiver,
ymmsl::Reference const & peer, ReceiveTimeoutHandler *handler);
ymmsl::Reference const & peer, std::string const & port_and_slot,
ReceiveTimeoutHandler *handler);

void close_port_(std::string const & port_name, Optional<int> slot = {});

59 changes: 32 additions & 27 deletions libmuscle/cpp/src/libmuscle/mcp/tcp_transport_client.cpp
@@ -83,6 +83,30 @@ int connect(std::string const & address) {
+ port);
}

/** Poll until timeout (in seconds) is reached. Retry when interrupted with EINTR. */
inline int poll_retry_eintr(pollfd *fds, nfds_t nfds, double timeout) {
using std::chrono::duration;
using std::chrono::steady_clock;
using std::chrono::milliseconds;
using std::chrono::duration_cast;

const auto timeout_duration = duration<double>(timeout);
const auto deadline = steady_clock::now() + timeout_duration;
while (1) {
Contributor: while (true), for consistent nits.
int timeout_ms = duration_cast<milliseconds>(deadline - steady_clock::now()).count();
int poll_result = poll(fds, nfds, timeout_ms);

if (poll_result >= 0)
return poll_result;

if (errno != EINTR)
throw std::runtime_error(
"Unexpected error during poll(): " +
std::string(std::strerror(errno)));
// poll() was interrupted by a signal: retry with re-calculated timeout
}
}

}


@@ -140,38 +164,19 @@ std::tuple<std::vector<char>, ProfileData> TcpTransportClient::call(
if (timeout_handler == nullptr) {
length = recv_int64(socket_fd_);
} else {
using std::chrono::duration;
using std::chrono::steady_clock;
using std::chrono::milliseconds;
using std::chrono::duration_cast;

const auto timeout_duration = duration<double>(timeout_handler->get_timeout());
const auto deadline = steady_clock::now() + timeout_duration;
int poll_result;
bool did_timeout = false;
pollfd socket_poll_fd;
socket_poll_fd.fd = socket_fd_;
socket_poll_fd.events = POLLIN;
do {
int timeout_ms = duration_cast<milliseconds>(deadline - steady_clock::now()).count();
poll_result = poll(&socket_poll_fd, 1, timeout_ms);

if (poll_result >= 0)
break;

if (errno != EINTR)
throw std::runtime_error("Unexpected error during poll(): "+std::to_string(errno));

// poll() was interrupted by a signal: retry with re-calculated timeout
} while (1);

if (poll_result == 0) {
// time limit expired
while (poll_retry_eintr(&socket_poll_fd, 1, timeout_handler->get_timeout()) == 0) {
timeout_handler->on_timeout();
length = recv_int64(socket_fd_);
did_timeout = true;
}
// socket is ready for a receive, this call shouldn't block:
length = recv_int64(socket_fd_);

if (did_timeout) {
timeout_handler->on_receive();
} else {
// socket is ready for a receive, this call shouldn't block:
length = recv_int64(socket_fd_);
}
}

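
Note (not part of the diff): poll_retry_eintr recomputes the remaining time whenever poll() is interrupted by a signal, so the overall deadline still holds; the call() loop above then invokes on_timeout() each time the poll comes back empty and tries again with a longer timeout. A rough Python sketch of the same deadline bookkeeping, assuming a POSIX select.poll object (Python already retries EINTR itself since PEP 475, so the explicit except is purely illustrative):

import select
import socket
import time

def poll_retry_eintr(poller: "select.poll", timeout_s: float) -> list:
    """Poll until timeout_s elapses, recomputing the remaining time on retry."""
    deadline = time.monotonic() + timeout_s
    while True:
        remaining_ms = max(0, int((deadline - time.monotonic()) * 1000.0))
        try:
            return poller.poll(remaining_ms)
        except InterruptedError:
            continue  # interrupted by a signal: retry with recalculated timeout

# Usage: an empty list means the timeout expired, otherwise the socket is ready.
sender, receiver = socket.socketpair()
poller = select.poll()
poller.register(receiver, select.POLLIN)
print(poll_retry_eintr(poller, 0.1))   # [] -> timed out, nothing to read yet
sender.sendall(b"hello")
print(poll_retry_eintr(poller, 0.1))   # [(fd, POLLIN)] -> ready to receive
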
9 changes: 9 additions & 0 deletions libmuscle/cpp/src/libmuscle/mmp_client.cpp
@@ -347,6 +347,15 @@ void MMPClient::waiting_for_receive_done(
auto response = call_manager_(request);
}

bool MMPClient::is_deadlocked() {
auto request = Data::list(
static_cast<int>(RequestType::waiting_for_receive_done),
static_cast<std::string>(instance_id_));

auto response = call_manager_(request);
return response[1].as<bool>();
}

DataConstRef MMPClient::call_manager_(DataConstRef const & request) {
std::lock_guard<std::mutex> lock(mutex_);

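
Note (not part of the diff): is_deadlocked follows the same request/response convention as the other MMP calls in this file: the request is a list of [request type, instance id] and the answer sits in response[1]. A hedged Python sketch of that shape; the enum values below are made up for illustration, the real ones live in libmuscle's protocol definitions:

from enum import IntEnum
from typing import Any, List

class RequestType(IntEnum):
    # Hypothetical values, for illustration only.
    WAITING_FOR_RECEIVE = 9
    WAITING_FOR_RECEIVE_DONE = 10
    IS_DEADLOCKED = 11

def encode_is_deadlocked_request(instance_id: str) -> List[Any]:
    # Mirrors the C++ Data::list(...) call above: [request type, instance id].
    return [int(RequestType.IS_DEADLOCKED), instance_id]

def decode_is_deadlocked_response(response: List[Any]) -> bool:
    # response[1] carries the boolean answer, as in response[1].as<bool>().
    return bool(response[1])

assert decode_is_deadlocked_response([0, True]) is True
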
3 changes: 3 additions & 0 deletions libmuscle/cpp/src/libmuscle/mmp_client.hpp
@@ -133,6 +133,9 @@ class MMPClient {
std::string const & peer_instance_id, std::string const & port_name,
Optional<int> slot);

/** Ask the manager if we're part of a deadlock. */
bool is_deadlocked();

private:
ymmsl::Reference instance_id_;
mcp::TcpTransportClient transport_client_;
14 changes: 11 additions & 3 deletions libmuscle/cpp/src/libmuscle/receive_timeout_handler.cpp
@@ -1,5 +1,7 @@
#include "receive_timeout_handler.hpp"

#include <cmath>

namespace libmuscle { namespace _MUSCLE_IMPL_NS {

ReceiveTimeoutHandler::ReceiveTimeoutHandler(
@@ -9,16 +11,22 @@ ReceiveTimeoutHandler::ReceiveTimeoutHandler(
, peer_instance_(peer_instance)
, port_name_(port_name)
, slot_(slot)
, timeout_(timeout) {}
, timeout_(timeout)
, num_timeout_(0) {}

double ReceiveTimeoutHandler::get_timeout()
{
return timeout_;
// Increase timeout by a factor 1.5 with every timeout we hit:
return timeout_ * std::pow(1.5, (double)num_timeout_);
Contributor: static_cast<double> is the C++ way to do (double) here, but can't it just be omitted? Or is that a narrowing conversion?
Collaborator (author): (double) can be omitted 👍
}

void ReceiveTimeoutHandler::on_timeout()
{
manager_.waiting_for_receive(peer_instance_, port_name_, slot_);
if (num_timeout_ == 0)
manager_.waiting_for_receive(peer_instance_, port_name_, slot_);
else
if (manager_.is_deadlocked())
throw Deadlock();
}

void ReceiveTimeoutHandler::on_receive()
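
Note (not part of the diff): taken together, get_timeout() and on_timeout() implement an escalating check: the timeout grows by a factor 1.5 on every expiry, the first expiry reports the wait to the manager, and later expiries ask the manager whether a deadlock has been confirmed. A minimal Python sketch of that control flow, with a dummy manager standing in for MMPClient and the counter increment assumed to happen at the end of on_timeout (that line is outside the hunk shown above):

class Deadlock(RuntimeError):
    """Raised when the manager confirms we are part of a deadlock."""

class ReceiveTimeoutSketch:
    def __init__(self, manager, peer, port_and_slot, timeout):
        self.manager = manager
        self.peer = peer
        self.port_and_slot = port_and_slot
        self.timeout = timeout
        self.num_timeout = 0

    def get_timeout(self) -> float:
        # Increase the timeout by a factor 1.5 with every timeout already hit.
        return self.timeout * 1.5 ** self.num_timeout

    def on_timeout(self) -> None:
        if self.num_timeout == 0:
            # First expiry: tell the manager we are waiting on this peer/port.
            self.manager.waiting_for_receive(self.peer, self.port_and_slot)
        elif self.manager.is_deadlocked():
            raise Deadlock()
        self.num_timeout += 1

class DummyManager:
    def waiting_for_receive(self, peer, port):
        print("reported: waiting for", peer, "on", port)

    def is_deadlocked(self):
        return False

handler = ReceiveTimeoutSketch(DummyManager(), "micro", "state_in[0]", 10.0)
handler.on_timeout()          # first expiry: report the wait
handler.on_timeout()          # later expiries: poll the manager for a deadlock
print(handler.get_timeout())  # 22.5 == 10.0 * 1.5 ** 2
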
10 changes: 9 additions & 1 deletion libmuscle/cpp/src/libmuscle/receive_timeout_handler.hpp
@@ -5,6 +5,14 @@

namespace libmuscle { namespace _MUSCLE_IMPL_NS {

/** Error thrown when a deadlock is detected. */
class Deadlock : public std::runtime_error {
public:
Deadlock() : std::runtime_error("Deadlock detected") {};
virtual ~Deadlock() = default;
};


/** Timeout handler when receiving messages from peers.
*
* This handler sends a message to the Muscle Manager when the receive times out (and
@@ -34,7 +42,7 @@ class ReceiveTimeoutHandler : public mcp::TimeoutHandler {
std::string const & port_name_;
Optional<int> slot_;
double timeout_;

int num_timeout_;
};

} }
3 changes: 3 additions & 0 deletions libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp
@@ -34,6 +34,7 @@ class MockMMPClient : public MockClass<MockMMPClient> {
NAME_MOCK_MEM_FUN(MockMMPClient, deregister_instance);
NAME_MOCK_MEM_FUN(MockMMPClient, waiting_for_receive);
NAME_MOCK_MEM_FUN(MockMMPClient, waiting_for_receive_done);
NAME_MOCK_MEM_FUN(MockMMPClient, is_deadlocked);

// Create some empty return objects for return values with a complex
// structure, to make it easier to set them in the tests or fixtures.
@@ -105,6 +106,8 @@ class MockMMPClient : public MockClass<MockMMPClient> {
MockFun<Void,
Val<std::string const &>, Val<std::string const &>, Val<Optional<int>>
> waiting_for_receive_done;

MockFun<Val<bool>> is_deadlocked;
};

using MMPClient = MockMMPClient;
9 changes: 8 additions & 1 deletion libmuscle/python/libmuscle/communicator.py
@@ -13,7 +13,7 @@
from libmuscle.profiler import Profiler
from libmuscle.profiling import (
ProfileEvent, ProfileEventType, ProfileTimestamp)
from libmuscle.receive_timeout_handler import ReceiveTimeoutHandler
from libmuscle.receive_timeout_handler import Deadlock, ReceiveTimeoutHandler


_logger = logging.getLogger(__name__)
@@ -257,6 +257,13 @@ def receive_message(
"Error while receiving a message: connection with peer"
f" '{snd_endpoint.kernel}' was lost. Did the peer crash?"
) from exc
except Deadlock:
# Profiler messages may be used for debugging the deadlock
self._profiler.shutdown()
raise RuntimeError(
"Deadlock detected when when receiving a message on "
f"port '{port_and_slot}'. See manager logs for more detail."
) from None

recv_decode_event = ProfileEvent(
ProfileEventType.RECEIVE_DECODE, ProfileTimestamp(), None,
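
Note (not part of the diff): the except Deadlock branch above raises the RuntimeError with "from None", so the user sees only the actionable error message instead of a chained Deadlock traceback. A tiny standalone illustration of that pattern (all names here are made up):

class Deadlock(RuntimeError):
    pass

def blocking_receive():
    raise Deadlock("Deadlock detected")

def receive_message(port_and_slot: str = "state_in[0]"):
    try:
        return blocking_receive()
    except Deadlock:
        # 'from None' suppresses the chained Deadlock traceback.
        raise RuntimeError(
            f"Deadlock detected when receiving a message on '{port_and_slot}'."
            " See manager logs for more detail.") from None
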
42 changes: 29 additions & 13 deletions libmuscle/python/libmuscle/manager/deadlock_detector.py
@@ -1,5 +1,5 @@
import logging
from threading import Thread
from threading import Thread, Lock
import time
from typing import Callable, Dict, List, Optional, Tuple
from queue import Empty, Queue
@@ -13,11 +13,11 @@ class DeadlockDetector(Thread):
"""The DeadlockDetector attempts to detect when multiple instances are stuck waiting
for each other.

This class is responsible for handling WAITING_FOR_RECEIVE and
This class is responsible for handling WAITING_FOR_RECEIVE, IS_DEADLOCKED and
WAITING_FOR_RECEIVE_DONE MMP messages, which are submitted by the MMPServer.

When a deadlock is detected, the cycle of instances that is waiting on each other is
logged with FATAL severity. If this deadlock does not get resoled in
logged with FATAL severity. If this deadlock does not get resolved in
Contributor: Actually, it's only logged if it isn't resolved, right? That's how it should be, so the comment should be clarified I think.
Collaborator (author): This is currently logged as soon as the manager identifies a cycle of waiting instances. I think this will never be resolved, but I'm not sure.
I can also update the logic to only print the deadlock cycle when any of the deadlocked instances calls is_deadlocked and starts shutting down. At that point we're sure that there was a deadlock and that the simulation will shut down.
Contributor: I'd prefer that. Either we're convinced that the grace period is unnecessary and that as soon as a cycle is detected there's an actual deadlock, in which case we should remove the grace period, or we decide that we need it, but then the warning should be consistent. Otherwise you could get a false positive in the log, and that could confuse people who are doing everything right.
``wait_before_shutdown`` seconds, the simulation is shut down.
"""

@@ -49,6 +49,9 @@ def __init__(
self._waiting_instance_ports: Dict[str, Tuple[str, Optional[int]]] = {}
"""Maps instance IDs to the port/slot they are waiting on.."""

# detected deadlocks may be accessed from any thread by is_deadlocked, so it
# needs a mutex:
self._detected_deadlocks_mutex = Lock()
self._detected_deadlocks: List[List[str]] = []
"""List of deadlocked instance cycles. Set by _handle_potential_deadlock.
"""
@@ -69,7 +72,8 @@ def run(self) -> None:
item = self._queue.get(timeout=seconds_until_abort)
if item is None: # On shutdown, None is pushed to the queue
return # exit thread
self._process_queue_item(item)
with self._detected_deadlocks_mutex:
self._process_queue_item(item)

except Empty:
# Timeout was set and has expired without any new messages:
@@ -82,15 +86,16 @@ def run(self) -> None:
# (otherwise this would have triggered a _process_queue_item, clearing
# self._shutdown_time and we had not set another timeout). And so a
# deadlock is still present. Assert it to be absolutely certain:
assert self._detected_deadlocks
formatted_deadlocks = "\n\n".join(
self._format_deadlock(instances)
for instances in self._detected_deadlocks)
_logger.fatal(
"Aborting simulation: deadlock detected.\n%s",
formatted_deadlocks)
self._shutdown_callback()
return
with self._detected_deadlocks_mutex:
assert self._detected_deadlocks
formatted_deadlocks = "\n\n".join(
self._format_deadlock(instances)
for instances in self._detected_deadlocks)
_logger.fatal(
"Aborting simulation: deadlock detected.\n%s",
formatted_deadlocks)
self._shutdown_callback()
return

def shutdown(self) -> None:
"""Stop the deadlock detector thread."""
@@ -128,6 +133,17 @@ def put_waiting_done(
"""
self._queue.put((False, instance_id, peer_instance_id, port_name, slot))

def is_deadlocked(self, instance_id: str) -> bool:
"""Check if the provided instance is part of a detected deadlock.

This method can be called from any thread.
"""
with self._detected_deadlocks_mutex:
for deadlock_instances in self._detected_deadlocks:
if instance_id in deadlock_instances:
return True
return False

def _process_queue_item(self, item: _QueueItem) -> None:
"""Actually process a WAITING_FOR_RECEIVE[_DONE] request.

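
Note (not part of the diff): the heart of the detector (in _process_queue_item and _handle_potential_deadlock, not shown here) is following the wait-for chain between instances and reporting a cycle when it closes on itself. A rough standalone sketch of that idea, assuming a mapping from each waiting instance to the peer it is waiting for, similar to the bookkeeping dictionaries shown above:

from typing import Dict, List, Optional

def find_waiting_cycle(
        waiting_for: Dict[str, str], instance_id: str) -> Optional[List[str]]:
    """Return the cycle of instances waiting on each other, if there is one."""
    chain = [instance_id]
    current = waiting_for.get(instance_id)
    while current is not None and current not in chain:
        chain.append(current)
        current = waiting_for.get(current)
    # instance_id is only deadlocked if the chain loops back to it:
    return chain if current == instance_id else None

# macro waits for micro and micro waits for macro: a two-instance deadlock.
assert find_waiting_cycle({"macro": "micro", "micro": "macro"}, "macro") == [
        "macro", "micro"]
# micro is not waiting on anything, so there is no deadlock (yet).
assert find_waiting_cycle({"macro": "micro"}, "macro") is None
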