diff --git a/calvin/actorstore/systemactors/net/MQTTDynamicSubscriber.py b/calvin/actorstore/systemactors/net/MQTTDynamicSubscriber.py index 3deacf75a..d0d811615 100644 --- a/calvin/actorstore/systemactors/net/MQTTDynamicSubscriber.py +++ b/calvin/actorstore/systemactors/net/MQTTDynamicSubscriber.py @@ -35,57 +35,101 @@ class MQTTDynamicSubscriber(Actor): "auth": { "username": }, "will": { "topic": , "payload": }, "transport": , + "clean_session": } Input: client_id : MQTT client ID uri : MQTT broker URI (format: schema://host:port) + cmd : command keyword ('subscribe'|'unsubscribe') topic : topic to subscribe to qos : MQTT qos Output: message : dictionary {"topic": , "payload": , "client_id": } """ + CMD_SUBSCRIBE = "subscribe" + CMD_UNSUBSCRIBE = "unsubscribe" - @manage(['settings', 'mqtt_dict']) + @manage(['settings', 'layout']) def init(self, settings): if not settings: settings = {} self.settings = settings + self.layout = {} + self.setup() + + def setup(self): self.mqtt_dict = {} + self.queue = [] + + def will_migrate(self): + for mqtt in self.mqtt_dict.itervalues(): + calvinsys.close(mqtt) + self.mqtt_dict.clear() + + def did_migrate(self): + self.setup() + layout = self.layout + self.layout = {} + for client_id, client_details in layout.iteritems(): + details = dict(client_details) + uri = details["uri"] + for topic, qos in dict(details["topics"]).iteritems(): + self._update_topic(client_id, uri, self.CMD_SUBSCRIBE, topic, qos) """ - Read first available MQTT client message + Read all available MQTT clients for messages and store them in a FIFO queue + The reader will only read the first message in the queue. + + @note The rest of the messages are expected at the next readings """ - @stateguard(lambda self: + @stateguard(lambda self: self.queue or any(calvinsys.can_read(mqtt) for mqtt in self.mqtt_dict.itervalues())) @condition(action_output=['message']) def read_message(self): - client_id, mqtt = next((client_id, mqtt) - for (client_id, mqtt) in self.mqtt_dict.iteritems() - if calvinsys.can_read(mqtt)) - message = calvinsys.read(mqtt) - # add client id to the message - message["client_id"] = client_id + message = "" + for (client_id, mqtt) in self.mqtt_dict.iteritems(): + if calvinsys.can_read(mqtt): + message = calvinsys.read(mqtt) + # add client id to the message + message["client_id"] = client_id + self.queue.append(message) + if self.queue: + message = self.queue.pop(0) return (message,) """ Update MQTT subscribed topics for specific MQTT client """ - @condition(action_input=['client_id', 'uri', 'topic', 'qos']) - def update_topic(self, client_id, uri, topic, qos): - if (topic is None): + @condition(action_input=['client_id', 'uri', 'cmd', 'topic', 'qos']) + def update_topic(self, client_id, uri, cmd, topic, qos): + self._update_topic(client_id, uri, cmd, topic, qos) + + def _update_topic(self, client_id, uri, cmd, topic, qos): + if not topic: _log.warning("Topic is missing!") return - if (not client_id in self.mqtt_dict.keys()): + if not client_id in self.mqtt_dict.keys(): self.mqtt_dict[client_id] = calvinsys.open(self, "mqtt.subscribe", + client_id=client_id, topics=[topic], uri=uri, qos=qos, **self.settings) - calvinsys.write(self.mqtt_dict[client_id], {"topic":topic, "qos":qos}) + if self.mqtt_dict[client_id]: + self.layout[client_id] = {"uri": uri, "topics":{}} + + success = calvinsys.write(self.mqtt_dict[client_id], + {"cmd": cmd, "topic":topic, "qos":qos}) + exist = topic in dict(self.layout[client_id]["topics"]).iterkeys() + if success: + if cmd == self.CMD_SUBSCRIBE and not exist: + self.layout[client_id]["topics"][topic] = qos + elif cmd == self.CMD_UNSUBSCRIBE: + del self.layout[client_id]["topics"][topic] action_priority = (update_topic, read_message) requires = ['mqtt.subscribe'] diff --git a/calvin/actorstore/tests/test_actors.py b/calvin/actorstore/tests/test_actors.py index 1757a742a..36ed5b27d 100644 --- a/calvin/actorstore/tests/test_actors.py +++ b/calvin/actorstore/tests/test_actors.py @@ -56,7 +56,7 @@ def can_write(self, ref): def write(self, ref, data): obj = self._get_capability_object(ref) - obj.write(data) + return obj.write(data) def can_read(self, ref): obj = self._get_capability_object(ref) @@ -104,6 +104,7 @@ def init_done(self, actor_name): if obj: obj.start_verifying_calvinsys() + def fwrite(port, value): if isinstance(value, Token): port.queue.write(value, None) @@ -126,6 +127,7 @@ def pwrite(actor, portname, value): # TODO make all actors' test compliant and change to raise exception pass + def pread(actor, portname, number=1): port = actor.outports.get(portname, None) assert port @@ -138,7 +140,7 @@ def pread(actor, portname, number=1): available = -9999 raise AssertionError("Too few tokens available, %d, expected %d" % (available, number)) else: - if pavailable(actor, portname, number+1): + if pavailable(actor, portname, number + 1): raise AssertionError("Too many tokens available, expected %d" % number) values = [port.queue.peek(actor.id).value for _ in range(number)] @@ -177,9 +179,10 @@ def __init__(self, port): def is_connected(self): return True + def setup_calvinlib(): import calvin.runtime.north.calvinlib as calvinlib - calvinlib.TESTING=True + calvinlib.TESTING = True from calvin.runtime.north.calvinlib import get_calvinlib lib = get_calvinlib() lib.init(capabilities={ @@ -262,7 +265,6 @@ def setup_calvinsys(): "attributes": {"data": [False, True, False, True]} }, - "io.buzzer": { "module": "mock.MockOutput", "attributes": {} @@ -410,6 +412,10 @@ def setup_calvinsys(): "weather.local": { "module": "mock.MockInputOutput", "attributes": {"data": ["dummy"]} + }, + "mqtt.subscribe": { + "module": "web.mqtt.Subscribe", + "attributes": {} } }) return sys @@ -417,9 +423,9 @@ def setup_calvinsys(): def teardown_calvinlib(): import calvin.runtime.north.calvinlib as calvinlib - calvinlib.TESTING=False + calvinlib.TESTING = False del calvinlib._calvinlib - calvinlib._calvinlib=None + calvinlib._calvinlib = None def teardown_calvinsys(): @@ -523,7 +529,7 @@ def test_actor(self, actor, aut): f(aut) except Exception as e: print "Actor %s failed during setup of test %d: %s" % (actor, test_index, e.message) - raise Exception("Failed during setup of test %d" % (test_index, )) + raise Exception("Failed during setup of test %d" % (test_index,)) for port, values in inputs.iteritems(): pwrite(aut, port, values) @@ -547,7 +553,7 @@ def test_actor(self, actor, aut): raise AssertionError("Failed test %d" % (test_index,)) if not all(f(aut) for f in postconds): - raise AssertionError("Failed post condition of test %d" % (test_index, )) + raise AssertionError("Failed post condition of test %d" % (test_index,)) return True @@ -573,6 +579,7 @@ def test_actors(self): return {'pass': test_pass, 'fail': test_fail, 'skipped': no_test, 'errors': self.illegal_actors, 'components': self.components} + def merge_results(result1, result2): result = {} for k in result1.keys(): @@ -588,7 +595,7 @@ def merge_results(result1, result2): def show_result(header, result): print header for actor in result: - print " %s" % (actor, ) + print " %s" % (actor,) def show_issue(header, result): @@ -640,7 +647,7 @@ def test_actors(actor="", show=False, path=None): show_issues(results) if results['errors']: - raise Exception("%d actor(s) had errors" % (len(results['errors']), )) + raise Exception("%d actor(s) had errors" % (len(results['errors']),)) if results['fail']: raise Exception("%d actor(s) failed tests" % (len(results['fail']),)) diff --git a/calvin/runtime/north/calvinsys.py b/calvin/runtime/north/calvinsys.py index b5a0e9870..119730bd9 100644 --- a/calvin/runtime/north/calvinsys.py +++ b/calvin/runtime/north/calvinsys.py @@ -165,7 +165,7 @@ def write(self, ref, data): obj = self._get_capability_object(ref) try: validate(data, obj.write_schema) - obj.write(data) + return obj.write(data) except Exception as e: _log.exception("Failed to validate schema for {}.write(), exception={}".format(obj.__class__.__name__, e)) @@ -206,7 +206,7 @@ def open(self, capability_name, actor, **kwargs): if len(csobjects) == 0: idx = 0 else : - idx = int(csobjects[-1].rsplit('#', 1)[1])+1 + idx = int(csobjects[-1].rsplit('#', 1)[1]) + 1 ref = "{}#{}".format(actor.id, idx) self._objects[ref] = {"name": capability_name, "obj": obj, "args": kwargs} @@ -236,7 +236,7 @@ def serialize(self, actor): serz = {} for ref in references: csobj = self._objects.get(ref) - state = csobj["obj"].serialize() # serialize object + state = csobj["obj"].serialize() # serialize object serz[ref] = {"name": csobj["name"], "obj": state, "args": csobj["args"]} return serz diff --git a/calvin/tests/simple_migration_tests/mqtt_subscriber.calvin b/calvin/tests/simple_migration_tests/mqtt_subscriber.calvin new file mode 100644 index 000000000..8381497b1 --- /dev/null +++ b/calvin/tests/simple_migration_tests/mqtt_subscriber.calvin @@ -0,0 +1,25 @@ +define QOS = 1 +define SETTINGS = {"clean_session": "False"} + +client_id : std.Constantify(constant="dummy_id") +uri : std.Constantify(constant="mqtt://test.mosquitto.org:1883") +cmd : std.Trigger(data="subscribe", tick=1.0) +topic : std.Iterate() +topics : std.Constantify(constant=["$SYS/#"]) +qos : std.Constantify(constant=1) +subscriber : net.MQTTDynamicSubscriber(settings=SETTINGS) +printer : io.Print() +terminator : flow.Terminator() + +"dummy_id" > client_id.in +"mqtt://test.mosquitto.org:1883" > uri.in +["$SYS/#"] > topics.in +QOS > qos.in +client_id.out > subscriber.client_id +uri.out > subscriber.uri +topics.out > topic.token +topic.item > subscriber.topic +qos.out > subscriber.qos +cmd.data > subscriber.cmd +subscriber.message > printer.token +topic.index > terminator.void diff --git a/calvinextras/calvinsys/web/mqtt/Subscribe.py b/calvinextras/calvinsys/web/mqtt/Subscribe.py index ef7354ae5..adb3bb141 100644 --- a/calvinextras/calvinsys/web/mqtt/Subscribe.py +++ b/calvinextras/calvinsys/web/mqtt/Subscribe.py @@ -21,6 +21,9 @@ from calvin.runtime.south.calvinsys import base_calvinsys_object from calvin.utilities.calvinlogger import get_logger +from collections import deque +from grpc._channel import _unsubscribe + _log = get_logger(__name__) @@ -116,7 +119,7 @@ class Subscribe(base_calvinsys_object.BaseCalvinsysObject): "type": "array", "items": { "type": "string", - "minItems": 1 + "minItems": 0 } }, "payload_only": { @@ -146,27 +149,57 @@ class Subscribe(base_calvinsys_object.BaseCalvinsysObject): } can_write_schema = { - "description": "Does nothing, always return true", + "description": "Always return true, allowing configuration of MQTT client", "type": "boolean" } write_schema = { - "description": "Does nothing" + "description": "Update topic subscriptions", + "type": "object", + "properties": { + "topic": { + "type": "string" + }, + "qos": { + "description": "The message Quality of Service 0, 1 or 2" + "The default value is 0", + "type": "integer" + }, + "cmd": { + "description": "MQTT topic related command" + "The default value is 'subscribe'", + "type": "string", + "enum": ["subscribe", "unsubscribe"] + } + }, + "required": ["topic"] + } + CMD_SUBSCRIBE = "subscribe" + CMD_UNSUBSCRIBE = "unsubscribe" - def init(self, topics, uri=None, hostname=None, port=1883, qos=0, client_id='', will=None, auth=None, tls=None, transport='tcp', payload_only=False, **kwargs): + def init(self, topics=[], uri=None, hostname=None, port=1883, qos=0, client_id='', + will=None, auth=None, tls=None, transport='tcp', payload_only=False, + **kwargs): def on_connect(client, userdata, flags, rc): if rc != 0: _log.warning("Connection to MQTT broker {}:{} failed".format(hostname, port)) else : _log.info("Connected to MQTT broker {}:{}".format(hostname, port)) - client.subscribe(self.topics) + topics = dict(self.topics) + for topic, meta in topics.iteritems(): + if meta["unsubscribe"]: + if self._unsubscribe(topic): + self.topics.remove((topic, meta)) + else: + self._subscribe(topic, meta["qos"]) def on_disconnect(client, userdata, rc): _log.warning("MQTT broker {}:{} disconnected".format(hostname, port)) def on_message(client, userdata, message): + _log.info("New message {}".format(message)) self.data.append({"topic": message.topic, "payload": message.payload}) self.scheduler_wakeup() @@ -209,9 +242,9 @@ def on_log_debug(client, string): } is_tls = False - if (uri): + if uri: result = urlparse(uri) - if (result.scheme.lower() in ["https", "mqtts"]): + if result.scheme.lower() in ["https", "mqtts"]: is_tls = True # override hostname and port hostname = result.hostname @@ -219,10 +252,13 @@ def on_log_debug(client, string): _log.info("TLS: {}".format(tls)) self.payload_only = payload_only - self.topics = [(topic.encode("ascii"), qos) for topic in topics] + # topics is a deque({:{qos:,unsubscribe:]}) + self.topics = deque((topic.encode("ascii"), {"qos":qos, "unsubscribe":False}) + for topic in list(set(topics))) self.data = [] + clean_session = kwargs.get('clean_session', False) - self.client = mqtt.Client(client_id=client_id, transport=transport) + self.client = mqtt.Client(client_id=client_id, transport=transport, clean_session=clean_session) self.client.on_connect = on_connect self.client.on_disconnect = on_disconnect self.client.on_message = on_message @@ -252,30 +288,86 @@ def can_write(self): return True def write(self, data): + ret = True + cmd = data.get("cmd", Subscribe.CMD_SUBSCRIBE) topic = data.get("topic", "").encode("ascii") + if not topic: + _log.error("The topic is missing!") + return False qos = data.get("qos", 0) - update_topic = True - topic_index = -1 - # check if topic already exist - for idx in range(len(self.topics)): - (t, q) = self.topics[idx] - if t == topic: - if (q == qos): - update_topic = False + topics = dict(self.topics) + item = None + if topic in topics.iterkeys(): + item = next((t, meta) for t, meta in topics.iteritems() if t == topic) + + if cmd == Subscribe.CMD_SUBSCRIBE: + if not item or topics[topic]["qos"] != qos: + # the case when qos is changing + if self._validate_qos(qos): + ret = self._subscribe(topic, qos) + # store the topic only if it is subscribed or the subscription + # is postponed until the client connects to the broker + if ret: + if item: + self.topics.remove(item) + self.topics.append((topic, {"qos":qos, "unsubscribe":False})) else: - topic_index = idx - break - - if (update_topic): - status = self.client.subscribe([(topic, qos)]) - if (status[0] == mqtt.MQTT_ERR_SUCCESS): - if (topic_index >= 0): - self.topics[topic_index] = (topic, qos) - else: - self.topics.append((topic, qos)) + _log.error("Invalid QOS value!") + ret = False + else: + _log.debug("Subscription to topic '{}' already exist!") + elif cmd == Subscribe.CMD_UNSUBSCRIBE: + if item: + ret = self._unsubscribe(topic) + self.topics.remove(item) + if not ret: + # postpone subscription removal + item[1]["unsubscribe"] = True + self.topics.append(item) else: - _log.error("Failed to update topic: ({}, {}) Check MQTT logs" - .format(topic, qos)) + _log.error("Unknown topic!") + ret = False + else: + _log.error("Unknown command: {}!", cmd) + ret = False + if ret: + _log.debug("Command {}({},[{}]) successfully finished".format(cmd, topic, qos)) + return ret + + def _subscribe(self, topic, qos): + ret = False + status = self.client.subscribe((topic, qos)) + if status[0] == mqtt.MQTT_ERR_SUCCESS: + _log.info("Successfully subscribed to topic '{}'".format(topic)) + ret = True + elif status[0] == mqtt.MQTT_ERR_NO_CONN: + # the topic will subscribe on next connect + _log.warn("No connection to the MQTT broker. Postpone subscription") + ret = True + else: + _log.error("Failed to subscribe topic: ({}, {}) error code {}" + .format(topic, qos, status[0])) + return ret + + def _validate_qos(self, qos): + # see MQTT client QOS validation + valid = False + if 0 <= qos <= 2: + valid = True + return valid + + def _unsubscribe(self, topic): + ret = False + status = self.client.unsubscribe(topic) + if status[0] == mqtt.MQTT_ERR_SUCCESS: + _log.info("Successfully removed topic '{}' subscription".format(topic)) + ret = True + elif status[0] == mqtt.MQTT_ERR_NO_CONN: + _log.warn("No connection to the MQTT broker. Postpone subscription removal") + else: + _log.error("Failed to unsubscribe topic: ({}) error code {}" + .format(topic, status[0])) + return ret def can_read(self): return bool(self.data) @@ -284,7 +376,7 @@ def read(self): data = self.data.pop(0) if self.payload_only: return data.get("payload") - else : + else: return data def close(self):