Skip to content

Commit

Permalink
Merge pull request #2 from HSLdevcom/handle_disconnects
Browse files Browse the repository at this point in the history
Handle MQTT disconnects
  • Loading branch information
mjaakko authored Dec 2, 2022
2 parents 7eea718 + 44f26c1 commit faa1e4e
Showing 1 changed file with 96 additions and 42 deletions.
138 changes: 96 additions & 42 deletions mqtt_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,70 +11,122 @@

load_dotenv()

# MQTT keep-alive interval in seconds.
# This needs to be small enough to detect quickly that the connection is down,
# so that the message rate is calculated correctly.
MQTT_KEEP_ALIVE_SECS = 5

# Debug mode is enabled only when the IS_DEBUG environment variable is exactly "True".
IS_DEBUG = os.getenv('IS_DEBUG') == "True"

# How long to listen to the topics until we send data to Azure.
# Should be 60 in production; a shorter period is used when debugging.
MONITOR_PERIOD_IN_SECONDS = 20 if IS_DEBUG else 60

class Topic:
    """Measure the message rate of a single MQTT topic on a single broker.

    A Topic owns one paho MQTT client at a time. ``listen_topic()`` starts a
    non-blocking connection attempt, the paho callbacks keep the state flags
    and the measuring window up to date, and ``get_msg_count()`` converts the
    collected message count into a messages-per-second value.
    """

    # True while connecting to the broker, False when connected or disconnected
    is_starting = False
    # True between a successful CONNACK and the next disconnect
    is_running = False
    # Number of messages received during the current measuring window
    msg_count = 0

    # time.perf_counter() timestamps that delimit the current measuring window.
    # measuring_started_at is None until the client has connected;
    # measuring_stopped_at is None while the window is still open.
    measuring_started_at = None
    measuring_stopped_at = None

    def __init__(self, topic_address, topic_name, topic_port):
        """Store the broker address, the topic filter and the broker port."""
        self.topic_address = topic_address
        self.topic_name = topic_name
        self.topic_port = topic_port

    def get_broker_address(self):
        """Return the broker as a ``host:port`` string (used in log messages)."""
        return f"{self.topic_address}:{self.topic_port}"

    def print_status(self):
        """Print the current message count and running state of this topic."""
        print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}")

    def listen_topic(self):
        """
        Start connecting to the broker without blocking the caller.

        Does nothing if a connection attempt is already in progress. The
        subscription itself happens in ``_on_connect_callback`` once the
        broker has acknowledged the connection.

        Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python
        """
        if self.is_starting:
            print(f"MQTT client is already connecting to {self.get_broker_address()}")
            return

        self.is_starting = True

        # No measuring window exists until the new client has connected
        self.measuring_started_at = None
        self.measuring_stopped_at = None

        try:
            client = mqtt.Client()

            client.on_connect = self._on_connect_callback
            client.on_message = self._on_message_callback
            client.on_disconnect = self._on_disconnect_callback

            # Enable debugging if needed
            # client.on_log = self._on_log_callback

            client.connect_async(self.topic_address, int(self.topic_port), MQTT_KEEP_ALIVE_SECS)

            print(f"Connecting to MQTT broker at {self.get_broker_address()}")
            # Starts thread that processes network traffic and dispatches callbacks
            client.loop_start()
        except Exception:
            # Without this reset a failed start would leave is_starting stuck at
            # True and every later listen_topic() call would return early.
            self.is_starting = False
            raise

    # The callback for when the client receives a CONNACK response from the server.
    def _on_connect_callback(self, client, userdata, flags, rc):
        """Subscribe and open a new measuring window when the broker accepts us."""
        self.is_starting = False
        if rc == 0:
            print(f"Connected to MQTT broker at {self.get_broker_address()}")
            self.is_running = True
            client.subscribe(self.topic_name)
            # Messages counted during an earlier window that was discarded
            # (too short, or never reported) must not inflate the new rate.
            self.msg_count = 0
            self.measuring_started_at = time.perf_counter()
            self.measuring_stopped_at = None
        else:
            print(f"Error on connecting {client}, is our IP whitelisted for the topic?")

    # Called when MQTT is disconnected
    def _on_disconnect_callback(self, client, userdata, rc):
        """Close the measuring window and stop the client's network thread."""
        print(f"Disconnected from {self.topic_address}, rc: {rc}")
        self.measuring_stopped_at = time.perf_counter()
        # Stop this client's loop; a later listen_topic() call creates a new client.
        client.loop_stop()
        self.is_running = False

    # The callback for when a PUBLISH message is received from the server.
    def _on_message_callback(self, client, userdata, msg):
        """Count one received message."""
        self.msg_count += 1
        # print(msg.topic+" "+str(msg.payload))

    def get_msg_count(self):
        """
        Return the message rate (messages per second) of the current measuring
        window and start a new window.

        Returns None when no rate can be calculated: the client never
        connected, or the connection dropped before enough data was collected.
        """
        if self.measuring_started_at is None:
            print(f"No data was measured for {self.get_broker_address()} on topic {self.topic_name}. Maybe the client was not connected?")
            return None

        if self.measuring_stopped_at is not None:
            elapsed_time = self.measuring_stopped_at - self.measuring_started_at

            # If data was collected for too short period, we can't accurately calculate the message rate
            if elapsed_time < min(25, 10*MQTT_KEEP_ALIVE_SECS):
                return None

            # Adjust elapsed_time to account for the time it took to detect that
            # the connection was down. This should take roughly 2 times the
            # duration of MQTT keep alive interval. This adjustment can cause the
            # message rate to be slightly inflated, but this is less of a problem
            # than too small message rate, which would cause unnecessary alerts.
            elapsed_time -= 2*MQTT_KEEP_ALIVE_SECS
        else:
            elapsed_time = time.perf_counter() - self.measuring_started_at

        # Guard against division by zero (e.g. a window that was opened just now)
        if elapsed_time <= 0:
            return None

        if IS_DEBUG:
            print(f"started: {self.measuring_started_at}, stopped: {self.measuring_started_at + elapsed_time}")
            print(f"Elapsed time {elapsed_time}, messages: {self.msg_count}")

        msg_per_second = self.msg_count / elapsed_time

        # Start a new measuring window
        self.msg_count = 0
        self.measuring_started_at = time.perf_counter()
        self.measuring_stopped_at = None

        return msg_per_second

    # Enable debugging if needed
    # def _on_log_callback(self, client, userdata, level, buf):
    #     print(buf)

def main():
"""
Expand Down Expand Up @@ -105,45 +157,45 @@ def main():
topic = Topic(topic_address, topic_name, topic_port)
topic_list.append(topic)

time_end = time.time() + MONITOR_PERIOD_IN_SECONDS
for topic in topic_list:
topic.listen_topic()

time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS
# Keep listening to topics forever
while True:
# (Re)start threads that are in is_running == False state
for topic in topic_list:
if topic.is_running == False:
print(f"Topic {topic.topic_name} was not running, starting it.")

# Run listen_topic in a thread because we don't want to wait e.g. client.connect()
# If we didn't use threads here, one connection being stuck could cause long
# timeout for sending data to Azure.
t = Thread(target=topic.listen_topic)
t.daemon = False
t.start()

sleep_time = time_end - time.time()
print(sleep_time)
sleep_time = time_end - time.perf_counter()
print(f"Sleeping for {sleep_time} secs")

# Only sleep if sleep_time is positive. This can happen if sending data to Azure took longer than MONITOR_PERIOD_IN_SECONDS
if sleep_time > 0:
# Sleep while listen period is going, after that we send data to Azure
time.sleep(sleep_time)

# TODO: remove this logging later when not needed
print("After sleep.")
# print("After sleep.")

# Set time_end as MONITOR_PERIOD_IN_SECONDS in the future
time_end = time.time() + MONITOR_PERIOD_IN_SECONDS
time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS
topic_data_map = {}

# Save message counters into topic_data_map and reset them in each topic
for topic in topic_list:
topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}"
topic_data_map[topic_data_map_key] = topic.msg_count
topic.msg_count = 0
topic_data_map_value = topic.get_msg_count()
if topic_data_map_value != None:
topic_data_map[topic_data_map_key] = topic_data_map_value

t = Thread(target=send_mqtt_msg_count_to_azure, args=(topic_data_map,))
t.start()

# (Re)start threads that are in is_running == False state
for topic in topic_list:
if topic.is_running == False:
print(f"Topic {topic.topic_name} was not running, starting it.")

topic.listen_topic()


def send_mqtt_msg_count_to_azure(topic_data_map):
"""
Send custom metrics into azure. Documentation for the required format can be found from here:
Expand All @@ -158,6 +210,9 @@ def send_mqtt_msg_count_to_azure(topic_data_map):
time_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")

series_array = get_series_array(topic_data_map)
if not series_array:
print("No data to send to Azure")
return

custom_metric_object = {
# Time (timestamp): Date and time at which the metric is measured or collected
Expand Down Expand Up @@ -204,8 +259,7 @@ def get_series_array(topic_data_map):
for key in topic_data_map:
topic_msg_count = topic_data_map[key]

# We want message count to be messages per second
topic_msg_count = round(topic_msg_count/MONITOR_PERIOD_IN_SECONDS, 2)
topic_msg_count = round(topic_msg_count, 2)

# If over 10, round to whole number
if topic_msg_count > 10:
Expand Down

0 comments on commit faa1e4e

Please sign in to comment.