diff --git a/src/sdx/pce/models.py b/src/sdx/pce/models.py index 3000c53b..23b7f5fb 100644 --- a/src/sdx/pce/models.py +++ b/src/sdx/pce/models.py @@ -59,3 +59,73 @@ class ConnectionSolution: connection_map: Mapping[ConnectionRequest, List[ConnectionPath]] cost: float + + +# The classess below should help us construct a breakdown of the below +# form that pertains to one domain: +# +# { +# "name": "TENET_vlan_201_203_Ampath_Tenet", +# "dynamic_backup_path": true, +# "uni_a": { +# "tag": { +# "value": 203, +# "tag_type": 1 +# }, +# "interface_id": "cc:00:00:00:00:00:00:07:41" +# }, +# "uni_z": { +# "tag": { +# "value": 201, +# "tag_type": 1 +# }, +# "interface_id": "cc:00:00:00:00:00:00:08:50" +# } +# } + + +@dataclass_json +@dataclass(frozen=True) +class VlanTag: + """ + Representation of a VLAN tag. + + TODO: document tag_type. + """ + + value: int + tag_type: int + + +@dataclass_json +@dataclass(frozen=True) +class VlanTaggedPort: + """ + Representation of a port. + """ + + tag: VlanTag + port_id: str + + +@dataclass_json +@dataclass(frozen=True) +class VlanTaggedBreakdown: + """ + Path breakdown within a single domain with VLAN assignments. + """ + + name: str + dynamic_backup_path: bool + uni_a: VlanTaggedPort + uni_z: VlanTaggedPort + + +@dataclass_json +@dataclass(frozen=True) +class VlanTaggedBreakdowns: + """ + Mapping from domain to breakdown. + """ + + breakdowns: Mapping[str, VlanTaggedBreakdown] diff --git a/src/sdx/pce/topology/temanager.py b/src/sdx/pce/topology/temanager.py index d978453b..41803ff1 100644 --- a/src/sdx/pce/topology/temanager.py +++ b/src/sdx/pce/topology/temanager.py @@ -1,3 +1,7 @@ +import threading +from itertools import chain +from typing import List, Optional + import networkx as nx from networkx.algorithms import approximation as approx @@ -7,6 +11,10 @@ ConnectionRequest, ConnectionSolution, TrafficMatrix, + VlanTag, + VlanTaggedBreakdown, + VlanTaggedBreakdowns, + VlanTaggedPort, ) from sdx.pce.topology.manager import TopologyManager @@ -21,6 +29,8 @@ class TEManager: - generate inputs to the PCE solver - converter the solver output. + + - VLAN reservation and unreservation. """ def __init__(self, topology_data, connection_data): @@ -29,12 +39,24 @@ def __init__(self, topology_data, connection_data): self.topology_manager = TopologyManager() self.connection_handler = ConnectionHandler() + # A lock to safely perform topology operations. + self._topology_lock = threading.Lock() + + # A {domain, {port, {vlan, in_use}}} mapping. + self._vlan_tags_table = {} + # Making topology_data optional while investigating # https://github.com/atlanticwave-sdx/sdx-controller/issues/145. + # # TODO: a nicer thing to do would be to keep less state around. + # https://github.com/atlanticwave-sdx/pce/issues/122 if topology_data: self.topology_manager.add_topology(topology_data) self.graph = self.generate_graph_te() + self._update_vlan_tags_table( + domain_name=topology_data.get("id"), + port_list=self.topology_manager.port_list, + ) else: self.graph = None @@ -54,6 +76,15 @@ def add_topology(self, topology_data: dict): """ self.topology_manager.add_topology(topology_data) + # Ports appear in two places in the combined topology + # maintained by TopologyManager: attached to each of the + # nodes, and attached to links. Here we are using the ports + # attached to links. + self._update_vlan_tags_table( + domain_name=topology_data.get("id"), + port_list=self.topology_manager.port_list, + ) + def update_topology(self, topology_data: dict): """ Update an existing topology in TEManager. @@ -62,6 +93,80 @@ def update_topology(self, topology_data: dict): """ self.topology_manager.update_topology(topology_data) + # TODO: careful here when updating VLAN tags table -- what do + # we do when an in use VLAN tag becomes invalid in the update? + # See https://github.com/atlanticwave-sdx/pce/issues/123 + # + # self._update_vlan_tags_table_from_links( + # domain_name=topology_data.get("id"), + # port_list=self.topology_manager.port_list, + # ) + + def _update_vlan_tags_table(self, domain_name, port_list): + """ + Update VLAN tags table. + """ + self._vlan_tags_table[domain_name] = {} + + for port_id, link in port_list.items(): + # TODO: port here seems to be a dict, not sdx.datamodel.models.Port + for port in link.ports: + # TODO: sometimes port_id and "inner" port_id below + # can be different. Why? For example, port_id of + # urn:sdx:port:amlight.net:B1:2 and port_id_inner of + # urn:sdx:port:amlight.net:B2:2. + # + # See https://github.com/atlanticwave-sdx/pce/issues/124 + # + # port_id_inner = port.get("id") + # print(f"port_id: {port_id}, port_id_inner: {port_id_inner}") + # assert port_id == port_id_inner + + label_range = port.get("label_range") + + # TODO: why is label_range sometimes None, and what to + # do when that happens? + if label_range is None: + continue + + # assert label_range is not None, "label_range is None" + + # label_range is of the form ['100-200', '1000']; let + # us expand it. Would have been ideal if this was + # already in some parsed form, but it is not, so this + # is a work-around. + all_labels = self._expand_label_range(label_range) + + # Make a map lik: `{tag1: True, tag2: True, tag3: True...}` + labels_available = {label: True for label in all_labels} + + self._vlan_tags_table[domain_name][port_id] = labels_available + + def _expand_label_range(self, label_range: List[str]) -> List[int]: + """ + Expand the label range to a list of numbers. + """ + labels = [self._expand_label(label) for label in label_range] + # flatten result and return it. + return list(chain.from_iterable(labels)) + + def _expand_label(self, label: str) -> List[int]: + """ + Expand items in label range to a list of numbers. + + Items in label ranges can be of the form "100-200" or "100". + For the first case, we return [100,101,...200]; for the second + case, we return [100]. + """ + if not isinstance(label, str): + raise ValueError("Label must be a string.") + + parts = label.split("-") + start = int(parts[0]) + stop = int(parts[-1]) + 1 + + return list(range(start, stop)) + def generate_connection_te(self) -> TrafficMatrix: """ Generate a Traffic Matrix from the connection request we have. @@ -252,11 +357,26 @@ def _generate_connection_breakdown_tm(self, connection: ConnectionSolution) -> d i = i + 1 print(f"generate_connection_breakdown(): domain_breakdown: {domain_breakdown}") - return domain_breakdown + + tagged_breakdown = self._reserve_vlan_breakdown(domain_breakdown) + print(f"generate_connection_breakdown(): tagged_breakdown: {tagged_breakdown}") + + # Make tests pass, temporarily. + if tagged_breakdown is None: + return None + + assert isinstance(tagged_breakdown, VlanTaggedBreakdowns) + + # Return a dict containing VLAN-tagged breakdown in the + # expected format. + return tagged_breakdown.to_dict().get("breakdowns") def _generate_connection_breakdown_old(self, connection): """ Take a connection and generate a breakdown. + + TODO: remove this when convenient. + https://github.com/atlanticwave-sdx/pce/issues/125 """ assert connection is not None @@ -346,3 +466,217 @@ def _generate_connection_breakdown_old(self, connection): print(f"generate_connection_breakdown(): domain_breakdown: {domain_breakdown}") return domain_breakdown + + """ + functions for vlan reservation. + + Operations are: + + - obtain the available vlan lists + + - find the vlan continuity on a path if possible. + + - find the vlan translation on the multi-domain path if + continuity not possible + + - reserve the vlan on all the ports on the path + + - unreserve the vlan when the path is removed + """ + + def _reserve_vlan_breakdown( + self, domain_breakdown: dict + ) -> Optional[VlanTaggedBreakdowns]: + """ + Upate domain breakdown with VLAN reservation information. + + This is the top-level function, to be called after + _generate_connection_breakdown_tm(), and should be a private + implementation detail. It should be always called, meaning, + the VLAN tags should be present in the final breakdown, + regardless of whether the connection request explicitly asked + for it or not. + + For this to work, TEManager should maintain a table of VLAN + allocation from each of the domains. The ones that are not in + use can be reserved, and the ones that are not in use anymore + should be returned to the pool by calling unreserve(). + + :param domain_breakdown: per port available vlan range is + pased in datamodel._parse_available_vlans(self, vlan_str) + + :return: Updated domain_breakdown with the VLAN assigned to + each port along a path, or None if failure. + """ + + # # Check if there exist a path of vlan continuity. This is + # # disabled for now, until the simple case is handled. + # selected_vlan = self.find_vlan_on_path(domain_breakdown) + # if selected_vlan is not None: + # return self._reserve_vlan_on_path(domain_breakdown, selected_vlan) + + # if not, assuming vlan translation on the domain border port + + print(f"reserve_vlan_breakdown: domain_breakdown: {domain_breakdown}") + + breakdowns = {} + + # upstream_o_vlan = "" + for domain, segment in domain_breakdown.items(): + ingress_port = segment.get("ingress_port") + egress_port = segment.get("egress_port") + + print( + f"VLAN reservation: domain: {domain}, " + f"ingress_port: {ingress_port}, egress_port: {egress_port}" + ) + + if ingress_port is None or egress_port is None: + return None + + ingress_vlan = self._reserve_vlan(domain, ingress_port) + egress_vlan = self._reserve_vlan(domain, egress_port) + + ingress_port_id = ingress_port.get("id") + egress_port_id = egress_port.get("id") + + print( + f"VLAN reservation: domain: {domain}, " + f"ingress_vlan: {ingress_vlan}, egress_vlan: {egress_vlan}" + ) + + # if one has empty vlan range, first resume reserved vlans + # in the previous domain, then return false. + if egress_vlan is None: + self._unreserve_vlan(ingress_vlan) + return None + + if ingress_vlan is None: + self._unreserve_vlan(egress_vlan) + return None + + # # vlan translation from upstream_o_vlan to i_vlan + # segment["ingress_upstream_vlan"] = upstream_o_vlan + # segment["ingress_vlan"] = ingress_vlan + # segment["egress_vlan"] = egress_vlan + # upstream_o_vlan = egress_vlan + + port_a = VlanTaggedPort( + VlanTag(value=ingress_vlan, tag_type=1), port_id=ingress_port_id + ) + port_z = VlanTaggedPort( + VlanTag(value=egress_vlan, tag_type=1), port_id=egress_port_id + ) + + # Names look like "AMLIGHT_vlan_201_202_Ampath_Tenet". We + # can form the initial part, but where did the + # `Ampath_Tenet` at the end come from? + domain_name = domain.split(":")[-1].split(".")[0].upper() + name = f"{domain_name}_vlan_{ingress_vlan}_{egress_vlan}" + + breakdowns[domain] = VlanTaggedBreakdown( + name=name, + dynamic_backup_path=True, + uni_a=port_a, + uni_z=port_z, + ) + + return VlanTaggedBreakdowns(breakdowns=breakdowns) + + def _find_vlan_on_path(self, path): + """ + Find an unused available VLAN on path. + + Finds a VLAN that's not being used at the moment on a provided + path. Returns an available VLAN if possible, None if none are + available on the submitted path. + + output: vlan_tag string or None + """ + + # TODO: implement this + # https://github.com/atlanticwave-sdx/pce/issues/126 + + assert False, "Not implemented" + + def _reserve_vlan_on_path(self, domain_breakdown, selected_vlan): + # TODO: what is the difference between reserve_vlan and + # reserve_vlan_on_path? + + # TODO: implement this + # https://github.com/atlanticwave-sdx/pce/issues/126 + + # return domain_breakdown + assert False, "Not implemented" + + def _reserve_vlan(self, domain: str, port: dict, tag=None): + # with self._topology_lock: + # pass + + port_id = port.get("id") + print(f"reserve_vlan domain: {domain} port_id: {port_id}") + + if port_id is None: + return None + + # Look up available VLAN tags by domain and port ID. + domain_table = self._vlan_tags_table.get(domain) + + if domain_table is None: + print(f"reserve_vlan domain: {domain} entry: {domain_table}") + return None + + vlan_table = domain_table.get(port_id) + + print(f"reserve_vlan domain: {domain} vlan_table: {vlan_table}") + + # TODO: figure out when vlan_table can be None + if vlan_table is None: + print(f"Can't find a mapping for domain:{domain} port:{port_id}") + return None + + available_tag = None + + if tag is None: + # Find the first available VLAN tag from the table. + for vlan_tag, vlan_available in vlan_table.items(): + if vlan_available: + available_tag = vlan_tag + else: + if vlan_table[tag] is True: + available_tag = tag + else: + return None + + if available_tag is not None: + # mark the tag as in-use. + vlan_table[available_tag] = False + + # available_tag = 200 + return available_tag + + # to be called by delete_connection() + def _unreserve_vlan_breakdown(self, break_down): + # TODO: implement this. + # https://github.com/atlanticwave-sdx/pce/issues/127 + # with self._topology_lock: + # pass + assert False, "Not implemented" + + def _unreserve_vlan(self, domain: str, port: dict, tag=None): + """ + Mark a VLAN tag as not in use. + """ + # TODO: implement this. + # https://github.com/atlanticwave-sdx/pce/issues/127 + + # with self._topology_lock: + # pass + assert False, "Not implemented" + + def _print_vlan_tags_table(self): + import pprint + + print("------ VLAN TAGS TABLE -------") + pprint.pprint(self._vlan_tags_table) + print("------------------------------") diff --git a/tests/test_te_manager.py b/tests/test_te_manager.py index 03afab12..c7924160 100644 --- a/tests/test_te_manager.py +++ b/tests/test_te_manager.py @@ -23,13 +23,9 @@ class TEManagerTests(unittest.TestCase): """ def setUp(self): - with open(TestData.TOPOLOGY_FILE_AMLIGHT, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) - - with open(TestData.CONNECTION_REQ_AMLIGHT, "r", encoding="utf-8") as fp: - connection_data = json.load(fp) - - self.temanager = TEManager(topology_data, connection_data) + topology = json.loads(TestData.TOPOLOGY_FILE_AMLIGHT.read_text()) + request = json.loads(TestData.CONNECTION_REQ_AMLIGHT.read_text()) + self.temanager = TEManager(topology, request) def tearDown(self): self.temanager = None @@ -41,8 +37,9 @@ def test_generate_solver_input(self): def test_connection_breakdown_none_input(self): # Expect an error to be raised. - with self.assertRaises(AssertionError): - self.temanager.generate_connection_breakdown(None) + self.assertRaises( + AssertionError, self.temanager.generate_connection_breakdown, None + ) def test_connection_breakdown_simple(self): # Test that the old way, which had plain old dicts and arrays @@ -112,14 +109,12 @@ def test_connection_breakdown_two_similar_requests(self): def test_connection_breakdown_three_domains(self): # SDX already exists in the known topology from setUp # step. Add SAX topology. - with open(TestData.TOPOLOGY_FILE_SAX, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) - self.temanager.add_topology(topology_data) + sax_topology = json.loads(TestData.TOPOLOGY_FILE_SAX.read_text()) + self.temanager.add_topology(sax_topology) # Add ZAOXI topology as well. - with open(TestData.TOPOLOGY_FILE_ZAOXI, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) - self.temanager.add_topology(topology_data) + zaoxi_topology = json.loads(TestData.TOPOLOGY_FILE_ZAOXI.read_text()) + self.temanager.add_topology(zaoxi_topology) request = [ { @@ -142,36 +137,35 @@ def test_connection_breakdown_three_domains(self): self.assertEqual(len(breakdown), 3) amlight = breakdown.get("urn:ogf:network:sdx:topology:amlight.net") - print(f"amlight: {amlight}") - self.assertIsInstance(amlight, dict) - self.assertIsInstance(amlight.get("ingress_port"), dict) - self.assertIsInstance(amlight.get("egress_port"), dict) - + zaoxi = breakdown.get("urn:ogf:network:sdx:topology:zaoxi.net") sax = breakdown.get("urn:ogf:network:sdx:topology:sax.net") - print(f"sax: {sax}") - self.assertIsInstance(sax, dict) - self.assertIsInstance(sax.get("ingress_port"), dict) - self.assertIsInstance(sax.get("egress_port"), dict) - zaoxi = breakdown.get("urn:ogf:network:sdx:topology:zaoxi.net") - print(f"zaoxi: {zaoxi}") - self.assertIsInstance(zaoxi, dict) - self.assertIsInstance(zaoxi.get("ingress_port"), dict) - self.assertIsInstance(zaoxi.get("egress_port"), dict) + for segment in [zaoxi, sax, amlight]: + self.assertIsInstance(segment, dict) + self.assertIsInstance(segment.get("name"), str) + self.assertIsInstance(segment.get("dynamic_backup_path"), bool) + self.assertIsInstance(segment.get("uni_a"), dict) + self.assertIsInstance(segment.get("uni_a").get("tag"), dict) + self.assertIsInstance(segment.get("uni_a").get("tag").get("value"), int) + self.assertIsInstance(segment.get("uni_a").get("tag").get("tag_type"), int) + self.assertIsInstance(segment.get("uni_a").get("port_id"), str) + self.assertIsInstance(segment.get("uni_z"), dict) + self.assertIsInstance(segment.get("uni_z").get("tag"), dict) + self.assertIsInstance(segment.get("uni_z").get("tag").get("value"), int) + self.assertIsInstance(segment.get("uni_z").get("tag").get("tag_type"), int) + self.assertIsInstance(segment.get("uni_z").get("port_id"), str) def test_connection_breakdown_three_domains_sax_connection(self): """ Test case added to investigate https://github.com/atlanticwave-sdx/sdx-controller/issues/146 """ - with open(TestData.TOPOLOGY_FILE_SAX, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) - self.temanager.add_topology(topology_data) + sax_topology = json.loads(TestData.TOPOLOGY_FILE_SAX.read_text()) + self.temanager.add_topology(sax_topology) # Add ZAOXI topology as well. - with open(TestData.TOPOLOGY_FILE_ZAOXI, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) - self.temanager.add_topology(topology_data) + zaoxi_topology = json.loads(TestData.TOPOLOGY_FILE_ZAOXI.read_text()) + self.temanager.add_topology(zaoxi_topology) request = [ { @@ -225,15 +219,10 @@ def test_generate_graph_and_connection_with_sax_2_invalid(self): TODO: Use a better name for this method. """ - with open(TestData.TOPOLOGY_FILE_SAX_2, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) + topology = json.loads(TestData.TOPOLOGY_FILE_SAX_2.read_text()) + request = json.loads(TestData.CONNECTION_REQ_FILE_SAX_2_INVALID.read_text()) - with open( - TestData.CONNECTION_REQ_FILE_SAX_2_INVALID, "r", encoding="utf-8" - ) as fp: - connection_data = json.load(fp) - - temanager = TEManager(topology_data, connection_data) + temanager = TEManager(topology, request) self.assertIsNotNone(temanager) graph = temanager.generate_graph_te() @@ -253,15 +242,10 @@ def test_generate_graph_and_connection_with_sax_2_valid(self): TODO: Use a better name for this method. """ - with open(TestData.TOPOLOGY_FILE_SAX_2, "r", encoding="utf-8") as fp: - topology_data = json.load(fp) - - with open( - TestData.CONNECTION_REQ_FILE_SAX_2_VALID, "r", encoding="utf-8" - ) as fp: - connection_data = json.load(fp) + topology = json.loads(TestData.TOPOLOGY_FILE_SAX_2.read_text()) + request = json.loads(TestData.CONNECTION_REQ_FILE_SAX_2_VALID.read_text()) - temanager = TEManager(topology_data, connection_data) + temanager = TEManager(topology, request) self.assertIsNotNone(temanager) graph = temanager.generate_graph_te() @@ -330,9 +314,45 @@ def test_connection_amlight_to_zaoxi(self): # Note that the "domain" key is correct in the breakdown # result when we initialize TEManager with None for topology, # and later add individual topologies with add_topology(). - self.assertIsNotNone(breakdown.get("urn:ogf:network:sdx:topology:zaoxi.net")) - self.assertIsNotNone(breakdown.get("urn:ogf:network:sdx:topology:sax.net")) - self.assertIsNotNone(breakdown.get("urn:ogf:network:sdx:topology:amlight.net")) + zaoxi = breakdown.get("urn:ogf:network:sdx:topology:zaoxi.net") + sax = breakdown.get("urn:ogf:network:sdx:topology:sax.net") + amlight = breakdown.get("urn:ogf:network:sdx:topology:amlight.net") + + # Per https://github.com/atlanticwave-sdx/pce/issues/101, each + # breakdown should be of the below form: + # + # { + # "name": "TENET_vlan_201_203_Ampath_Tenet", + # "dynamic_backup_path": true, + # "uni_a": { + # "tag": { + # "value": 203, + # "tag_type": 1 + # }, + # "interface_id": "cc:00:00:00:00:00:00:07:41" + # }, + # "uni_z": { + # "tag": { + # "value": 201, + # "tag_type": 1 + # }, + # "interface_id": "cc:00:00:00:00:00:00:08:50" + # } + # } + for segment in [zaoxi, sax, amlight]: + self.assertIsInstance(segment, dict) + self.assertIsInstance(segment.get("name"), str) + self.assertIsInstance(segment.get("dynamic_backup_path"), bool) + self.assertIsInstance(segment.get("uni_a"), dict) + self.assertIsInstance(segment.get("uni_a").get("tag"), dict) + self.assertIsInstance(segment.get("uni_a").get("tag").get("value"), int) + self.assertIsInstance(segment.get("uni_a").get("tag").get("tag_type"), int) + self.assertIsInstance(segment.get("uni_a").get("port_id"), str) + self.assertIsInstance(segment.get("uni_z"), dict) + self.assertIsInstance(segment.get("uni_z").get("tag"), dict) + self.assertIsInstance(segment.get("uni_z").get("tag").get("value"), int) + self.assertIsInstance(segment.get("uni_z").get("tag").get("tag_type"), int) + self.assertIsInstance(segment.get("uni_z").get("port_id"), str) def test_connection_amlight_to_zaoxi_with_merged_topology(self): """