dimValue key includes topic address and port
dimValue key includes topic address and port
niemijoe committed Aug 12, 2022
1 parent 4698a4c commit 20cad47
Showing 2 changed files with 35 additions and 16 deletions.
27 changes: 22 additions & 5 deletions README.md

Collects MQTT and Pulsar data and sends it to Azure Monitor so that alerts can monitor the data and alert when needed.

## Run locally

Have `.env` file at the project directory containing all of the secret values (you can get secrets from a pulsar-proxy VM from pulsar-dev resource group)
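
Both collectors read their configuration from this `.env` file. As a minimal stdlib-only sketch of what that loading amounts to (the project may well use a library such as python-dotenv instead; `load_env` is a hypothetical helper, not from this repo):

```python
import os

def load_env(path=".env"):
    """Read KEY=VALUE lines from a .env file into os.environ."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            # Skip blank lines, comments, and malformed entries
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ[key.strip()] = value.strip()
```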

### Run pulsar data collector

To run `pulsar_data_collector.py`, you need to have a tunnel open to pulsar_dev_proxy so that `ADMIN_URL` env variable points to pulsar admin's port.

Then run:
```
python3 pulsar_data_collector.py
```
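
For a rough idea of what pointing `ADMIN_URL` at the admin port enables: topic statistics can be fetched from Pulsar's admin REST API through the tunnel. The helper below is a hypothetical sketch; the endpoint path follows Pulsar's public admin API documentation, not this repository's code:

```python
def topic_stats_url(admin_url, tenant, namespace, topic):
    # e.g. http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats
    return f"{admin_url}/admin/v2/persistent/{tenant}/{namespace}/{topic}/stats"
```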

### Run mqtt data collector

To run `mqtt_data_collector.py`, some of the topic addresses require opening a tunnel to pulsar_bastion and listening through it.

Example of opening two tunnels into localhost ports 9001 and 9002:

```
ssh -L 9001:<topic_1_address>:<topic_1_port> -L 9002:<topic_2_address>:<topic_2_port> <pulsar_bastion_private_ip>
```

Then, in the `.env` file, configure each topic that requires a tunnel like so: `TOPIC<topic_index>=localhost,<topic_name>,9001`.
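
The three comma-separated fields of a `TOPIC<topic_index>` entry are the address, the topic name, and the port, in that order. A sketch of the split, mirroring the `topic_data_array` parsing in `mqtt_data_collector.py` (`parse_topic_entry` is a hypothetical name):

```python
def parse_topic_entry(value):
    # "localhost,<topic_name>,9001" -> ("localhost", "<topic_name>", "9001")
    address, name, port = value.split(",")
    return address, name, port
```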

Now you can run:
```
python3 mqtt_data_collector.py
```

## Send custom metrics manually to Azure Monitor

If you need to send new custom metrics to Azure Monitor,
you can first test sending by editing

Notes:
- Edit what you need in `custom_metric_example.json` (at least the timestamp)
- You need a fresh `access token` for this command; you can get one by running `main.py` locally (see access_token.txt file)
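
For reference, a sketch of the body shape such a custom-metric JSON presumably follows; the field names come from Azure Monitor's public custom-metrics ingestion schema, not from `custom_metric_example.json` itself:

```python
from datetime import datetime, timezone

def build_custom_metric(metric, namespace, dim_names, dim_values, total, count):
    # Shape per Azure Monitor's custom metrics ingestion schema (assumed,
    # not copied from this repo's example file).
    return {
        "time": datetime.now(timezone.utc).isoformat(),
        "data": {
            "baseData": {
                "metric": metric,
                "namespace": namespace,
                "dimNames": dim_names,
                "series": [
                    {"dimValues": dim_values, "sum": total, "count": count}
                ],
            }
        },
    }
```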

## Deployment

Deployment is done with Ansible on the pulsar proxy server. To update this app, create a new release in GitHub: https://github.com/HSLdevcom/transitdata-monitor-data-collector/releases/new and then run the pulsar proxy playbook.
24 changes: 13 additions & 11 deletions mqtt_data_collector.py
```diff
@@ -40,8 +40,10 @@ def listen_topic_thread(topic_data_string):
         topic_port = topic_data_array[2]
         if (topic_address is None or topic_name is None or topic_port is None):
             raise Exception(f"Some required topic data was missing, topic_address: {topic_address}, topic_name: {topic_name}, topic_port: {topic_port}")
-        topic_data_collection[topic_name] = 0
-        listen_topic(topic_data_collection, topic_address, topic_name, topic_port)
+        topic_data_collection_key = f"{topic_address}:{topic_name}:{topic_port}"
+        topic_data_collection[topic_data_collection_key] = 0
+
+        listen_topic(topic_data_collection, topic_data_collection_key, topic_address, topic_name, topic_port)
     t = Thread(target=listen_topic_thread, args=(topic_data_string,))
     threads.append(t)
```
```diff
@@ -59,15 +61,15 @@ def listen_topic_thread(topic_data_string):
     send_mqtt_msg_count_into_azure(topic_data_collection)
     print(f'Mqtt metrics sent: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}')

-def listen_topic(topic_data_collection, address, topic, port):
+def listen_topic(topic_data_collection, topic_data_collection_key, address, topic, port):
     """
     Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python
     """
     time_end = time.time() + MONITOR_PERIOD_IN_SECONDS

     client = mqtt.Client()
     client.on_connect = on_connect_callback(topic)
-    client.on_message = on_message_callback(topic_data_collection, topic)
+    client.on_message = on_message_callback(topic_data_collection, topic_data_collection_key)
     # Enable debugging if needed
     # client.on_log = on_log_callback
     # client.on_disconnect = on_disconnect_callback
```
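
These hunks thread the new key through to the message callback. The commit's central change in miniature: keying the counter dict by `address:topic:port` instead of the topic name alone keeps counts separate when the same topic name is listened to on two different brokers or ports. A small sketch (`collection_key` is a hypothetical helper, not a function in this repo):

```python
def collection_key(address, topic, port):
    # Mirrors the f-string introduced by this commit.
    return f"{address}:{topic}:{port}"

# Same topic name on two brokers no longer collides on one dict entry:
counts = {}
counts[collection_key("host-a", "vehicles/#", "9001")] = 0
counts[collection_key("host-b", "vehicles/#", "9001")] = 0
```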
```diff
@@ -96,10 +98,10 @@ def on_connect(client, userdata, flags, rc):
     return on_connect

 # # The callback for when a PUBLISH message is received from the server.
-def on_message_callback(topic_data_collection, topic):
+def on_message_callback(topic_data_collection, topic_data_collection_key):

     def on_message(client, userdata, msg):
-        topic_data_collection[topic] += 1
+        topic_data_collection[topic_data_collection_key] += 1
         # print(msg.topic+" "+str(msg.payload))

     return on_message
```
```diff
@@ -152,17 +154,17 @@ def send_mqtt_msg_count_into_azure(topic_data_collection):

 def get_series_array(topic_data_collection):
     series_array = []
-    for topic_name in topic_data_collection:
-        topic_msg_count = topic_data_collection[topic_name]
+    for key in topic_data_collection:
+        topic_msg_count = topic_data_collection[key]

         # Azure doesn't seem to like # in a dimValue, replace it with *
-        parsed_topic_name = topic_name.replace("#", "*")
+        parsed_key = key.replace("#", "*")
         # Azure doesn't seem to like + in a dimValue, replace it with ^
-        parsed_topic_name = parsed_topic_name.replace("+", "^")
+        parsed_key = parsed_key.replace("+", "^")

         dimValue = {
             "dimValues": [
-                parsed_topic_name
+                parsed_key
             ],
             "sum": topic_msg_count,
             "count": 1
```
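
The sanitization step from `get_series_array`, isolated into a hypothetical helper: Azure rejects `#` and `+` inside a dimValue, so they are swapped for `*` and `^` before the series is sent. With the new composite keys, this now applies to the full `address:topic:port` string:

```python
def sanitize_dim_value(key):
    # Azure doesn't accept '#' or '+' in a dimValue; substitute safe characters.
    return key.replace("#", "*").replace("+", "^")
```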
