Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
apacheGH-37796: [C++][Acero] Fix race condition caused by straggling …
…input in the as-of-join node (apache#37839) ### Rationale for this change ### What changes are included in this PR? While asofjoining some large parquet datasets with many row groups, I ran into a deadlock that I described here: apache#37796. Copy pasting below for convenience: 1. The left hand side of the asofjoin completes and is matched with the right hand tables, so `InputFinished` proceeds as [expected](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1323). So far so good 2. The right hand table(s) of the join are a huge dataset scan. They're still streaming and can legally still call `AsofJoinNode::InputReceived` all they want ([doc ref](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero8ExecNode13InputReceivedEP8ExecNode9ExecBatch)) 3. Each input batch is blindly pushed to the `InputState`s, which in turn defer to `BackpressureHandler`s to decide whether to pause inputs. ([code pointer](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1689)) 4. If enough batches come in right after `EndFromProcessThread` is called, then we might exceed the [high_threshold](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L575) and tell the input node to pause via the [BackpressureController](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L540) 5. At this point, the process thread has stopped for the asofjoiner, so the right hand table(s) won't be dequeue'd, meaning `BackpressureController::Resume()` will never be called. This causes a [deadlock](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero19BackpressureControl5PauseEv) TLDR this is caused by a straggling input node being paused due to backpressure _after_ the process thread has ended. And since every `PauseInput` needs a corresponding `ResumeInput` to exit gracefully, we deadlock. Turns out this is fairly easy to reproduce with small tables, if you make a slow input node composed of 1-row record batches with a synthetic delay. My solution is to: 1. Create a `ForceShutdown` hook that puts the input nodes in a resumed state, and for good measure we call `StopProducing` 2. Also for good measure, if nodes come after the process thread exits, we short circuit and return OK. This is because `InputReceived` can be called an arbitrary number of times after `StopProducing`, so it makes sense to not enqueue useless batches. ### Are these changes tested? Yes, I added a delay to the batches of one of the already-existing asofjoin backpressure tests. Checkout out `main`, we get a timeout failure. With my changes, it passes. I considered a more deterministic test, but I struggled to create callbacks in a way that wasn't invasive to the Asof implementation. The idea of using delays was inspired by things I saw in `source_node_test.cc` <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 3. Serve as another way to document the expected behavior of the code ### Are there any user-facing changes? No * Closes: apache#37796 Lead-authored-by: Jeremy Aguilon <jeraguilon@gmail.com> Co-authored-by: Jeremy Aguilon <jaguilon@hudson-trading.com> Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com> Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
- Loading branch information