Skip to content

Commit

Permalink
Try sending MQTT msg count data multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
niemijoe committed Sep 12, 2022
1 parent 39ca1c1 commit b36fece
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
47 changes: 19 additions & 28 deletions mqtt_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,8 @@ def main():
topic_data_map[topic_data_map_key] = topic.msg_count
topic.msg_count = 0

try:
with time_limit(10):
send_mqtt_msg_count_to_azure(topic_data_map)
except TimeoutException as e:
print("Sending data to Azure timed out.")
# TODO: remove this logging later when not needed
print("Sent mqtt msg count to Azure.")

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
"""
Puts a timer to a function call so that it won't hang forever and stop execution of the script.
Taken from https://stackoverflow.com/a/601168/4282381
"""
def signal_handler(signum, frame):
raise TimeoutException("Timed out!")
signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
t = Thread(target=send_mqtt_msg_count_to_azure, args=(topic_data_map,))
t.start()

def send_mqtt_msg_count_to_azure(topic_data_map):
"""
Expand All @@ -177,13 +155,13 @@ def send_mqtt_msg_count_to_azure(topic_data_map):
"""

# Azure wants time in UTC ISO 8601 format
time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
time_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")

series_array = get_series_array(topic_data_map)

custom_metric_object = {
# Time (timestamp): Date and time at which the metric is measured or collected
"time": time,
"time": time_str,
"data": {
"baseData": {
# Metric (name): name of the metric
Expand All @@ -205,8 +183,21 @@ def send_mqtt_msg_count_to_azure(topic_data_map):
if IS_DEBUG:
print(custom_metric_json)
else:
send_custom_metrics_request(custom_metric_json, attempts_remaining=3)
print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}")
# Try sending data to Azure multiple times, wait between attempts
is_ok = send_custom_metrics_request(custom_metric_json=custom_metric_json, attempts_remaining=3)
if is_ok == False:
print("Sending data to Azure failed, trying again in 5 minutes.")
time.sleep(300) # Wait 5 minutes before the next attempt
is_ok = send_custom_metrics_request(custom_metric_json=custom_metric_json, attempts_remaining=3)
if is_ok == False:
print("Sending data to Azure failed, trying again in 10 minutes.")
time.sleep(600) # Wait 10 minutes before the next attempt
is_ok = send_custom_metrics_request(custom_metric_json=custom_metric_json, attempts_remaining=3)

if is_ok:
print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}")
else:
print("Failed to send metrics to Azure.")

def get_series_array(topic_data_map):
series_array = []
Expand Down
19 changes: 14 additions & 5 deletions send_data_to_azure_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
### SECRETS / ENV VARIABLES ###

def send_custom_metrics_request(custom_metric_json, attempts_remaining):
"""
Sends custom metrics request to Azure. Tries to send as many times as given attempts_remaining.
When sending is successful, returns True, otherwise returns False
"""

# Exit if number of attempts reaches zero.
if attempts_remaining == 0:
return
return False

attempts_remaining = attempts_remaining - 1

make_sure_access_token_file_exists()
Expand All @@ -29,28 +35,31 @@ def send_custom_metrics_request(custom_metric_json, attempts_remaining):

request_url = f'https://westeurope.monitoring.azure.com/{MONITOR_DATA_COLLECTOR_RESOURCE_ID}/metrics'
headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {existing_access_token}'}
response = requests.post(request_url, data=custom_metric_json, headers=headers)
response = requests.post(request_url, data=custom_metric_json, headers=headers, timeout=10)

# Return if response is successful
if response.status_code == 200:
return
return True

# Try catch because json.loads(response.text) might not be available
try:
response_dict = json.loads(response.text)
if response_dict['Error']['Code'] == 'TokenExpired':
print("Currently stored access token has expired, getting a new access token.")
request_new_access_token_and_write_it_on_disk()
send_custom_metrics_request(custom_metric_json, attempts_remaining)
return send_custom_metrics_request(custom_metric_json, attempts_remaining)
elif response_dict['Error']['Code'] == 'InvalidToken':
print("Currently stored access token is invalid, getting a new access token.")
request_new_access_token_and_write_it_on_disk()
send_custom_metrics_request(custom_metric_json, attempts_remaining)
return send_custom_metrics_request(custom_metric_json, attempts_remaining)
else:
print(f'Request failed for an unknown reason, response: {response_dict}.')
except Exception as e:
print(f'Request failed for an unknown reason, response: {response}.')

print("Returning False as sending data to Azure was not successful.")
return False

def make_sure_access_token_file_exists():
try:
f = open(ACCESS_TOKEN_PATH, "r")
Expand Down

0 comments on commit b36fece

Please sign in to comment.