Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API overhaul in moving to adc-streaming 1.x #82

Merged
merged 31 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bd8cd9e
io.py: revamp to comply with new version of adc-streaming
myNameIsPatrick Jun 26, 2020
146d822
remove outdated options in publish, subscribe for tests to pass
myNameIsPatrick Jun 26, 2020
0d7435c
io.py: add a Deserializer enum for seamless message unpacking
myNameIsPatrick Jun 26, 2020
cf73b0a
standardize model methods throughout
myNameIsPatrick Jun 26, 2020
3e8bc92
remove config-based CLI options for publish/subscribe, add in consume…
myNameIsPatrick Jun 26, 2020
278eb91
replace classify_message usage with the Deserializer
myNameIsPatrick Jun 26, 2020
439bc6d
address issues raised by pylint
myNameIsPatrick Jun 26, 2020
99a7dde
io.py: split pack, unpacking of messages into separate methods for ea…
myNameIsPatrick Jun 27, 2020
3f221dc
test_cli.py: add hop version test
myNameIsPatrick Jun 27, 2020
1ec8c60
__init__.py: remove silent ImportError exception
myNameIsPatrick Jun 27, 2020
75b66d5
simplify handling of argparse options in CLI commands
myNameIsPatrick Jun 27, 2020
1210e54
add auth module with CLI utils, expose Auth class used to authenticat…
myNameIsPatrick Jun 27, 2020
2491f44
test_cli.py: add --no-auth option to avoid script failures
myNameIsPatrick Jun 27, 2020
ec6b96b
README.md: add docs url, put badges on one line
myNameIsPatrick Jun 27, 2020
9b70850
io.Stream.open: generate a random group ID if one is not provided
myNameIsPatrick Jun 27, 2020
f75cc20
add missing api docs for auth, version modules
myNameIsPatrick Jun 27, 2020
1742baa
test_auth.py: add unit tests for auth module
myNameIsPatrick Jun 29, 2020
531c9f1
update README.md and quickstart with updated API
myNameIsPatrick Jun 29, 2020
1742336
update rest of docs with API change
myNameIsPatrick Jun 29, 2020
34fccdd
setup.cfg: ignore unused local variables in unit tests
myNameIsPatrick Jun 29, 2020
317a7bf
fix docstrings throughout for consistency
myNameIsPatrick Jun 29, 2020
03189fd
quickstart: put auth paragraph first in quickstart, fix headers and r…
myNameIsPatrick Jun 29, 2020
845a515
io.Stream: log the random group ID used if auto-generating one
myNameIsPatrick Jun 29, 2020
70f8619
io.py: switch _Metadata to a dataclass, fix bug in passing metadata k…
myNameIsPatrick Jun 30, 2020
e300338
auth.py: default to SCRAM-SHA-512 as SASL mechanism
myNameIsPatrick Jun 30, 2020
04b25a8
fix comment typos, aesthesics
myNameIsPatrick Jun 30, 2020
ca4ead0
recipe/meta.yaml: update adc-streaming to 1.x
myNameIsPatrick Jun 30, 2020
0578885
allow for arbitrary message formats and unstructured JSON-serializabl…
myNameIsPatrick Jun 30, 2020
5c03506
test_io.py: add a round-trip pack/unpack test for different message t…
myNameIsPatrick Jul 1, 2020
ec10758
models.py: fix issues with dict, string representation of MessageBlob
myNameIsPatrick Jul 3, 2020
1ce155a
publish.py: fix issue with default --format value
myNameIsPatrick Jul 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lint :
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=100 --statistics
flake8 . --count --exit-zero --max-complexity=12 --max-line-length=100 --statistics

.PHONY: format
format :
Expand Down
34 changes: 22 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,41 @@ Hop Client
=============

![](https://github.com/scimma/hop-client/workflows/build/badge.svg)

[![codecov](https://codecov.io/gh/scimma/hop-client/branch/master/graph/badge.svg)](https://codecov.io/gh/scimma/hop-client)

| | |
| ------------ | ------ |
| **Docs:** | https://hop-client.readthedocs.io/en/stable/ |

**hop-client** is a pub-sub client library for Multimessenger Astrophysics.

## Quickstart

publish a message to kafka:
By default, authentication is enabled, reading in configuration settings
from `auth.conf`. The path to this configuration can be found by running
`hop auth locate`. One can initialize this configuration with default
settings by running `hop auth setup`. To disable authentication in the CLI
client, one can run `--no-auth`.

Publish a message:

```
hop publish kafka://hostname:port/gcn -f circular example.gcn3
hop publish kafka://hostname:port/gcn -f CIRCULAR example.gcn3
```

Subscribe to the earliest offset of a Kafka topic and print to stdout:
Example messages are provided in `tests/data` including:
* A GCN circular (`example.gcn3`)
* A VOEvent (`example_voevent.xml`)


Consume messages:

```
hop subscribe kafka://hostname:port/gcn -e
hop subscribe kafka://hostname:port/gcn -s EARLIEST
```

Two example messages (an RFC 822 formatted GCN circular `example.gcn3` and a VOEvent 2.0
schema xml `example_voevent.xml` are provided in `tests/data`.

Client [configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
properties can be passed to `hop publish` via `-X property=value` or in a configuration
file specified by `-F <config-file>`, mimicking the behavior of `kafkacat`. This can be
used to connect to a Kafka broker with SSL authentication enabled, for example.
This will read messages from the gcn topic from the earliest offset
and read messages until an end of stream (EOS) is received.

## Installation

Expand Down
2 changes: 2 additions & 0 deletions doc/api/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ hop-client API
.. toctree::
:maxdepth: 2

auth
cli
io
publish
subscribe
models
version
7 changes: 7 additions & 0 deletions doc/api/auth.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. _auth:

hop.auth
################

.. automodule:: hop.auth
:members:
7 changes: 7 additions & 0 deletions doc/api/version.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. _version:

hop.version
################

.. automodule:: hop.version
:members:
2 changes: 1 addition & 1 deletion doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ User's Guide

user/installation
user/quickstart
user/commands
user/stream
user/commands

API Reference
-------------
Expand Down
28 changes: 21 additions & 7 deletions doc/user/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,29 @@ Commands

**hop-client** provides a command line interface for various tasks:

:code:`hop publish`: parse and publish messages such as GCN circulars and notices
:code:`hop subscribe`: parse and print messages such as GCN circulars and notices
:code:`hop auth`: Authentication utilities
:code:`hop publish`: Publish messages such as GCN circulars and notices
:code:`hop subscribe`: Listen to messages such as GCN circulars and notices
:code:`hop version`: Print version dependencies of :code:`hop-client`

:code:`hop auth`
~~~~~~~~~~~~~~~~~~~~~~

This command allows a user to handle auth-based configuration.

.. program-output:: hop auth --help
:nostderr:

:code:`hop publish`
~~~~~~~~~~~~~~~~~~~~~~

This command allows a user to parse an `RFC 822 formatted GCN circular <https://gcn.gsfc.nasa.gov/gcn3_circulars.html>`_, an XML formatted `GCN/VOEvent notice <https://gcn.gsfc.nasa.gov/tech_describe.html>`_, or an unstructured string or text file and publish it as via Kafka. Structured messages
such as GCN circulars and VOEvents are published as JSON-formatted text.
This command allows a user to publish various structured and unstructured messages, including:

* `RFC 822 formatted GCN circular <https://gcn.gsfc.nasa.gov/gcn3_circulars.html>`_
* An XML formatted `GCN/VOEvent notice <https://gcn.gsfc.nasa.gov/tech_describe.html>`_
* Unstructured messages such as byte-encoded or JSON-serializable data.

Structured messages such as GCN circulars and VOEvents are published as JSON-formatted text.

.. program-output:: hop publish --help
:nostderr:
Expand All @@ -24,8 +39,7 @@ such as GCN circulars and VOEvents are published as JSON-formatted text.
:code:`hop subscribe`
~~~~~~~~~~~~~~~~~~~~~~

This command allows a user to parse a message from a Kafka topic and display it on stdout via
a pretty-print or JSON dump.
This command allows a user to subscribe to messages and print them to stdout.

.. program-output:: hop subscribe --help
:nostderr:
Expand All @@ -36,4 +50,4 @@ a pretty-print or JSON dump.
This command prints all the versions of the dependencies

.. program-output:: hop version --help
:nostderr:
:nostderr:
107 changes: 79 additions & 28 deletions doc/user/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,114 @@ Quickstart
Using the CLI
-------------

Publish a message
By default, authentication is enabled, reading in configuration settings
from :code:`auth.conf`. The path to this configuration can be found by running
:code:`hop auth locate`. One can initialize this configuration with default
settings by running :code:`hop auth setup`. To disable authentication in the CLI
client, one can run :code:`--no-auth`.

Publish messages
^^^^^^^^^^^^^^^^^

.. code:: bash

hop publish kafka://hostname:port/gcn -f circular example.gcn3
hop publish kafka://hostname:port/gcn -f CIRCULAR example.gcn3

Two example messages (an RFC 822 formatted GCN circular (:code:`example.gcn3`) and a VOEvent 2.0
schema xml (:code: `example_voevent.xml`)) are provided in :code:`tests/data`.
Example messages are provided in :code:`tests/data` including:

Client `configuration <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_
properties can be passed to :code:`hop publish` via :code:`-X property=value` or in a configuration
file specified by :code:`-F <config-file>`, mimicking the behavior of :code:`kafkacat`. This can be
used to connect to a Kafka broker with SSL authentication enabled, for example.
* A GCN circular (:code:`example.gcn3`)
* A VOEvent (:code:`example_voevent.xml`)

Consume a message
Consume messages
^^^^^^^^^^^^^^^^^

.. code:: bash

hop subscribe kafka://hostname:port/gcn -e
hop subscribe kafka://hostname:port/gcn -s EARLIEST

This will read messages from the gcn topic from the earliest offset
and read messages until an end of stream (EOS) is received.

Using the Python API
----------------------

Publish messages
spenczar marked this conversation as resolved.
Show resolved Hide resolved
^^^^^^^^^^^^^^^^^

Using the python API, we can publish various types of messages, including
structured messages such as GCN Circulars and VOEvents:

.. code:: python

Configuration properties can be passed in a manner identical to :code:`hop publish` above.
from hop import stream
from hop.models import GCNCircular

# read in a GCN circular
with open("path/to/circular.gcn3", "r") as f:
circular = GCNCircular.load(f)

Reading messages from stream
----------------------------
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write(circular)

The hop client supports a python-based API for reading messages from a stream, as follows:
In addition, we can also publish unstructured messages as long as they are
JSON serializable:

.. code:: python

from hop import stream

with stream.open("kafka://hostname:port/topic", "r", format="json") as s:
for idx, msg in s:
print(msg)
with stream.open("kafka://hostname:port/topic", "w") as s:
s.write({"my": "message"})

This block will hang forever, listening to new messages and processing them as they arrive.
By default, this will only process new messages since the connection was opened. The :code:`start_at`
option lets you control where in the stream you can start listening from. For example,
if you'd like to listen to all messages stored in a topic, you can do:
By default, authentication is enabled for the Hop broker. In order to authenticate, one
can pass in an :code:`Auth` instance with credentials:

.. code:: python

with stream.open("kafka://hostname:port/topic", "r", format="json", start_at="latest") as s:
for idx, msg in s:
print(msg)
from hop import stream
from hop.auth import Auth

Writing messages to stream
--------------------------
auth = Auth("my-username", "my-password")

We can also publish messages to a topic, as follows:
with stream.open("kafka://hostname:port/topic", "w", auth=auth) as s:
s.write({"my": "message"})

A convenience function is also provided to read in auth configuration in the same way
as in the CLI client:

.. code:: python

from hop import stream
from hop.auth import load_auth

with stream.open("kafka://hostname:port/topic", "w", format="json") as s:
with stream.open("kafka://hostname:port/topic", "w", auth=load_auth()) as s:
s.write({"my": "message"})

Consume messages
^^^^^^^^^^^^^^^^^

One can consume messages through the python API as follows:

.. code:: python

from hop import stream

with stream.open("kafka://hostname:port/topic", "r") as s:
for message in s:
print(message)

This will listen to the Hop broker, listening to new messages and printing them to
stdout as they arrive until there are no more messages in the stream.
By default, this will only process new messages since the connection was opened.
The :code:`start_at` option lets you control where in the stream you can start listening
from. For example, if you'd like to listen to all messages stored in a topic, you can do:

.. code:: python

from hop import stream
from hop.io import StartPosition

with stream.open("kafka://hostname:port/topic", "r", start_at=StartPosition.EARLIEST) as s:
for message in s:
print(message)

49 changes: 32 additions & 17 deletions doc/user/stream.rst
Original file line number Diff line number Diff line change
@@ -1,45 +1,60 @@
==========
Stream
==========
================
Streaming
================

.. contents::
:local:

The Stream Object
-----------------

The Stream object allows a user to connect to a Kafka broker and read
The :code:`Stream` object allows a user to connect to a Kafka broker and read
in a variety of alerts, such as GCN circulars. It also allows one to
specify default settings used across all streams opened from the Stream
instance.

Let's open up a stream and show the Stream object in action:
Let's open up a stream and show the :code:`Stream` object in action:

.. code:: python

from hop import Stream

stream = Stream(format="json")
stream = Stream(persist=True)
with stream.open("kafka://hostname:port/topic", "r") as s:
for idx, msg in s:
print(msg)
for message in s:
print(message)

A common use case is to not specify any defaults, so a shorthand is
provided for using one:
The :code:`persist` option allows one to listen to messages forever
and keeps the connection open after an end of stream (EOS) is received.
This is to allow long-lived connections where one may set up a service
to process incoming GCNs, for example.

A common use case is to not specify any defaults ahead of time,
so a shorthand is provided for using one:

.. code:: python

from hop import stream

with stream.open("kafka://hostname:port/topic", "r") as s:
for _, msg in s:
print(msg)
for message in s:
print(message)

Both :code:`stream.open` and :code:`Stream` can be passed with similar
configuration options, including:

* :code:`auth`: An :code:`auth.Auth` instance to provide authentication
* :code:`start_at`: The message offset to start at, by passing in an :code:`io.StartPosition`
* :code:`persist`: Whether to keep a long-live connection to the client beyond EOS

You can also configure the open stream handle with various options,
including a timeout, a progress bar, and a message limit:
In addition, :code:`stream.open` provides an option to retrieve Kafka message metadata as well
as the message itself, such as the Kafka topic, key, timestamp and offset. This may
be useful in the case of listening to multiple topics at once:

.. code:: python

with stream.open("kafka://hostname:port/topic", "r") as s:
for _, msg in s(timeout=10, limit=20):
print(msg)
from hop import stream

with stream.open("kafka://hostname:port/topic1,topic2", "r", metadata=True) as s:
for message, metadata in s:
print(message, metadata.topic)
5 changes: 1 addition & 4 deletions hop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # declare namespace

try:
from ._version import version as __version__
except ImportError:
pass
from ._version import version as __version__

from .io import Stream

Expand Down
Loading