hop client connections on unreliable networks #140

Open
mlinvill opened this issue May 21, 2021 · 6 comments

@mlinvill

mlinvill commented May 21, 2021

Description

I have noticed that if the network goes away while the hop client is connected to the server, no attempt is made to reconnect. Being able to reconnect upon disconnect/timeout might be a very nice configurable feature for production services which use the hop client. Even throwing a specific named exception might be good enough for client code to respond appropriately and attempt to reconnect.

Example

Here is a representative debug/stack trace from a very old client version:

2020-12-04 15:00:14,914 | model : INFO : processing messages from kafka://dev.hop.scimma.org:9092/snews-experiments-test
%5|1607278405.408|REQTMOUT|rdkafka#consumer-1| [thrd:sasl_ssl://kb-2.dev.hop.scimma.org:9092/2]: sasl_ssl://kb-2.dev.hop.scimma.org:9092/2: Timed out FetchRequest in flight (after 60519ms, timeout #0)
%4|1607278405.408|REQTMOUT|rdkafka#consumer-1| [thrd:sasl_ssl://kb-2.dev.hop.scimma.org:9092/2]: sasl_ssl://kb-2.dev.hop.scimma.org:9092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1607278405.409|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kb-2.dev.hop.scimma.org:9092/2]: sasl_ssl://kb-2.dev.hop.scimma.org:9092/2: 1 request(s) timed out: disconnect (after 166388889ms in state UP)
2020-12-06 13:13:25,409 | model : ERROR : internal kafka error: KafkaError{code=_TIMED_OUT,val=-185,str="sasl_ssl://kb-2.dev.hop.scimma.org:9092/2: 1 request(s) timed out: disconnect (after 166388889ms in state UP)"}
2020-12-06 13:13:25,729 | model : INFO : shutting down
2020-12-06 13:13:25,732 | model : DEBUG : shutting down producer
2020-12-06 13:13:25,733 | model : DEBUG : flushed all messages
Traceback (most recent call last):
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/bin/snews", line 8, in <module>
    sys.exit(main())
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/lib/python3.6/site-packages/snews/__main__.py", line 53, in main
    args.func(args)
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/lib/python3.6/site-packages/snews/model.py", line 169, in main
    model.run()
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/lib/python3.6/site-packages/snews/model.py", line 114, in run
    for msg, meta in self.source.read(metadata=True, autocommit=False):
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/lib/python3.6/site-packages/hop/io.py", line 257, in read
    for message in self._consumer.stream(autocommit=autocommit, **kwargs):
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/lib/python3.6/site-packages/adc/consumer.py", line 110, in _stream_forever
    messages = self._consumer.consume(batch_size, batch_timeout.total_seconds())
  File "/home/mlinvill/.local/share/virtualenvs/SNalert-model-pilot-BFCIc8b3/lib/python3.6/site-packages/adc/errors.py", line 22, in log_client_errors
    raise(KafkaException.from_kafka_error(kafka_error))
adc.errors.KafkaException: Error communicating with Kafka: code=_TIMED_OUT sasl_ssl://kb-2.dev.hop.scimma.org:9092/2: 1 request(s) timed out: disconnect (after 166388889ms in state UP)
 
@adambrazier
Contributor

We are discussing this today/tomorrow (apologies for the late reply!)

@spenczar
Contributor

spenczar commented Jun 3, 2021

This certainly seems desirable, yeah. I think that you might be able to achieve what you want today with something like this:

import time
import random
import adc.errors
from hop import Stream


def consume_stream_with_reconnections(stream_addr, handler, max_errors=5):
    error_count = 0
    while True:
        try:
            stream = Stream()
            with stream.open(stream_addr, "r") as s:
                for message in s:
                    handler(message)
                    # a successfully handled message resets the error budget
                    error_count = 0
                return
        except adc.errors.KafkaException:
            error_count += 1
            if error_count >= max_errors:
                raise
            # sleep with exponential backoff and a bit of jitter.
            time.sleep((1.5 ** error_count) * (1 + random.random()) / 2)

You'd need to make some adjustments for auth, of course, but I think the basic idea is there.

This is kind of tricky stuff, so it would be nice to find a way to encapsulate it. Can you help me brainstorm on what a good API for this might be, @mlinvill?

It might be appealing to say that this should all happen magically inside for message in s - that is, that it should happen inside the __iter__ method of a consumer stream. We could reconnect in case of errors, maybe.

One downside of the code I have written here is that it will do a full reconnection, including setting up topic subscriptions, on every error. That may be overkill for some really little transient errors which don't require tearing down and reconnecting. In fact, all those reconnections may impose extra load on the Kafka brokers. I'm not sure how to quantify that problem, and my gut is that it's fine to ignore it for now.
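
For concreteness, one way the "reconnect inside __iter__" idea could look is a thin wrapper around the existing Stream, sketched below. The ReconnectingStream class, its arguments, and the handle callback in the usage note are made up for illustration; only Stream.open, adc.errors.KafkaException, and the backoff logic come from the example above.

import time
import random
import adc.errors
from hop import Stream


class ReconnectingStream:
    """Hypothetical wrapper that hides reconnection inside iteration."""

    def __init__(self, stream_addr, max_errors=5):
        self.stream_addr = stream_addr
        self.max_errors = max_errors

    def __iter__(self):
        error_count = 0
        while True:
            try:
                with Stream().open(self.stream_addr, "r") as s:
                    for message in s:
                        yield message
                        # a successfully delivered message resets the error budget
                        error_count = 0
                    return  # the stream ended normally
            except adc.errors.KafkaException:
                error_count += 1
                if error_count >= self.max_errors:
                    raise
                # exponential backoff with jitter before reopening
                time.sleep((1.5 ** error_count) * (1 + random.random()) / 2)


# The caller just iterates; reconnection and re-subscription happen underneath:
#
#     for message in ReconnectingStream("kafka://dev.hop.scimma.org:9092/snews-experiments-test"):
#         handle(message)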

@spenczar
Contributor

spenczar commented Jun 3, 2021

If you install adc-streaming>=1.2.0, you might be able to do a little better than my code snippet above. In particular, you can check whether the error is retriable before attempting to retry - something like this:

except adc.errors.KafkaException as e:
    if not e.retriable:
        raise
    [... etc ...]
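
Folding that check into the earlier loop might look like the sketch below. It assumes adc-streaming>=1.2.0 for the retriable attribute and keeps the same made-up handler callback as before.

import time
import random
import adc.errors
from hop import Stream


def consume_stream_with_reconnections(stream_addr, handler, max_errors=5):
    error_count = 0
    while True:
        try:
            with Stream().open(stream_addr, "r") as s:
                for message in s:
                    handler(message)
                    error_count = 0
                return
        except adc.errors.KafkaException as e:
            # only retry errors that adc reports as retriable (adc-streaming>=1.2.0)
            if not e.retriable:
                raise
            error_count += 1
            if error_count >= max_errors:
                raise
            # exponential backoff with jitter before reconnecting
            time.sleep((1.5 ** error_count) * (1 + random.random()) / 2)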

@mlinvill
Author

mlinvill commented Jun 4, 2021

I'm happy to help with this in any way I am able. It's probably obvious from the way I submitted the issue that I have little knowledge of the hop-client code. I had envisioned an argument to the Stream instance (or to open) that triggers this reconnect-on-network-error behavior for some sane length of time. Doing this inside the iterator is, from the client's perspective, pretty dreamy. I hadn't appreciated the Kafka (re)subscribing overhead, however. Perhaps it's most sane to implement this feature in the ADC Consumer and Producer objects?
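
For reference, the caller-facing version of that envisioned argument might look something like the sketch below. The reconnect keywords and the handle callback are purely hypothetical; nothing like them exists in hop today.

from hop import Stream


def handle(message):
    # placeholder for real processing
    print(message)


stream = Stream()
# hypothetical keywords: retry on network errors for up to ten minutes
with stream.open("kafka://dev.hop.scimma.org:9092/snews-experiments-test", "r",
                 reconnect=True, reconnect_timeout=600) as s:
    for message in s:
        handle(message)  # reconnection attempts would happen inside the iterator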

I didn't write the hop-SNalert-app client, so I'll spend some more time in the code and perhaps implement your suggestion as a test case to inform this discussion.

@cnweaver
Contributor

After some investigation, @spenczar's consume_stream_with_reconnections example basically works with a couple of caveats:

  • There is currently a bug in confluent-kafka which obscures the types of errors produced, converting the expected adc.errors.KafkaException into a confusing SystemError. I have a fairly simple patch for it which can fix this, but I don't know how the upstream maintainers will react to it.
  • Right now adc.Consumer.subscribe calls describe_topic, which can emit a confluent_kafka.KafkaException instead of an adc.errors.KafkaException, and that will break the loop as written. Either both exception types (which are similar but not identical) must be handled (see the sketch after this list), or adc should be updated to be more consistent about what it allows to propagate out.
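
A minimal sketch of handling both, assuming it is acceptable to retry errors escaping subscribe() the same way as consume errors (the should_retry name is made up for illustration):

import adc.errors
import confluent_kafka


def should_retry(exc):
    """Decide whether a Kafka-related exception warrants another reconnect attempt."""
    if isinstance(exc, adc.errors.KafkaException):
        # adc-streaming >= 1.2.0 marks whether the underlying error is retriable
        return exc.retriable
    if isinstance(exc, confluent_kafka.KafkaException):
        # errors escaping adc.Consumer.subscribe / describe_topic; retrying lets
        # the surrounding loop reopen the stream and re-subscribe
        return True
    return False

The except clause in the loops above would then catch both types, e.g. except (adc.errors.KafkaException, confluent_kafka.KafkaException) as e: followed by if not should_retry(e): raise.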

I think it's quite possible to encapsulate this feature into hop.io.Consumer if we choose, and doing so would still depend on fixing the current issues with error types.

@mlinvill
Author

Sorry to ping such an old issue! @cnweaver, has there been any progress on implementing this feature in hop.io.Consumer?

I'm looking to implement this in our production code and thought I should check in before doing so.
