diff --git a/Makefile b/Makefile
index bdfa3da..96df9db 100644
--- a/Makefile
+++ b/Makefile
@@ -25,7 +25,7 @@ lint :
 	# stop the build if there are Python syntax errors or undefined names
 	flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
 	# exit-zero treats all errors as warnings
-	flake8 . --count --exit-zero --max-complexity=10 --max-line-length=100 --statistics
+	flake8 . --count --exit-zero --max-complexity=12 --max-line-length=100 --statistics
 
 .PHONY: format
 format :
diff --git a/README.md b/README.md
index 2900d6e..1d69d5e 100644
--- a/README.md
+++ b/README.md
@@ -2,31 +2,41 @@ Hop Client
 =============
 
 ![](https://github.com/scimma/hop-client/workflows/build/badge.svg)
-
 [![codecov](https://codecov.io/gh/scimma/hop-client/branch/master/graph/badge.svg)](https://codecov.io/gh/scimma/hop-client)
 
+| | |
+| ------------ | ------ |
+| **Docs:** | https://hop-client.readthedocs.io/en/stable/ |
+
 **hop-client** is a pub-sub client library for Multimessenger Astrophysics.
 
 ## Quickstart
 
-publish a message to kafka:
+By default, authentication is enabled, reading in configuration settings
+from `auth.conf`. The path to this configuration can be found by running
+`hop auth locate`. One can initialize this configuration with default
+settings by running `hop auth setup`. To disable authentication in the CLI
+client, one can pass `--no-auth`.
+
+Publish a message:
 
 ```
-hop publish kafka://hostname:port/gcn -f circular example.gcn3
+hop publish kafka://hostname:port/gcn -f CIRCULAR example.gcn3
 ```
 
-Subscribe to the earliest offset of a Kafka topic and print to stdout:
+Example messages are provided in `tests/data` including:
+* A GCN circular (`example.gcn3`)
+* A VOEvent (`example_voevent.xml`)
+
+
+Consume messages:
+
 ```
-hop subscribe kafka://hostname:port/gcn -e
+hop subscribe kafka://hostname:port/gcn -s EARLIEST
 ```
 
-Two example messages (an RFC 822 formatted GCN circular `example.gcn3` and a VOEvent 2.0
-schema xml `example_voevent.xml` are provided in `tests/data`.
-
-Client [configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
-properties can be passed to `hop publish` via `-X property=value` or in a configuration
-file specified by `-F `, mimicking the behavior of `kafkacat`. This can be
-used to connect to a Kafka broker with SSL authentication enabled, for example.
+This will read messages from the gcn topic starting from the earliest offset,
+continuing until an end of stream (EOS) is received.
 
 ## Installation
diff --git a/doc/api/api.rst b/doc/api/api.rst
index d1367fd..b75390e 100644
--- a/doc/api/api.rst
+++ b/doc/api/api.rst
@@ -6,8 +6,10 @@ hop-client API
 .. toctree::
    :maxdepth: 2
 
+   auth
    cli
    io
    publish
    subscribe
    models
+   version
diff --git a/doc/api/auth.rst b/doc/api/auth.rst
new file mode 100644
index 0000000..4386e37
--- /dev/null
+++ b/doc/api/auth.rst
@@ -0,0 +1,7 @@
+.. _auth:
+
+hop.auth
+################
+
+.. automodule:: hop.auth
+   :members:
diff --git a/doc/api/version.rst b/doc/api/version.rst
new file mode 100644
index 0000000..589ae62
--- /dev/null
+++ b/doc/api/version.rst
@@ -0,0 +1,7 @@
+.. _version:
+
+hop.version
+################
+
+.. automodule:: hop.version
+   :members:
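The README's new CLI flow maps directly onto the Python API introduced by this change. A minimal sketch, assuming the bundled `tests/data/example.gcn3` file and a placeholder broker hostname/port:

```python
from hop import stream
from hop.auth import load_auth
from hop.models import GCNCircular

# parse one of the bundled example messages into a model
with open("tests/data/example.gcn3", "r") as f:
    circular = GCNCircular.load(f)

# publish it using credentials read from auth.conf, as the CLI does
with stream.open("kafka://hostname:port/gcn", "w", auth=load_auth()) as s:
    s.write(circular)
```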
diff --git a/doc/index.rst b/doc/index.rst
index 48231f3..df37ff7 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -9,8 +9,8 @@ User's Guide
 
    user/installation
    user/quickstart
-   user/commands
    user/stream
+   user/commands
 
 API Reference
 -------------
diff --git a/doc/user/commands.rst b/doc/user/commands.rst
index b925489..353ce05 100755
--- a/doc/user/commands.rst
+++ b/doc/user/commands.rst
@@ -8,14 +8,29 @@ Commands
 
 **hop-client** provides a command line interface for various tasks:
 
-:code:`hop publish`: parse and publish messages such as GCN circulars and notices
-:code:`hop subscribe`: parse and print messages such as GCN circulars and notices
+:code:`hop auth`: Authentication utilities
+:code:`hop publish`: Publish messages such as GCN circulars and notices
+:code:`hop subscribe`: Listen to messages such as GCN circulars and notices
+:code:`hop version`: Print the versions of :code:`hop-client` and its dependencies
+
+:code:`hop auth`
+~~~~~~~~~~~~~~~~~~~~~~
+
+This command allows a user to handle auth-based configuration.
+
+.. program-output:: hop auth --help
+   :nostderr:
 
 :code:`hop publish`
 ~~~~~~~~~~~~~~~~~~~~~~
 
-This command allows a user to parse an `RFC 822 formatted GCN circular `_, an XML formatted `GCN/VOEvent notice `_, or an unstructured string or text file and publish it as via Kafka. Structured messages
-such as GCN circulars and VOEvents are published as JSON-formatted text.
+This command allows a user to publish various structured and unstructured messages, including:
+
+* `RFC 822 formatted GCN circulars `_
+* XML formatted `GCN/VOEvent notices `_
+* Unstructured messages such as byte-encoded or JSON-serializable data
+
+Structured messages such as GCN circulars and VOEvents are published as JSON-formatted text.
 
 .. program-output:: hop publish --help
    :nostderr:
 
@@ -24,8 +39,7 @@
 :code:`hop subscribe`
 ~~~~~~~~~~~~~~~~~~~~~~
 
-This command allows a user to parse a message from a Kafka topic and display it on stdout via
-a pretty-print or JSON dump.
+This command allows a user to subscribe to messages and print them to stdout.
 
 .. program-output:: hop subscribe --help
    :nostderr:
 
@@ -36,4 +50,4 @@
 This command prints all the versions of the dependencies
 
 .. program-output:: hop version --help
-   :nostderr:
\ No newline at end of file
+   :nostderr:
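The :code:`hop auth` subcommands documented above are thin wrappers over helpers in :code:`hop.auth`. A sketch of the programmatic equivalents (the resolved path depends on your :code:`XDG_CONFIG_HOME`/:code:`HOME` environment):

```python
from hop.auth import get_auth_path, load_auth

# equivalent of `hop auth locate`
print(get_auth_path())

# parse auth.conf into an Auth instance, as the CLI clients do before connecting
auth = load_auth()
```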
diff --git a/doc/user/quickstart.rst b/doc/user/quickstart.rst
index 10b23ef..231cac3 100644
--- a/doc/user/quickstart.rst
+++ b/doc/user/quickstart.rst
@@ -9,63 +9,114 @@ Quickstart
 Using the CLI
 -------------
 
-Publish a message
+By default, authentication is enabled, reading in configuration settings
+from :code:`auth.conf`. The path to this configuration can be found by running
+:code:`hop auth locate`. One can initialize this configuration with default
+settings by running :code:`hop auth setup`. To disable authentication in the CLI
+client, one can pass :code:`--no-auth`.
+
+Publish messages
 ^^^^^^^^^^^^^^^^^
 
 .. code:: bash
 
-    hop publish kafka://hostname:port/gcn -f circular example.gcn3
+    hop publish kafka://hostname:port/gcn -f CIRCULAR example.gcn3
 
-Two example messages (an RFC 822 formatted GCN circular (:code:`example.gcn3`) and a VOEvent 2.0
-schema xml (:code: `example_voevent.xml`)) are provided in :code:`tests/data`.
+Example messages are provided in :code:`tests/data` including:
 
-Client `configuration `_
-properties can be passed to :code:`hop publish` via :code:`-X property=value` or in a configuration
-file specified by :code:`-F `, mimicking the behavior of :code:`kafkacat`. This can be
-used to connect to a Kafka broker with SSL authentication enabled, for example.
+* A GCN circular (:code:`example.gcn3`)
+* A VOEvent (:code:`example_voevent.xml`)
 
-Consume a message
+Consume messages
 ^^^^^^^^^^^^^^^^^
 
 .. code:: bash
 
-    hop subscribe kafka://hostname:port/gcn -e
+    hop subscribe kafka://hostname:port/gcn -s EARLIEST
+
+This will read messages from the gcn topic starting from the earliest offset,
+continuing until an end of stream (EOS) is received.
+
+Using the Python API
+----------------------
+
+Publish messages
+^^^^^^^^^^^^^^^^^
+
+Using the Python API, we can publish various types of messages, including
+structured messages such as GCN Circulars and VOEvents:
+
+.. code:: python
 
-Configuration properties can be passed in a manner identical to :code:`hop publish` above.
+    from hop import stream
+    from hop.models import GCNCircular
 
+    # read in a GCN circular
+    with open("path/to/circular.gcn3", "r") as f:
+        circular = GCNCircular.load(f)
 
-Reading messages from stream
-----------------------------
+    with stream.open("kafka://hostname:port/topic", "w") as s:
+        s.write(circular)
 
-The hop client supports a python-based API for reading messages from a stream, as follows:
+In addition, we can also publish unstructured messages as long as they are
+JSON serializable:
 
 .. code:: python
 
     from hop import stream
 
-    with stream.open("kafka://hostname:port/topic", "r", format="json") as s:
-        for idx, msg in s:
-            print(msg)
+    with stream.open("kafka://hostname:port/topic", "w") as s:
+        s.write({"my": "message"})
 
-This block will hang forever, listening to new messages and processing them as they arrive.
-By default, this will only process new messages since the connection was opened. The :code:`start_at`
-option lets you control where in the stream you can start listening from. For example,
-if you'd like to listen to all messages stored in a topic, you can do:
+By default, authentication is enabled for the Hop broker. In order to authenticate, one
+can pass in an :code:`Auth` instance with credentials:
 
 .. code:: python
 
-    with stream.open("kafka://hostname:port/topic", "r", format="json", start_at="latest") as s:
-        for idx, msg in s:
-            print(msg)
+    from hop import stream
+    from hop.auth import Auth
 
-Writing messages to stream
---------------------------
+    auth = Auth("my-username", "my-password")
 
-We can also publish messages to a topic, as follows:
+    with stream.open("kafka://hostname:port/topic", "w", auth=auth) as s:
+        s.write({"my": "message"})
+
+A convenience function is also provided to read in auth configuration in the same way
+as in the CLI client:
 
 .. code:: python
 
     from hop import stream
+    from hop.auth import load_auth
 
-    with stream.open("kafka://hostname:port/topic", "w", format="json") as s:
+    with stream.open("kafka://hostname:port/topic", "w", auth=load_auth()) as s:
         s.write({"my": "message"})
+
+Consume messages
+^^^^^^^^^^^^^^^^^
+
+One can consume messages through the Python API as follows:
+
+.. code:: python
+
+    from hop import stream
+
+    with stream.open("kafka://hostname:port/topic", "r") as s:
+        for message in s:
+            print(message)
+
+This will connect to the Hop broker, printing new messages to stdout as they
+arrive, until there are no more messages in the stream.
+By default, this will only process new messages since the connection was opened.
+The :code:`start_at` option lets you control where in the stream to start listening.
+For example, if you'd like to listen to all messages stored in a topic, you can do:
+
+.. code:: python
+
+    from hop import stream
+    from hop.io import StartPosition
+
+    with stream.open("kafka://hostname:port/topic", "r", start_at=StartPosition.EARLIEST) as s:
+        for message in s:
+            print(message)
+
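The quickstart shows :code:`Auth` only on the publish side; the same credentials work unchanged on the read path. A minimal sketch (username and password are placeholders):

```python
from hop import stream
from hop.auth import Auth

auth = Auth("my-username", "my-password")

# the same Auth instance also authenticates a subscriber
with stream.open("kafka://hostname:port/topic", "r", auth=auth) as s:
    for message in s:
        print(message)
```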
diff --git a/doc/user/stream.rst b/doc/user/stream.rst
index e216a1b..7da04e5 100644
--- a/doc/user/stream.rst
+++ b/doc/user/stream.rst
@@ -1,6 +1,6 @@
-==========
-Stream
-==========
+================
+Streaming
+================
 
 .. contents::
    :local:
@@ -8,38 +8,53 @@ Stream
 The Stream Object
 -----------------
 
-The Stream object allows a user to connect to a Kafka broker and read
+The :code:`Stream` object allows a user to connect to a Kafka broker and read
 in a variety of alerts, such as GCN circulars. It also allows one
 to specify default settings used across all streams opened from the Stream
 instance.
 
-Let's open up a stream and show the Stream object in action:
+Let's open up a stream and show the :code:`Stream` object in action:
 
 .. code:: python
 
     from hop import Stream
 
-    stream = Stream(format="json")
+    stream = Stream(persist=True)
     with stream.open("kafka://hostname:port/topic", "r") as s:
-        for idx, msg in s:
-            print(msg)
+        for message in s:
+            print(message)
 
-A common use case is to not specify any defaults, so a shorthand is
-provided for using one:
+The :code:`persist` option allows one to listen to messages forever
+and keeps the connection open after an end of stream (EOS) is received.
+This allows for long-lived connections where one may set up a service
+to process incoming GCNs, for example.
+
+A common use case is not specifying any defaults ahead of time,
+so a shorthand is provided for that case:
 
 .. code:: python
 
     from hop import stream
 
     with stream.open("kafka://hostname:port/topic", "r") as s:
-        for _, msg in s:
-            print(msg)
+        for message in s:
+            print(message)
+
+Both :code:`stream.open` and :code:`Stream` accept similar
+configuration options, including:
+
+* :code:`auth`: An :code:`auth.Auth` instance to provide authentication
+* :code:`start_at`: The message offset to start at, by passing in an :code:`io.StartPosition`
+* :code:`persist`: Whether to keep a long-lived connection to the client beyond EOS
 
-You can also configure the open stream handle with various options,
-including a timeout, a progress bar, and a message limit:
+In addition, :code:`stream.open` provides an option to retrieve Kafka message metadata as well
+as the message itself, such as the Kafka topic, key, timestamp and offset. This may
+be useful in the case of listening to multiple topics at once:
 
 .. code:: python
 
-    with stream.open("kafka://hostname:port/topic", "r") as s:
-        for _, msg in s(timeout=10, limit=20):
-            print(msg)
+    from hop import stream
+
+    with stream.open("kafka://hostname:port/topic1,topic2", "r", metadata=True) as s:
+        for message, metadata in s:
+            print(message, metadata.topic)
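Putting the three shared options together on a :code:`Stream` instance looks like the following sketch; whether :code:`persist=True` is appropriate depends on the service being built:

```python
from hop import Stream
from hop.auth import load_auth
from hop.io import StartPosition

# defaults set here apply to every stream opened from this instance
stream = Stream(auth=load_auth(), start_at=StartPosition.EARLIEST, persist=True)

with stream.open("kafka://hostname:port/topic", "r") as s:
    for message in s:
        print(message)
```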
diff --git a/hop/__init__.py b/hop/__init__.py
index adcb61c..f26d142 100644
--- a/hop/__init__.py
+++ b/hop/__init__.py
@@ -1,9 +1,6 @@
 __path__ = __import__("pkgutil").extend_path(__path__, __name__)  # declare namespace
 
-try:
-    from ._version import version as __version__
-except ImportError:
-    pass
+from ._version import version as __version__
 
 from .io import Stream
diff --git a/hop/__main__.py b/hop/__main__.py
index f912b41..fae1e67 100755
--- a/hop/__main__.py
+++ b/hop/__main__.py
@@ -6,6 +6,7 @@ import signal
 
 from . import __version__
+from . import auth
 from . import publish
 from . import subscribe
 from . import version
@@ -30,6 +31,10 @@ def append_subparser(subparser, cmd, func):
 def set_up_cli():
     """Set up CLI commands for hop entry point.
 
+    Returns:
+        An ArgumentParser instance with hop-based commands and relevant
+        arguments added for all commands.
+
     """
     parser = argparse.ArgumentParser(prog="hop")
     parser.add_argument(
@@ -41,6 +46,9 @@ def set_up_cli():
     subparser.required = True
 
     # register commands
+    p = append_subparser(subparser, "auth", auth._main)
+    auth._add_parser_args(p)
+
     p = append_subparser(subparser, "publish", publish._main)
     publish._add_parser_args(p)
 
+ + """ + if not os.path.exists(authfile): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), authfile) + + # load config + with open(authfile, "r") as f: + config = {opt[0]: opt[1] for opt in (line.split("=") for line in f.read().splitlines())} + + # translate config options + ssl = True + extra_kwargs = {} + try: + # SSL options + if config["security.protocol"] == "SASL_PLAINTEXT": + ssl = False + elif "ssl.ca.location" in config: + extra_kwargs["ssl_ca_location"] = config["ssl.ca.location"] + + # SASL options + user = config["sasl.username"] + password = config["sasl.password"] + method = SASLMethod[config["sasl.mechanism"].replace("-", "_")] + except KeyError: + raise KeyError("configuration file is not configured correctly") + else: + return Auth(user, password, ssl=ssl, method=method, **extra_kwargs) + + +def _add_parser_args(parser): + subparser = parser.add_subparsers(title="Commands", metavar="", dest="command") + subparser.add_parser( + "locate", + formatter_class=argparse.RawDescriptionHelpFormatter, + help="display authentication config path", + ) + + setup_subparser = subparser.add_parser( + "setup", + formatter_class=argparse.RawDescriptionHelpFormatter, + help="set up authentication config with defaults", + ) + setup_subparser.add_argument( + "-f", "--force", action="store_true", help="If set, overrides current configuration", + ) + + +def _main(args): + """Authentication utilities. + + """ + authfile = get_auth_path() + logging.basicConfig( + level=logging.INFO, format="%(asctime)s | %(name)s : %(levelname)s : %(message)s", + ) + + if args.command == "locate": + print(authfile) + elif args.command == "setup": + if os.path.exists(authfile) and not args.force: + logger.warning("configuration already exists, overwrite file with --force") + else: + os.makedirs(os.path.dirname(authfile), exist_ok=True) + with open(authfile, "w") as f: + f.write(DEFAULT_AUTH_CONFIG) diff --git a/hop/cli.py b/hop/cli.py index 7aacdf6..e32df62 100644 --- a/hop/cli.py +++ b/hop/cli.py @@ -4,31 +4,19 @@ __description__ = "a module for CLI utilities" -def add_url_opts(parser): +def add_client_opts(parser): + """Add general client options to an argument parser. + + Args: + parser: An ArgumentParser instance to add client options to. + + """ + parser.add_argument( "url", metavar="URL", help="Sets the URL (kafka://host[:port]/topic) to publish messages to.", ) - - -def add_config_opts(parser): - config = parser.add_mutually_exclusive_group() - config.add_argument( - "-F", "--config-file", help="Set client configuration from file.", - ) - config.add_argument( - "-X", - "--config", - action="append", - help="Set client configuration via prop=val. Can be specified multiple times.", + parser.add_argument( + "--no-auth", action="store_true", help="If set, disable authentication." ) - - -def load_config(args): - if args.config_file: - return args.config_file - elif args.config: - return {opt[0]: opt[1] for opt in (kv.split("=") for kv in args.config)} - else: - return None diff --git a/hop/io.py b/hop/io.py index 87cb657..ec58c21 100644 --- a/hop/io.py +++ b/hop/io.py @@ -3,44 +3,232 @@ __author__ = "Patrick Godwin (patrick.godwin@psu.edu)" __description__ = "a module for i/o utilities" +from contextlib import contextmanager +from dataclasses import dataclass +import getpass +from enum import Enum +import json +import logging +import random +import string +from typing import Union +import warnings -from adc import streaming +from adc import consumer, errors, kafka, producer + +from . 
diff --git a/hop/io.py b/hop/io.py
index 87cb657..ec58c21 100644
--- a/hop/io.py
+++ b/hop/io.py
@@ -3,44 +3,232 @@
 __author__ = "Patrick Godwin (patrick.godwin@psu.edu)"
 __description__ = "a module for i/o utilities"
 
+from contextlib import contextmanager
+from dataclasses import dataclass
+import getpass
+from enum import Enum
+import json
+import logging
+import random
+import string
+from typing import Union
+import warnings
 
-from adc import streaming
+from adc import consumer, errors, kafka, producer
+
+from . import models
+
+logger = logging.getLogger("hop")
+
+StartPosition = consumer.ConsumerStartPosition
 
 
 class Stream(object):
     """Defines an event stream.
 
-    Sets up defaults used within the genesis client, so that when a
+    Sets up defaults used within the client so that when a
     stream connection is opened, it will use defaults specified here.
 
     Args:
-        format: the message serialization format
-        start_at: the message offset to start at
-        config: librdkafka style options, either a `dict` or a file path
+        auth: An Auth instance.
+        start_at: The message offset to start at.
+        persist: Whether to listen to new messages forever or stop
+            when EOS is received.
 
     """
 
-    def __init__(self, format=None, start_at=None, config=None):
-        self._options = {}
-        if format is not None:
-            self._options["format"] = format
-        if start_at is not None:
-            self._options["start_at"] = start_at
-        if config is not None:
-            self._options["config"] = config
+    def __init__(self, auth=None, start_at=None, persist=None):
+        self.auth = auth
+        self.start_at = start_at
+        self.persist = persist
 
-    def open(self, *args, **kwargs):
+    def open(self, url, mode="r", auth=None, start_at=None, persist=None, metadata=False):
         """Opens a connection to an event stream.
 
         Args:
-            broker_url: sets the broker URL to connect to
+            url: Sets the broker URL to connect to.
 
         Kwargs:
-            mode: read ('r') or write ('w') from the stream
-            format: the message serialization format
-            start_at: the message offset to start at
-            config: librdkafka style options, either a `dict` or a file path
+            mode: Read ('r') or write ('w') from the stream.
+            auth: An Auth instance.
+            start_at: The message offset to start at (read only).
+            persist: Whether to listen to new messages forever or stop
+                when EOS is received (read only).
+            metadata: Whether to receive message metadata along
+                with payload (read only).
+
+        Returns:
+            An open connection to the client, either an adc Producer instance
+            in write mode or an adc Consumer instance in read mode.
+
+        Raises:
+            ValueError: If the mode is not set to read/write or if more than
+                one topic is specified in write mode.
""" - opts = {**self._options, **kwargs} - return streaming.open(*args, **opts) + group_id, broker_addresses, topics = kafka.parse_kafka_url(url) + logger.debug("connecting to addresses=%s group_id=%s topics=%s", + broker_addresses, group_id, topics) + if not auth and self.auth: + auth = self.auth + if mode == "w": + if len(topics) != 1: + raise ValueError("must specify exactly one topic in write mode") + if group_id is not None: + warnings.warn("group ID has no effect when opening a stream in write mode") + if start_at is not None: + warnings.warn("start_at has no effect when opening a stream in write mode") + if persist is not None: + warnings.warn("read_forever has no effect when opening a stream in write mode") + return _open_producer(broker_addresses, topics[0], auth=auth) + elif mode == "r": + if group_id is None: + group_id = _generate_group_id(10) + logger.info(f"group ID not specified, generating a random group ID: {group_id}") + # set up extra options if provided + opts = {} + if start_at or self.start_at: + opts["start_at"] = start_at if start_at else self.start_at + if persist is not None or self.persist is not None: + opts["read_forever"] = persist if persist is not None else self.persist + return _open_consumer( + group_id, + broker_addresses, + topics, + auth=auth, + metadata=metadata, + **opts, + ) + else: + raise ValueError("mode must be either 'w' or 'r'") + + +class Deserializer(Enum): + CIRCULAR = models.GCNCircular + VOEVENT = models.VOEvent + BLOB = models.MessageBlob + + @classmethod + def deserialize(cls, message): + """Deserialize a stream message and instantiate a model. + + Args: + message: A serialized message. + + Returns: + A data container corresponding to the format in the + serialized message. + + Raises: + ValueError: If the message is incorrectly formatted or + if the message format is not recognized. + + """ + try: + format = message["format"].upper() + content = message["content"] + except (KeyError, TypeError): + raise ValueError("Message is incorrectly formatted") + else: + if format == Deserializer.BLOB.name: + return cls[format].value(content=content) + elif format in cls.__members__: + return cls[format].value(**content) + else: + logger.warning(f"Message format {format} not recognized, returning a MessageBlob") + return models.MessageBlob(content=content, missing_schema=True) + + def load(self, input_): + return self.value.load(input_) + + def load_file(self, input_file): + return self.value.load_file(input_file) + + +def _generate_group_id(n): + """Generate a random Kafka group ID. + + Args: + n: Length of randomly generated string. + + Returns: + The generated group ID. + + """ + alphanum = string.ascii_uppercase + string.digits + rand_str = ''.join(random.SystemRandom().choice(alphanum) for _ in range(n)) + return '-'.join((getpass.getuser(), rand_str)) + + +@dataclass +class _Metadata: + """Broker-specific metadata that accompanies a consumed message. 
+ + """ + + topic: str + partition: int + offset: int + timestamp: int + key: Union[str, bytes] + + +class _Consumer(consumer.Consumer): + def stream(self, metadata=False, **kwargs): + for message in super().stream(**kwargs): + yield self.unpack(message, metadata=metadata) + + @staticmethod + def unpack(message, metadata=False): + payload = json.loads(message.value().decode("utf-8")) + payload = Deserializer.deserialize(payload) + if metadata: + return ( + payload, + _Metadata( + message.topic(), + message.partition(), + message.offset(), + message.timestamp()[1], + message.key(), + ) + ) + else: + return payload + + +class _Producer(producer.Producer): + def write(self, message): + super().write(self.pack(message)) + + @staticmethod + def pack(message): + try: + payload = message.serialize() + except AttributeError: + payload = {"format": "blob", "content": message} + return json.dumps(payload).encode("utf-8") + + +@contextmanager +def _open_consumer(group_id, broker_addresses, topics, metadata=False, **kwargs): + client = _Consumer(consumer.ConsumerConfig( + broker_urls=broker_addresses, + group_id=group_id, + **kwargs, + )) + for t in topics: + client.subscribe(t) + try: + yield client.stream(metadata=metadata) + finally: + client.close() + + +def _open_producer(broker_addresses, topic, **kwargs): + return _Producer(producer.ProducerConfig( + broker_urls=broker_addresses, + topic=topic, + delivery_callback=errors.raise_delivery_errors, + **kwargs, + )) diff --git a/hop/models.py b/hop/models.py index f44ea87..edde9ef 100644 --- a/hop/models.py +++ b/hop/models.py @@ -37,16 +37,16 @@ def asdict(self): """Represents the VOEvent as a dictionary. Returns: - dict: the dict representation of the VOEvent. + A dictionary representation of the VOEvent. """ return asdict(self) - def wrap_message(self): + def serialize(self): """Wrap the message with its format and content. Returns: - A dictionary with "format" and "content" key-value pairs + A dictionary with "format" and "content" key-value pairs. """ @@ -57,11 +57,11 @@ def __str__(self): return json.dumps(self.asdict(), indent=2) @classmethod - def from_xml(cls, xml_input): + def load(cls, xml_input): """Create a new VOEvent from an XML-formatted VOEvent. Args: - xml_input: a file object, string, or generator + xml_input: A file object, string, or generator. Returns: The VOEvent. @@ -73,18 +73,18 @@ def from_xml(cls, xml_input): return cls(**{k: v for k, v in vo["voe:VOEvent"].items() if ":" not in k}) @classmethod - def from_xml_file(cls, filename): + def load_file(cls, filename): """Create a new VOEvent from an XML-formatted VOEvent file. Args: - filename: name of the VOEvent file + filename: Name of the VOEvent file. Returns: - The VOEvent + The VOEvent. """ with open(filename, "rb") as f: - return cls.from_xml(f) + return cls.load(f) @dataclass @@ -105,16 +105,16 @@ def asdict(self): """Represents the GCN Circular as a dictionary. Returns: - dict: the dict representation of the Circular. + The dictionary representation of the Circular. """ return asdict(self) - def wrap_message(self): + def serialize(self): """Wrap the message with its format and content. Returns: - A dictionary with "format" and "content" key-value pairs + A dictionary with "format" and "content" key-value pairs. """ @@ -126,11 +126,11 @@ def __str__(self): return "\n".join(headers + ["", self.body]) @classmethod - def from_email(cls, email_input): + def load(cls, email_input): """Create a new GCNCircular from an RFC 822 formatted circular. 
diff --git a/hop/models.py b/hop/models.py
index f44ea87..edde9ef 100644
--- a/hop/models.py
+++ b/hop/models.py
@@ -37,16 +37,16 @@ def asdict(self):
         """Represents the VOEvent as a dictionary.
 
         Returns:
-            dict: the dict representation of the VOEvent.
+            A dictionary representation of the VOEvent.
 
         """
         return asdict(self)
 
-    def wrap_message(self):
+    def serialize(self):
         """Wrap the message with its format and content.
 
         Returns:
-            A dictionary with "format" and "content" key-value pairs
+            A dictionary with "format" and "content" key-value pairs.
 
         """
 
@@ -57,11 +57,11 @@ def __str__(self):
         return json.dumps(self.asdict(), indent=2)
 
     @classmethod
-    def from_xml(cls, xml_input):
+    def load(cls, xml_input):
         """Create a new VOEvent from an XML-formatted VOEvent.
 
         Args:
-            xml_input: a file object, string, or generator
+            xml_input: A file object, string, or generator.
 
         Returns:
             The VOEvent.
@@ -73,18 +73,18 @@
         return cls(**{k: v for k, v in vo["voe:VOEvent"].items() if ":" not in k})
 
     @classmethod
-    def from_xml_file(cls, filename):
+    def load_file(cls, filename):
         """Create a new VOEvent from an XML-formatted VOEvent file.
 
         Args:
-            filename: name of the VOEvent file
+            filename: Name of the VOEvent file.
 
         Returns:
-            The VOEvent
+            The VOEvent.
 
         """
         with open(filename, "rb") as f:
-            return cls.from_xml(f)
+            return cls.load(f)
 
 
 @dataclass
@@ -105,16 +105,16 @@ def asdict(self):
         """Represents the GCN Circular as a dictionary.
 
         Returns:
-            dict: the dict representation of the Circular.
+            The dictionary representation of the Circular.
 
         """
         return asdict(self)
 
-    def wrap_message(self):
+    def serialize(self):
         """Wrap the message with its format and content.
 
         Returns:
-            A dictionary with "format" and "content" key-value pairs
+            A dictionary with "format" and "content" key-value pairs.
 
         """
 
@@ -126,11 +126,11 @@ def __str__(self):
         return "\n".join(headers + ["", self.body])
 
     @classmethod
-    def from_email(cls, email_input):
+    def load(cls, email_input):
         """Create a new GCNCircular from an RFC 822 formatted circular.
 
         Args:
-            email_input: a file object or string
+            email_input: A file object or string.
 
         Returns:
             The GCNCircular.
@@ -148,41 +148,41 @@
         )
 
     @classmethod
-    def from_email_file(cls, filename):
+    def load_file(cls, filename):
         """Create a new GCNCircular from an RFC 822 formatted circular file.
 
         Args:
-            filename: the GCN filename
+            filename: The GCN filename.
 
         Returns:
             The GCNCircular.
 
         """
         with open(filename, "r") as f:
-            return cls.from_email(f)
+            return cls.load(f)
 
 
 @dataclass
 class MessageBlob(object):
     """Defines an unformatted message structure.
 
-    This is included as a dataclass to mirror the implementation of structured formats.
+    This is included to mirror the implementation of structured formats.
 
     """
 
     content: str
+    missing_schema: bool = False
 
     def asdict(self):
         """Represents the message as a dictionary.
 
         Returns:
-            dict: the dict representation of the message
+            The dictionary representation of the message.
 
         """
-        return asdict(self)
+        return asdict(self) if self.missing_schema else {"content": self.content}
 
-    def wrap_message(self):
+    def serialize(self):
         """Wrap the message with its format and content.
 
         Returns:
@@ -190,25 +190,38 @@
 
         """
-        wrapped_message = {"format": "blob", "content": self.asdict()}
+        wrapped_message = {"format": "blob", "content": self.content}
         return wrapped_message
 
     def __str__(self):
-        return self.content
+        return str(self.content)
 
     @classmethod
-    def from_text(cls, blob_input):
-        """Create a blob message from input text or file
+    def load(cls, blob_input):
+        """Create a blob message from input text.
 
         Args:
-            blob_input: a file or string
+            blob_input: The unstructured message text or file object.
 
         Returns:
             The Blob.
 
         """
-        try:
-            with open(blob_input, "r") as f:
-                return cls(f.read())
-        except FileNotFoundError:
+        if hasattr(blob_input, "read"):
+            return cls(blob_input.read())
+        else:
             return cls(blob_input)
+
+    @classmethod
+    def load_file(cls, filename):
+        """Create a blob message from an input file.
+
+        Args:
+            filename: A filename.
+
+        Returns:
+            The Blob.
+
+        """
+        with open(filename, "r") as f:
+            return cls(f.read())
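The renamed `load`/`serialize` pair is easiest to see end to end. A sketch using the bundled test circular:

```python
from hop.models import GCNCircular

# parse an RFC 822 circular into a model...
with open("tests/data/example.gcn3", "r") as f:
    circular = GCNCircular.load(f)

# ...and wrap it for publishing; a consumer reverses this via Deserializer
wrapped = circular.serialize()
print(wrapped["format"])  # "circular"
```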
""" + auth = load_auth() if not args.no_auth else None + loader = io.Deserializer[args.format] + stream = io.Stream(auth=auth) - if not args: - parser = argparse.ArgumentParser() - _add_parser_args(parser) - args = parser.parse_args() - - # load config if specified - config = cli.load_config(args) - - # create map of possible message formats and their model loader - model_loader = { - "circular": GCNCircular.from_email_file, - "voevent": VOEvent.from_xml_file, - "blob": MessageBlob.from_text, - } - - stream = Stream(format="json", config=config) with stream.open(args.url, "w") as s: - if args.format in model_loader: - loader = model_loader[args.format] - else: - warnings.warn( - "Warning: format not recognized. Sending as unstructured blob") - loader = model_loader["blob"] - for message_file in args.message: - message_model = loader(message_file) - s.write(message_model.wrap_message()) + message = loader.load_file(message_file) + s.write(message) diff --git a/hop/subscribe.py b/hop/subscribe.py index 498c0bb..2c0b3c9 100644 --- a/hop/subscribe.py +++ b/hop/subscribe.py @@ -4,48 +4,11 @@ __description__ = "tools to receive and parse messages" -import argparse import json +from .auth import load_auth from . import cli -from .io import Stream -from .models import GCNCircular, VOEvent, MessageBlob - - -def classify_message(message): - """Check the format of a message obtained from an ADC stream and - use it to instantiate a data model corresponding to that format. - - Args: - message: wrapped message from an ADC stream - - Returns: - message_model: dataclass model object for the message - - """ - - try: - fmt = message["format"] - content = message["content"] - except TypeError: - raise ValueError("Message is not wrapped with format/content keys") - except KeyError: - raise KeyError("Message does not contain format/content keys") - - # create map of message formats and dataclass models for parsing - model_creator = { - "circular": GCNCircular, - "voevent": VOEvent, - "blob": MessageBlob, - } - - if fmt in model_creator: - creator = model_creator[fmt] - message_model = creator(**content) - else: - raise ValueError(f"Message format {fmt} not recognized") - - return message_model +from . import io def print_message(message_model, json_dump=False): @@ -71,54 +34,36 @@ def print_message(message_model, json_dump=False): def _add_parser_args(parser): - cli.add_url_opts(parser) - cli.add_config_opts(parser) + cli.add_client_opts(parser) # consumer options parser.add_argument( - "-j", "--json", help="Request message output as raw json", action="store_true", + "-s", + "--start-at", + choices=io.StartPosition.__members__, + default=str(io.StartPosition.LATEST).upper(), + help="Set the message offset offset to start at. Default: LATEST.", ) parser.add_argument( - "-e", - "--earliest", - help="Request to stream from the earliest available Kafka offset", + "-p", + "--persist", action="store_true", + help="If set, persist or listen to messages indefinitely. " + "Otherwise, will stop listening when EOS is received.", ) parser.add_argument( - "-t", - "--timeout", - type=float, - default=10, - help="Specifies the time (in seconds) to wait for messages before timing out; " - "specify -1 to wait indefinitely. Default: 10 seconds", + "-j", "--json", help="Request message output as raw json", action="store_true", ) -def _main(args=None): +def _main(args): """Receive and parse messages. 
""" - if not args: - parser = argparse.ArgumentParser() - _add_parser_args(parser) - args = parser.parse_args() - - # load config if specified - config = cli.load_config(args) - - # set offset - start_offset = "earliest" if args.earliest else "latest" - - # set timeout - timeout = None if args.timeout == -1 else args.timeout - - # read from topic - - # assume json format for the message stream - stream_format = "json" + auth = load_auth() if not args.no_auth else None + start_at = io.StartPosition[args.start_at] + stream = io.Stream(auth=auth, start_at=start_at, persist=args.persist) - stream = Stream(format=stream_format, config=config, start_at=start_offset) with stream.open(args.url, "r") as s: - for message in s(timeout=timeout): - message_model = classify_message(message) - print_message(message_model, args.json) + for message in s: + print_message(message, args.json) diff --git a/hop/version.py b/hop/version.py index 7db97b1..fb55ae8 100644 --- a/hop/version.py +++ b/hop/version.py @@ -6,8 +6,8 @@ def print_packages_versions(): - """ - Print versions for the passed packages + """Print versions for the passed packages. + """ packages = get_packages() for pkg in packages: @@ -18,6 +18,9 @@ def print_packages_versions(): def get_packages(): + """Returns the package dependencies used within hop-client. + + """ return ["hop-client", "adc_streaming", "confluent_kafka", "librdkafka"] @@ -25,8 +28,8 @@ def get_packages(): # -- main -def _main(args=None): - """List all the dependencies' versions +def _main(args): + """List all the dependencies' versions. """ diff --git a/recipe/meta.yaml b/recipe/meta.yaml index 4737368..cd3b846 100644 --- a/recipe/meta.yaml +++ b/recipe/meta.yaml @@ -25,7 +25,7 @@ requirements: - setuptools_scm run: - python - - adc-streaming ==0.3.1 + - adc-streaming >=1.0.0 - xmltodict >=0.9.0 - dataclasses # [py==36] diff --git a/requirements.txt b/requirements.txt index 7190389..4750465 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -adc==0.3.1 +adc==1.0.0 diff --git a/setup.cfg b/setup.cfg index 20da30d..92fd02e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,6 +13,7 @@ exclude = per-file-ignores = __init__.py:F401, tests/conftest.py:E501 + tests/*:F841 [tool:pytest] log_cli = True diff --git a/setup.py b/setup.py index 8a29c3f..c5940c6 100755 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ # requirements install_requires = [ - "adc-streaming == 0.3.1", + "adc-streaming >= 1.0.0", "dataclasses ; python_version < '3.7'", "xmltodict >= 0.9.0" ] diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..762868f --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python + +__author__ = "Patrick Godwin (patrick.godwin@psu.edu)" +__description__ = "a module that tests the auth utilities" + + +from unittest.mock import patch, mock_open + +import pytest + +from hop import auth + + +def test_load_auth(): + with patch("builtins.open", mock_open(read_data=auth.DEFAULT_AUTH_CONFIG)) as mock_file: + + # check error handling + with pytest.raises(FileNotFoundError): + auth.load_auth() + + # check auth loads correctly + with patch("os.path.exists") as mock_exists: + mock_exists.return_value = True + auth.load_auth() diff --git a/tests/test_cli.py b/tests/test_cli.py index 8c3531c..453da2d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -47,7 +47,8 @@ def test_cli_publish(script_runner, message_format, message_parameters_dict): broker_url = "kafka://hostname:port/message" ret = script_runner.run( - "hop", 
"publish", broker_url, "-f", message_format, test_file) + "hop", "publish", broker_url, "-f", message_format.upper(), test_file, "--no-auth", + ) # verify CLI output assert ret.success @@ -69,7 +70,7 @@ def test_cli_subscribe(script_runner): with patch("hop.io.Stream.open", mock_open()) as mock_stream: broker_url = "kafka://hostname:port/message" - ret = script_runner.run("hop", "subscribe", broker_url) + ret = script_runner.run("hop", "subscribe", broker_url, "--no-auth") # verify CLI output assert ret.success @@ -77,3 +78,23 @@ def test_cli_subscribe(script_runner): # verify broker url was processed mock_stream.assert_called_with(broker_url, "r") + + +def test_cli_auth(script_runner): + ret1 = script_runner.run("hop", "auth", "--help") + assert ret1.success + assert ret1.stderr == "" + + ret = script_runner.run("hop", "auth", "locate") + assert ret.success + assert ret.stderr == "" + + +def test_cli_version(script_runner): + ret = script_runner.run("hop", "version", "--help") + assert ret.success + assert ret.stderr == "" + + ret = script_runner.run("hop", "version") + assert ret.success + assert ret.stderr == "" diff --git a/tests/test_io.py b/tests/test_io.py index b549365..a4cb739 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -4,29 +4,190 @@ __description__ = "a module that tests the io utilities" -from unittest.mock import patch, mock_open +from dataclasses import fields +import json +import logging +from pathlib import Path +from unittest.mock import patch, MagicMock, Mock -from hop import Stream +import pytest +from hop.auth import Auth +from hop import io +from hop.models import GCNCircular, VOEvent, MessageBlob -def test_stream(circular_msg, circular_text): - with patch("hop.io.streaming.open", mock_open()) as mock_stream: - broker_url = "kafka://hostname:port/gcn" - format = "json" - start_at = "beginning" - config = {} +logger = logging.getLogger("hop") - stream = Stream(format=format, start_at=start_at, config=config) - # verify defaults are stored correctly - assert stream._options["format"] == format - assert stream._options["start_at"] == start_at - assert stream._options["config"] == config +def content_mock(message_model): + """Mock content to pass during the message_model creation since MagicMock() + is unable to mock __init__ of the model dataclass in tests. 
+ """ + content = {} + for field in fields(message_model): + content.update({field.name: "test"}) + return content - with stream.open(broker_url, "w") as s: + +# test the deserializer for each message format +@pytest.mark.parametrize("message", [ + {"format": "voevent", "content": content_mock(VOEvent)}, + {"format": "circular", "content": content_mock(GCNCircular)}, + {"format": "blob", "content": content_mock(MessageBlob)}, + {"format": "other", "content": "other"}, + ["wrong_datatype"], + {"wrong_key": "value"}, +]) +def test_deserialize(message, message_parameters_dict, caplog): + + # test a non-dict message + if not isinstance(message, dict): + with pytest.raises(ValueError): + test_model = io.Deserializer.deserialize(message) + return + # test a dict message with wrong key values + elif not (("format" in message) and ("content" in message)): + with pytest.raises(ValueError): + test_model = io.Deserializer.deserialize(message) + return + + message_format = message["format"] + message_content = message["content"] + + # load parameters from conftest for valid formats + if message_format in message_parameters_dict: + message_parameters = message_parameters_dict[message_format] + model_name = message_parameters["model_name"] + expected_model = message_parameters["expected_model"] + + # test valid formats + with patch(f"hop.models.{model_name}", MagicMock()): + test_model = io.Deserializer.deserialize(message) + + # verify the message is classified properly + assert isinstance(test_model, expected_model) + + else: # test an invalid format + with caplog.at_level(logging.WARNING): + test_model = io.Deserializer.deserialize(message) + + # verify a message blob was produced with warnings + output = f"Message format {message_format.upper()} " \ + "not recognized, returning a MessageBlob" + assert isinstance(test_model, MessageBlob) + assert output in caplog.text + assert test_model.missing_schema + + +def test_stream_read(circular_msg, circular_text): + with patch("hop.io._Consumer", MagicMock()) as mock_consumer: + mock_consumer.stream.return_value = [{'hey', 'you'}] + + broker_url1 = "kafka://hostname:port/gcn" + broker_url2 = "kafka://hey@hostname:port/gcn" + start_at = io.StartPosition.EARLIEST + persist = False + + stream = io.Stream(persist=persist) + + with stream.open(broker_url1, "r", start_at=start_at) as s: + for msg in s: + continue + + with stream.open(broker_url2, "r", start_at=start_at) as s: + for msg in s: + continue + + +def test_stream_write(circular_msg, circular_text): + with patch("hop.io._Producer", MagicMock()) as mock_producer: + mock_producer.write = Mock() + + broker_url = "kafka://localhost:port/gcn" + auth = Auth("user", "password") + start_at = io.StartPosition.EARLIEST + persist = False + + stream = io.Stream() + + # verify only 1 topic is allowed in write-mode + with pytest.raises(ValueError): + stream.open("kafka://localhost:9092/topic1,topic2", "w") + + # check various warnings when certain settings are set + with pytest.warns(UserWarning): + stream.open("kafka://group@localhost:9092/topic1", "w") + with pytest.warns(UserWarning): + stream.open(broker_url, "w", start_at=start_at) + with pytest.warns(UserWarning): + stream.open(broker_url, "w", persist=persist) + + with stream.open(broker_url, "w", auth=auth) as s: s.write(circular_msg) - # verify GCN was processed - mock_stream.assert_called_with( - broker_url, "w", format=format, start_at=start_at, config=config - ) + +def test_stream_open(): + stream = io.Stream() + + # verify only read/writes are allowed + 
with pytest.raises(ValueError): + stream.open("kafka://localhost:9092/topic1", "q") + + +def test_unpack(circular_msg, circular_text): + wrapped_msg = {"format": "circular", "content": circular_msg} + + kafka_msg = MagicMock() + kafka_msg.value.return_value = json.dumps(wrapped_msg).encode("utf-8") + + unpacked_msg = io._Consumer.unpack(kafka_msg) + + +def test_pack(circular_msg, circular_text): + # message class + circular = GCNCircular(**circular_msg) + packed_msg = io._Producer.pack(circular) + + # unstructured message + message = {"hey": "you"} + packed = io._Producer.pack(message) + + +@pytest.mark.parametrize("message", [ + {"format": "voevent", "content": content_mock(VOEvent)}, + {"format": "circular", "content": content_mock(GCNCircular)}, + {"format": "blob", "content": content_mock(MessageBlob)}, +]) +def test_pack_unpack_roundtrip(message, message_parameters_dict, caplog): + format = message["format"] + content = message["content"] + + # load test data + shared_datadir = Path("tests/data") + test_filename = message_parameters_dict[format]["test_file"] + test_file = shared_datadir / "test_data" / test_filename + + # generate a message + expected_model = message_parameters_dict[format]["expected_model"] + if format in ("voevent", "circular"): + orig_message = expected_model.load_file(test_file) + else: + orig_message = test_file.read_text() + + # pack the message + packed_msg = io._Producer.pack(orig_message) + + # mock a kafka message with value being the packed message + kafka_msg = MagicMock() + kafka_msg.value.return_value = packed_msg + + # unpack the message + unpacked_msg = io._Consumer.unpack(kafka_msg) + + # verify based on format + if format in ("voevent", "circular"): + assert isinstance(unpacked_msg, expected_model) + assert unpacked_msg.asdict() == orig_message.asdict() + else: + assert isinstance(unpacked_msg, MessageBlob) + assert unpacked_msg.content == orig_message diff --git a/tests/test_models.py b/tests/test_models.py index 27033d0..286584a 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -10,7 +10,7 @@ def test_voevent(voevent_fileobj): - voevent = models.VOEvent.from_xml(voevent_fileobj) + voevent = models.VOEvent.load(voevent_fileobj) # check a few attributes assert voevent.ivorn == "ivo://gwnet/LVC#S200302c-1-Preliminary" @@ -22,14 +22,14 @@ def test_voevent(voevent_fileobj): assert voevent.WhereWhen["ObsDataLocation"]["ObservatoryLocation"]["id"] == "LIGO Virgo" # verify wrapper format is correct - assert voevent.wrap_message()["format"] == "voevent" + assert voevent.serialize()["format"] == "voevent" def test_gcn_circular(circular_text, circular_msg): with patch("builtins.open", mock_open(read_data=circular_text)): gcn_file = "example.gcn3" with open(gcn_file, "r") as f: - gcn = models.GCNCircular.from_email(f) + gcn = models.GCNCircular.load(f) # verify parsed GCN structure is correct assert gcn.header["title"] == circular_msg["header"]["title"] @@ -41,17 +41,17 @@ def test_gcn_circular(circular_text, circular_msg): assert gcn.body == circular_msg["body"] # verify wrapper format is correct - assert gcn.wrap_message()["format"] == "circular" + assert gcn.serialize()["format"] == "circular" def test_blob(blob_text, blob_msg): with patch("builtins.open", mock_open(read_data=blob_text)): blob_file = "example_blob.txt" with open(blob_file, "r") as f: - blob = models.MessageBlob.from_text(f) + blob = models.MessageBlob.load(f) # verify blob text is correct assert blob.content == blob_msg["content"] # verify wrapper format is correct - assert 
blob.wrap_message()["format"] == "blob" + assert blob.serialize()["format"] == "blob" diff --git a/tests/test_subscribe.py b/tests/test_subscribe.py index 8ffc3ae..25f025b 100644 --- a/tests/test_subscribe.py +++ b/tests/test_subscribe.py @@ -9,69 +9,13 @@ from pathlib import Path from unittest.mock import patch, MagicMock -from dataclasses import fields import pytest from hop import subscribe -from hop.models import GCNCircular, VOEvent, MessageBlob - - -def content_mock(message_model): - """Mock content to pass during the message_model creation since MagicMock() - is unable to mock __init__ of the model dataclass in tests. - """ - content = {} - for field in fields(message_model): - content.update({field.name: "test"}) - return content - - -@pytest.mark.parametrize( - "message", - [ - {"format": "voevent", "content": content_mock(VOEvent)}, - {"format": "circular", "content": content_mock(GCNCircular)}, - {"format": "blob", "content": content_mock(MessageBlob)}, - {"format": "other", "content": "other"}, - ["wrong_datatype"], - {"wrong_key": "value"}, - ], -) -def test_classify_message(message, message_parameters_dict): - - # test a non-dict message - if not isinstance(message, dict): - with pytest.raises(ValueError): - test_model = subscribe.classify_message(message) - return - # test a dict message with wrong key values - elif not (("format" in message) and ("content" in message)): - with pytest.raises(KeyError): - test_model = subscribe.classify_message(message) - return - - message_format = message["format"] - - # test an invalid format - if message_format not in message_parameters_dict: - with pytest.raises(ValueError): - test_model = subscribe.classify_message(message) - return - - # load parameters from conftest for valid formats - message_parameters = message_parameters_dict[message_format] - model_name = message_parameters["model_name"] - expected_model = message_parameters["expected_model"] - - # test valid formats - with patch(f"hop.models.{model_name}", MagicMock()): - test_model = subscribe.classify_message(message) - # verify the message is classified properly - assert isinstance(test_model, expected_model) +# test the subscribe printer for each message format -# test the subscribe printer for each message format @pytest.mark.parametrize("message_format", ["voevent", "circular", "blob"]) def test_print_message(message_format, message_parameters_dict):