-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ConcurrentMappingIterable: introduce futuretools.FutureResultCollection to optimize FDFO concurrent mapping (closes #20) #21
Conversation
…on to optimize FDFO concurrent mapping
21838bd
to
dcb20ea
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
❗ Your organization needs to install the Codecov GitHub app to enable full functionality. Additional details and impacted files@@ Coverage Diff @@
## main #21 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 13 14 +1
Lines 1198 1248 +50
=========================================
+ Hits 1198 1248 +50
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
dcb20ea
to
fcda22b
Compare
fyi @erezsh 👁️ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me.
I added a couple of style comments that you're welcome to ignore.
streamable/futuretools.py
Outdated
self._results.put(future.result()) | ||
|
||
def add_future(self, future: "Future[T]") -> None: | ||
future.add_done_callback(lambda f: self._done_callback(f)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just:
future.add_done_callback(self._done_callback)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch! (no reason, I dragged this lambda from previous iterations 🤦🏻)
streamable/iters.py
Outdated
# wait, queue, yield | ||
while future_results: | ||
result = next(future_results) | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a little bit cleaner with with suppress(StopIteration): ...
Also applies to the except above, by putting it outside the while.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh great!
Optimize First Done First Out aka unordereded mapping.
Removes the overhead of using futures waiting functions (
asyncio.wait(..., FIRST_COMPLETED)
orconcurrent.futures.wait(..., FIRST_COMPLETED)
) that register and remove waiters/callbacks for all futures for eachnext
.