Drain result queue on stop #11

Open
kinnarr opened this issue Dec 7, 2020 · 2 comments
Assignees
Labels
question Further information is requested

Comments

@kinnarr
Member

kinnarr commented Dec 7, 2020

The stop method waits on result_queue.join(), so all results have to be drained from the queue before stop can finish.

@kinnarr
Member Author

kinnarr commented Jan 22, 2021

For normal stops, this actually prevents data loss. The workflow
(https://github.com/metricq/metricq-source-bacnet/blob/master/metricq_source_bacnet/source.py#L197-L218) is:

  1. the stop function is called
  2. all worker tasks are stopped
  3. _bacnet_reader is stopped
    1. the stop function waits in _result_queue.join()
    2. meanwhile, the main task empties _result_queue as usual
  4. _result_queue.join() returns
  5. main task is stopped with _main_task_stop_future
  6. call super.stop(...)
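
The ordering above can be sketched as follows. This is only an illustration, assuming the result queue behaves like an `asyncio.Queue`; `consumer`, `run_normal_stop`, `sent` and `stop_future` are hypothetical stand-ins, not names from the linked source.

```python
import asyncio


async def consumer(queue: asyncio.Queue, sent: list, stop_future: asyncio.Future) -> None:
    """Hypothetical stand-in for the main task: forwards results until told to stop."""
    while not stop_future.done():
        get_task = asyncio.ensure_future(queue.get())
        done, _ = await asyncio.wait(
            {get_task, stop_future}, return_when=asyncio.FIRST_COMPLETED
        )
        if get_task in done:
            sent.append(get_task.result())
            queue.task_done()  # this is what lets queue.join() return
        else:
            get_task.cancel()  # stop requested while the queue was empty


async def run_normal_stop() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    sent: list = []
    stop_future = asyncio.get_running_loop().create_future()
    main_task = asyncio.create_task(consumer(queue, sent, stop_future))

    # Results that were produced before stop was called.
    for value in range(3):
        queue.put_nowait(value)

    # Steps 2-3: workers and reader are assumed stopped, nothing new is enqueued.
    await queue.join()            # step 4: returns once every item was task_done()
    stop_future.set_result(None)  # step 5: only now stop the main task
    await main_task
    return sent


if __name__ == "__main__":
    print(asyncio.run(run_normal_stop()))
```

Because the main task is stopped only after `join()` returns, every result that was already queued is still forwarded.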

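If stop is called because of an exception in the main task, _result_queue.join() may block: the main task is the one emptying the queue, so once it has died nothing drains the remaining results. So maybe we should call join with a timeout if exception is not None.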

@kinnarr kinnarr self-assigned this Jan 22, 2021
@kinnarr kinnarr added the question Further information is requested label Jan 22, 2021
@kinnarr
Member Author

kinnarr commented Feb 3, 2021

We need a timeout for result_queue.join() in case the data connection died.
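
A minimal sketch of such a bounded join, again assuming the queue is an `asyncio.Queue` (a `queue.Queue` from the standard library has no timeout on `join()`). The helper name `join_with_timeout` and the 5 second default are assumptions for illustration only.

```python
import asyncio
from typing import Optional


async def join_with_timeout(queue: asyncio.Queue, timeout: Optional[float] = 5.0) -> bool:
    """Wait until the queue is drained, but give up after `timeout` seconds.

    Returns True if the queue was drained, False if the wait timed out.
    Passing timeout=None waits indefinitely, like a plain queue.join().
    """
    try:
        await asyncio.wait_for(queue.join(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # Nobody is consuming anymore (e.g. the main task or the data
        # connection died); remaining results are lost.
        return False
```

Inside stop, a call could look like `await join_with_timeout(self._result_queue, None if exception is None else 5.0)` to bound the wait only on error, or always pass a finite timeout to also cover a dead data connection.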
