Skip to content

Commit

Permalink
Merge pull request #209 from atlanticwave-sdx/102.place-connection-tests
Browse files Browse the repository at this point in the history
Test `place_connection()` more
  • Loading branch information
sajith authored Mar 18, 2024
2 parents 0ba5a60 + 5f39dca commit ca5d85a
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 118 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
- "3.8"
- "3.9"
- "3.10"
- "3.11"

steps:

Expand Down
9 changes: 3 additions & 6 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
mongodb:
# See https://hub.docker.com/_/mongo/ for documentation about
# MongoDB Docker images.
image: mongo:7.0.5
image: mongo:7.0.5
ports:
- ${MONGO_PORT}:${MONGO_PORT}
environment:
Expand All @@ -21,11 +21,8 @@ services:
- type: volume
source: mongodb
target: /data/db:Z
# healthcheck:
# test: ["CMD", "mongo", "--eval", "db.version()"]
# interval: 10s
# timeout: 5s
# retries: 3
healthcheck:
test: ["CMD", "mongosh", "localhost:${MONGO_PORT}/test", "--quiet"]

sdx-controller:
image: sdx-controller
Expand Down
14 changes: 11 additions & 3 deletions sdx_controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from queue import Queue

import connexion
from sdx_pce.topology.manager import TopologyManager
from sdx_pce.topology.temanager import TEManager

from sdx_controller import encoder
from sdx_controller.messaging.rpc_queue_consumer import RpcConsumer
Expand All @@ -19,10 +19,9 @@ def create_rpc_thread(app):
"""
Start a thread to get items off the message queue.
"""
topology_manager = TopologyManager()
thread_queue = Queue()

app.rpc_consumer = RpcConsumer(thread_queue, "", topology_manager)
app.rpc_consumer = RpcConsumer(thread_queue, "", app.te_manager)
rpc_thread = threading.Thread(
target=app.rpc_consumer.start_sdx_consumer,
kwargs={"thread_queue": thread_queue, "db_instance": app.db_instance},
Expand Down Expand Up @@ -64,6 +63,15 @@ def create_app(run_listener: bool = True):
app.db_instance = DbUtils()
app.db_instance.initialize_db()

# Get a handle to PCE.
app.te_manager = TEManager(topology_data=None)

# TODO: This is a hack, until we find a better way to get a handle
# to TEManager from Flask current_app, which are typically
# available to request handlers. There must be a better way to
# pass this around.
app.app.te_manager = app.te_manager

if run_listener:
create_rpc_thread(app)
else:
Expand Down
28 changes: 4 additions & 24 deletions sdx_controller/controllers/connection_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging

import connexion
from flask import current_app

from sdx_controller.handlers.connection_handler import ConnectionHandler
from sdx_controller.utils.db_utils import DbUtils
Expand All @@ -20,29 +21,6 @@
connection_handler = ConnectionHandler(db_instance)


def is_json(myjson):
try:
json.loads(myjson)
except ValueError:
return False
return True


def find_between(s, first, last):
"""
Find the substring of `s` that is betwen `first` and `last`.
"""
if s is None or first is None or last is None:
return None

try:
start = s.index(first) + len(first)
end = s.index(last, start)
return s[start:end]
except ValueError:
return ""


def delete_connection(connection_id):
"""
Delete connection order by ID.
Expand Down Expand Up @@ -87,7 +65,9 @@ def place_connection(body):
db_instance.add_key_value_pair_to_db("connection_data", json.dumps(body))
logger.info("Saving to database complete.")

reason, code = connection_handler.place_connection(body)
logger.info(f"Handling request with te_manager: {current_app.te_manager}")

reason, code = connection_handler.place_connection(current_app.te_manager, body)
logger.info(f"place_connection result: reason='{reason}', code={code}")

return reason, code
19 changes: 3 additions & 16 deletions sdx_controller/controllers/topology_controller.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import json

from flask import current_app
from sdx_pce.topology.grenmlconverter import GrenmlConverter
from sdx_pce.topology.manager import TopologyManager

from sdx_controller.utils.db_utils import DbUtils

# Get DB connection and tables set up.
db_instance = DbUtils()
db_instance.initialize_db()
manager = TopologyManager()


def get_topology(): # noqa: E501
Expand Down Expand Up @@ -41,18 +38,8 @@ def get_topologyby_grenml(): # noqa: E501
:rtype: Topology
"""

num_domain_topos = 0

if db_instance.read_from_db("num_domain_topos") is not None:
num_domain_topos = db_instance.read_from_db("num_domain_topos")

for i in range(1, int(num_domain_topos) + 1):
curr_topo_str = db_instance.read_from_db("LC-" + str(i))
curr_topo_json = json.loads(curr_topo_str)
manager.add_topology(curr_topo_json)

converter = GrenmlConverter(manager.get_topology())
topology = current_app.te_manager.get_topology()
converter = GrenmlConverter(topology)
converter.read_topology()
return converter.get_xml_str()

Expand Down
58 changes: 14 additions & 44 deletions sdx_controller/handlers/connection_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ def _send_breakdown_to_lc(self, breakdown, connection_request):
# leading up to this point were successful.
return "Connection published", 200

def place_connection(self, connection_request: dict) -> Tuple[str, int]:
def place_connection(
self, te_manager: TEManager, connection_request: dict
) -> Tuple[str, int]:
"""
Do the actual work of creating a connection.
Expand All @@ -96,51 +98,14 @@ def place_connection(self, connection_request: dict) -> Tuple[str, int]:
Note that we can return early if things fail. Return value is
a tuple of the form (reason, HTTP code).
"""
num_domain_topos = 0

if self.db_instance.read_from_db("num_domain_topos"):
num_domain_topos = self.db_instance.read_from_db("num_domain_topos")[
"num_domain_topos"
]

# Initializing TEManager with `None` topology data is a
# work-around for
# https://github.com/atlanticwave-sdx/sdx-controller/issues/145
temanager = TEManager(topology_data=None)
lc_domain_topo_dict = {}

# Read LC-1, LC-2, LC-3, and LC-4 topologies because of
# https://github.com/atlanticwave-sdx/sdx-controller/issues/152
for i in range(1, int(num_domain_topos) + 2):
lc = f"LC-{i}"
logger.debug(f"Reading {lc} from DB")
curr_topo = self.db_instance.read_from_db(lc)
if curr_topo is None:
logger.debug(f"Read {lc} from DB: {curr_topo}")
continue
else:
# Get the actual thing minus the Mongo ObjectID.
curr_topo_str = curr_topo.get(lc)
# Just log a substring, not the whole thing.
logger.debug(f"Read {lc} from DB: {curr_topo_str[0:50]}...")

curr_topo_json = json.loads(curr_topo_str)
lc_domain_topo_dict[curr_topo_json["domain_name"]] = curr_topo_json[
"lc_queue_name"
]
logger.debug(
f"Adding #{i} topology {curr_topo_json.get('id')} to TEManager"
)
temanager.add_topology(curr_topo_json)

for num, val in enumerate(temanager.get_topology_map().values()):
for num, val in enumerate(te_manager.get_topology_map().values()):
logger.info(f"TE topology #{num}: {val}")

graph = temanager.generate_graph_te()
graph = te_manager.generate_graph_te()
if graph is None:
return "Could not generate a graph", 400

traffic_matrix = temanager.generate_traffic_matrix(
traffic_matrix = te_manager.generate_traffic_matrix(
connection_request=connection_request
)
if traffic_matrix is None:
Expand All @@ -155,9 +120,14 @@ def place_connection(self, connection_request: dict) -> Tuple[str, int]:
if solution is None or solution.connection_map is None:
return "Could not solve the request", 400

breakdown = temanager.generate_connection_breakdown(solution)
self._send_breakdown_to_lc(breakdown, connection_request)
return "Successfully placed connection", 200
try:
breakdown = te_manager.generate_connection_breakdown(solution)
status, code = self._send_breakdown_to_lc(breakdown, connection_request)
logger.debug(f"Breakdown status: {status}, code: {code}")
return status, code
except Exception as e:
logger.debug(f"Error when generating/publishing breakdown: {e}")
return f"Error: {e}", 400

def handle_link_failure(self, msg_json):
logger.debug("---Handling connections that contain failed link.---")
Expand Down
14 changes: 9 additions & 5 deletions sdx_controller/handlers/lc_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@


class LcMessageHandler:
def __init__(self, db_instance, manager):
def __init__(self, db_instance, te_manager):
self.db_instance = db_instance
self.manager = manager
self.te_manager = te_manager
self.parse_helper = ParseHelper()
self.connection_handler = ConnectionHandler(db_instance)

Expand Down Expand Up @@ -43,7 +43,7 @@ def process_lc_json_msg(
if domain_name in domain_list:
logger.info("Updating topo")
logger.debug(msg_json)
self.manager.update_topology(msg_json)
self.te_manager.update_topology(msg_json)
if "link_failure" in msg_json:
logger.info("Processing link failure.")
self.connection_handler.handle_link_failure(msg_json)
Expand All @@ -53,7 +53,7 @@ def process_lc_json_msg(
self.db_instance.add_key_value_pair_to_db("domain_list", domain_list)

logger.info("Adding topo")
self.manager.add_topology(msg_json)
self.te_manager.add_topology(msg_json)

if self.db_instance.read_from_db("num_domain_topos") is None:
num_domain_topos = 1
Expand All @@ -71,7 +71,11 @@ def process_lc_json_msg(
db_key = "LC-" + str(num_domain_topos)
self.db_instance.add_key_value_pair_to_db(db_key, json.dumps(msg_json))

latest_topo = json.dumps(self.manager.get_topology().to_dict())
# TODO: use TEManager API directly; but TEManager does not
# expose a `get_topology()` method yet.
latest_topo = json.dumps(
self.te_manager.topology_manager.get_topology().to_dict()
)
# use 'latest_topo' as PK to save latest topo to db
self.db_instance.add_key_value_pair_to_db("latest_topo", latest_topo)
logger.info("Save to database complete.")
11 changes: 6 additions & 5 deletions sdx_controller/messaging/rpc_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class RpcConsumer(object):
def __init__(self, thread_queue, exchange_name, topology_manager):
def __init__(self, thread_queue, exchange_name, te_manager):
self.logger = logging.getLogger(__name__)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
Expand All @@ -30,7 +30,7 @@ def __init__(self, thread_queue, exchange_name, topology_manager):
self.channel.queue_declare(queue=SUB_QUEUE)
self._thread_queue = thread_queue

self.manager = topology_manager
self.te_manager = te_manager

self._exit_event = threading.Event()

Expand Down Expand Up @@ -61,11 +61,12 @@ def start_consumer(self):
def start_sdx_consumer(self, thread_queue, db_instance):
MESSAGE_ID = 0
HEARTBEAT_ID = 0
rpc = RpcConsumer(thread_queue, "", self.manager)

rpc = RpcConsumer(thread_queue, "", self.te_manager)
t1 = threading.Thread(target=rpc.start_consumer, args=(), daemon=True)
t1.start()

lc_message_handler = LcMessageHandler(db_instance, self.manager)
lc_message_handler = LcMessageHandler(db_instance, self.te_manager)
parse_helper = ParseHelper()

latest_topo = {}
Expand Down Expand Up @@ -103,7 +104,7 @@ def start_sdx_consumer(self, thread_queue, db_instance):
# Get the actual thing minus the Mongo ObjectID.
topology = topology[db_key]
topo_json = json.loads(topology)
self.manager.add_topology(topo_json)
self.te_manager.add_topology(topo_json)
logger.debug(f"Read {db_key}: {topology}")

while not self._exit_event.is_set():
Expand Down
25 changes: 23 additions & 2 deletions sdx_controller/test/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import os

import connexion
from flask_testing import TestCase

try:
# Use stdlib modules with Python > 3.8.
from importlib.resources import files
except ImportError:
# Use compatibility library with Python 3.8.
from importlib_resources import files

from sdx_controller import create_app


Expand All @@ -14,4 +19,20 @@ def create_app(self):
# RabbitMQ insteance just for testing, since the test suite
# doesn't use a message queue right now.
app = create_app(run_listener=True if os.getenv("MQ_HOST") else False)

# We need a handle to the TEManager instance in tests, but
# accessing it this way feels like a work-around. There must
# be a better way to get a handle to TEManager.
self.te_manager = app.te_manager

return app.app


class TestData:
TOPOLOGY_DIR = files("sdx_datamodel") / "data" / "topologies"
TOPOLOGY_FILE_ZAOXI = TOPOLOGY_DIR / "zaoxi.json"
TOPOLOGY_FILE_SAX = TOPOLOGY_DIR / "sax.json"
TOPOLOGY_FILE_AMLIGHT = TOPOLOGY_DIR / "amlight.json"

REQUESTS_DIR = files("sdx_datamodel") / "data" / "requests"
CONNECTION_REQ = REQUESTS_DIR / "test_request.json"
Loading

0 comments on commit ca5d85a

Please sign in to comment.