Skip to content

Commit

Permalink
Expose the capability from adc-streaming 2.4.0 to publish to multiple…
Browse files Browse the repository at this point in the history
… topics
  • Loading branch information
cnweaver committed Jun 14, 2024
1 parent f597eb6 commit 4574bbf
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 29 deletions.
25 changes: 22 additions & 3 deletions doc/user/stream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,32 @@ Kafka topics, and takes the form:

.. code:: bash
kafka://[username@]broker/topic[,topic2[,...]]
kafka://[username@]broker/[topic[,topic2[,...]]]
The broker takes the form :code:`hostname[:port]` and gives the URL to connect to a
Kafka broker. Optionally, a :code:`username` can be provided, which is used to select
among available credentials to use when communicating with the broker.
Finally, one can publish to a topic or subscribe to one or more topics to consume messages
from.
Finally, one can specify a number of topics to which to publish or subscribe.
Publishing to Multiple Topics
-------------------------------
A single stream object can be used to publish to multiple topics, and doing so uses resources
more efficiently by spawning fewer worker threads, opening fewer sockets, etc., than opening a
separate stream for each of several topics, but requires attention to one extra detail: When a
stream is opened for multiple topics, the topic must be specified when calling :code:`write()`,
in order to make unambiguous to which topic that particular message should be published:
.. code:: python
from hop import stream
with stream.open("kafka://hostname:port/topic1,topic2", "w") as s:
s.write({"my": "message"}, topic="topic2")
In fact, when opening a stream for writing, it is not necessary for the target URL to contain
a topic at all; if it does not, the topic to which to publish must always be specified when
calling :code:`write()`.
Committing Messages Manually
------------------------------
Expand Down
29 changes: 17 additions & 12 deletions hop/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ def open(self, url, mode="r", group_id=None, ignoretest=True, **kwargs):
in write mode or a :class:`Consumer` instance in read mode.
Raises:
ValueError: If the mode is not set to read/write, if more than
one topic is specified in write mode, or if more than one broker is specified
ValueError: If the mode is not set to read/write, if no topic
is specified in read mode, or if more than one broker is specified
"""
username, broker_addresses, topics = kafka.parse_kafka_url(url)
Expand All @@ -98,21 +98,20 @@ def open(self, url, mode="r", group_id=None, ignoretest=True, **kwargs):
logger.debug("connecting to addresses=%s username=%s topics=%s",
broker_addresses, group_id, topics)

if topics is None:
raise ValueError("no topic(s) specified in kafka URL")

if self.auth is not None:
credential = select_matching_auth(self.auth, broker_addresses[0], username)
else:
credential = None

if mode == "w":
if len(topics) != 1:
raise ValueError("must specify exactly one topic in write mode")
if topics is None or len(topics) != 1:
topics = [None]
if group_id is not None:
warnings.warn("group ID has no effect when opening a stream in write mode")
return Producer(broker_addresses, topics[0], auth=credential, **kwargs)
elif mode == "r":
if topics is None or len(topics) == 0:
raise ValueError("no topic(s) specified in kafka URL")
if group_id is None:
username = credential.username if credential is not None else None
group_id = _generate_group_id(username, 10)
Expand Down Expand Up @@ -220,7 +219,7 @@ def from_format(data, format, deserialize=True):
return models.JSONBlob(content=old)
# if we can't tell what the data is, pass it on unchanged
except (UnicodeDecodeError, json.JSONDecodeError):
logger.warning("Unknown message format; returning a Blob")
logger.info("Unknown message format; returning a Blob")
return models.Blob(content=message.value())

def load(self, input_):
Expand Down Expand Up @@ -470,7 +469,7 @@ def __init__(self, broker_addresses, topic, auth, **kwargs):
logger.info(f"publishing to topic: {topic}")

def write(self, message, headers=None,
delivery_callback=errors.raise_delivery_errors, test=False):
delivery_callback=errors.raise_delivery_errors, test=False, topic=None):
"""Write messages to a stream.
Expand All @@ -484,12 +483,15 @@ def write(self, message, headers=None,
is either delivered or permenantly fails to be delivered.
test: Message should be marked as a test message by adding a header
with key '_test'.
topic: The topic to which the message should be sent. This need not be specified if
the stream was opened with a URL containing exactly one topic name.
"""
message, headers = self._pack(message, headers, test=test)
self._producer.write(message, headers=headers, delivery_callback=delivery_callback)
self._producer.write(message, headers=headers, delivery_callback=delivery_callback,
topic=topic)

def write_raw(self, packed_message, headers=None,
delivery_callback=errors.raise_delivery_errors):
delivery_callback=errors.raise_delivery_errors, topic=None):
"""Write a pre-encoded message to the stream.
This is an advanced interface; for most purposes it is preferrable to use
Expand All @@ -502,9 +504,12 @@ def write_raw(self, packed_message, headers=None,
mapping strings to strings, or as a list of 2-tuples of strings.
delivery_callback: A callback which will be called when each message
is either delivered or permenantly fails to be delivered.
topic: The topic to which the message should be sent. This need not be specified if
the stream was opened with a URL containing exactly one topic name.
"""

self._producer.write(packed_message, headers=headers, delivery_callback=delivery_callback)
self._producer.write(packed_message, headers=headers, delivery_callback=delivery_callback,
topic=topic)

@staticmethod
def pack(message, headers=None, test=False, auth=None):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# requirements
install_requires = [
"adc-streaming >= 2.2.0",
"adc-streaming >= 2.4.0",
"dataclasses ; python_version < '3.7'",
"fastavro >= 1.4.0",
"pluggy >= 0.11",
Expand Down
9 changes: 7 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,13 @@ def __init__(self, broker, topic):
self.broker = broker
self.topic = topic

def write(self, msg, headers=[], delivery_callback=None):
self.broker.write(self.topic, msg, headers)
def write(self, msg, headers=[], delivery_callback=None, topic=None):
if topic is None:
if self.topic is not None:
topic = self.topic
else:
raise Exception("No topic specified for write")
self.broker.write(topic, msg, headers)

def close(self):
pass
Expand Down
53 changes: 42 additions & 11 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ def test_stream_stop(circular_msg):

def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):
topic = "gcn"
mock_adc_producer = mock_producer(mock_broker, topic)
expected_msg = make_message_standard(circular_msg)

fixed_uuid = uuid4()

auth = Auth("user", "password")
Expand All @@ -298,7 +296,8 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):
none_test_headers = [("_id", fixed_uuid.bytes), ("_sender", auth.username.encode("utf-8")),
('_test', b"true"), ("_format", b"circular")]

with patch("hop.io.producer.Producer", autospec=True, return_value=mock_adc_producer), \
mb = mock_broker
with patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)), \
patch("hop.io.uuid4", MagicMock(return_value=fixed_uuid)):

broker_url = f"kafka://localhost:port/{topic}"
Expand All @@ -307,10 +306,6 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):

stream = io.Stream(start_at=start_at, until_eos=until_eos, auth=auth)

# verify only 1 topic is allowed in write mode
with pytest.raises(ValueError):
stream.open("kafka://localhost:9092/topic1,topic2", "w")

# verify warning is raised when groupid is set in write mode
with pytest.warns(UserWarning):
stream.open("kafka://localhost:9092/topic1", "w", group_id="group")
Expand All @@ -337,14 +332,40 @@ def test_stream_write(circular_msg, circular_text, mock_broker, mock_producer):
s.close()
assert mock_broker.has_message(topic, expected_msg.value(), canonical_headers)

mock_broker.reset()
# more than one topics should now be allowed in write mode
with stream.open("kafka://localhost:9092/topic1,topic2", "w") as s:
with pytest.raises(Exception):
# however, a topic must be specified when calling write with multiple topics
# specified on construction
s.write(circular_msg, headers)

# selecting a topic explicitly when calling write should work
s.write(circular_msg, headers, topic="topic1")
assert mock_broker.has_message("topic1", expected_msg.value(), canonical_headers)
s.write(circular_msg, headers, topic="topic2")
assert mock_broker.has_message("topic2", expected_msg.value(), canonical_headers)

mock_broker.reset()
# no topic can also be specified in write mode
with stream.open("kafka://localhost:9092/", "w") as s:
with pytest.raises(Exception):
# however, a topic must be specified when calling write
s.write(circular_msg, headers)

s.write(circular_msg, headers, topic="topic1")
assert mock_broker.has_message("topic1", expected_msg.value(), canonical_headers)
s.write(circular_msg, headers, topic="topic2")
assert mock_broker.has_message("topic2", expected_msg.value(), canonical_headers)


def test_stream_write_raw(circular_msg, circular_text, mock_broker, mock_producer):
topic = "gcn"
mock_adc_producer = mock_producer(mock_broker, topic)
encoded_msg = io.Producer.pack(circular_msg)
headers = {"some header": "some value"}
canonical_headers = list(headers.items())
with patch("hop.io.producer.Producer", autospec=True, return_value=mock_adc_producer):
mb = mock_broker
with patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)):
stream = io.Stream(auth=False)

broker_url = f"kafka://localhost:9092/{topic}"
Expand All @@ -361,6 +382,14 @@ def test_stream_write_raw(circular_msg, circular_text, mock_broker, mock_produce
s.close()
assert mock_broker.has_message(topic, encoded_msg, canonical_headers)

with stream.open("kafka://localhost:9092/topic1,topic2", "w") as s:
with pytest.raises(Exception):
s.write_raw(encoded_msg, canonical_headers)
s.write_raw(encoded_msg, canonical_headers, topic="topic1")
assert mock_broker.has_message("topic1", encoded_msg, canonical_headers)
s.write_raw(encoded_msg, canonical_headers, topic="topic2")
assert mock_broker.has_message("topic2", encoded_msg, canonical_headers)


def test_stream_auth(auth_config, tmpdir):
# turning off authentication should give None for the auth property
Expand All @@ -385,7 +414,7 @@ def test_stream_auth(auth_config, tmpdir):
assert s4.auth == "blarg"


def test_stream_open(auth_config, tmpdir):
def test_stream_open(auth_config, mock_broker, mock_producer, tmpdir):
stream = io.Stream(auth=False)

# verify only read/writes are allowed
Expand All @@ -398,7 +427,7 @@ def test_stream_open(auth_config, tmpdir):
stream.open("bad://example.com/topic", "r")
assert "invalid kafka URL: must start with 'kafka://'" in err.value.args

# verify that URLs with no topic are rejected
# verify that URLs with no topic are rejected when reading
with pytest.raises(ValueError) as err:
stream.open("kafka://example.com/", "r")
assert "no topic(s) specified in kafka URL" in err.value.args
Expand All @@ -409,7 +438,9 @@ def test_stream_open(auth_config, tmpdir):
assert "Multiple broker addresses are not supported" in err.value.args

# verify that complete URLs are accepted
mb = mock_broker
with temp_config(tmpdir, auth_config) as config_dir, temp_environ(XDG_CONFIG_HOME=config_dir), \
patch("hop.io.producer.Producer", side_effect=lambda c: mock_producer(mb, c.topic)), \
patch("adc.consumer.Consumer.subscribe", MagicMock()) as subscribe:
stream = io.Stream()
# opening a valid URL for reading should succeed
Expand Down

0 comments on commit 4574bbf

Please sign in to comment.