From e3d6b9beac6924fd6914940eb74b248a5167f05b Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Tue, 24 Oct 2023 09:22:01 -0400 Subject: [PATCH] GH-37796: [C++][Acero] Fix race condition caused by straggling input in the as-of-join node (#37839) ### Rationale for this change ### What changes are included in this PR? While asofjoining some large parquet datasets with many row groups, I ran into a deadlock that I described here: https://github.com/apache/arrow/issues/37796. Copy pasting below for convenience: 1. The left hand side of the asofjoin completes and is matched with the right hand tables, so `InputFinished` proceeds as [expected](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1323). So far so good 2. The right hand table(s) of the join are a huge dataset scan. They're still streaming and can legally still call `AsofJoinNode::InputReceived` all they want ([doc ref](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero8ExecNode13InputReceivedEP8ExecNode9ExecBatch)) 3. Each input batch is blindly pushed to the `InputState`s, which in turn defer to `BackpressureHandler`s to decide whether to pause inputs. ([code pointer](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1689)) 4. If enough batches come in right after `EndFromProcessThread` is called, then we might exceed the [high_threshold](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L575) and tell the input node to pause via the [BackpressureController](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L540) 5. At this point, the process thread has stopped for the asofjoiner, so the right hand table(s) won't be dequeue'd, meaning `BackpressureController::Resume()` will never be called. This causes a [deadlock](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero19BackpressureControl5PauseEv) TLDR this is caused by a straggling input node being paused due to backpressure _after_ the process thread has ended. And since every `PauseInput` needs a corresponding `ResumeInput` to exit gracefully, we deadlock. Turns out this is fairly easy to reproduce with small tables, if you make a slow input node composed of 1-row record batches with a synthetic delay. My solution is to: 1. Create a `ForceShutdown` hook that puts the input nodes in a resumed state, and for good measure we call `StopProducing` 2. Also for good measure, if nodes come after the process thread exits, we short circuit and return OK. This is because `InputReceived` can be called an arbitrary number of times after `StopProducing`, so it makes sense to not enqueue useless batches. ### Are these changes tested? Yes, I added a delay to the batches of one of the already-existing asofjoin backpressure tests. Checkout out `main`, we get a timeout failure. With my changes, it passes. I considered a more deterministic test, but I struggled to create callbacks in a way that wasn't invasive to the Asof implementation. The idea of using delays was inspired by things I saw in `source_node_test.cc`