Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Oct 23, 2024
1 parent 4b086d2 commit 42bd17e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 5 deletions.
11 changes: 9 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
namespace pipeline {

 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
-                                       RuntimeState* state)
+                                       RuntimeState* state, bool keep_order)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
           _query_id(query_id),
           _dest_node_id(dest_node_id),
           _fragment_state(state),
-          _context(state->get_query_ctx()) {}
+          _context(state->get_query_ctx()),
+          _keep_order(keep_order) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
Expand Down Expand Up @@ -321,6 +322,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             dep->set_ready();
         }
     }
+    if (_keep_order) {
+        return Status::OK();
+    }
 }

while (!broadcast_q.empty()) {
Expand Down Expand Up @@ -404,6 +408,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         static_cast<void>(brpc_request->release_block());
     }
     broadcast_q.pop();
+    if (_keep_order) {
+        return Status::OK();
+    }
 }
if (is_empty) {
_turn_off_channel(id);
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
-    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state,
+                       bool keep_order);
~ExchangeSinkBuffer() override = default;
void construct_request(TUniqueId);

Expand Down Expand Up @@ -255,6 +256,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {

     // only use to set_reach_limit
     std::vector<ExchangeSinkLocalState*> _local_states;
+    const bool _keep_order;
};

} // namespace pipeline
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,12 +733,13 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}

 DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
-    if (_child && _enable_local_merge_sort) {
+    if (_child) {
         // SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
         // SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
         if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child);
             sort_source && sort_source->use_local_merge()) {
             // Sort the data local
+            _keep_order = true;
             return ExchangeType::LOCAL_MERGE_SORT;
         }
     }
}
Expand All @@ -749,7 +750,7 @@ void ExchangeSinkOperatorX::create_buffer() {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
-    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state());
+    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state(), _keep_order);
for (const auto& _dest : _dests) {
const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
_sink_buffer->construct_request(dest_fragment_instance_id);
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
size_t _data_processed = 0;
int _writer_count = 1;
     const bool _enable_local_merge_sort;
+
+    mutable bool _keep_order = false;
};

} // namespace pipeline
Expand Down

0 comments on commit 42bd17e

Please sign in to comment.