[Improvement](pipeline) Terminate early for short-circuit join (#23378)

This commit is contained in:
Gabriel
2023-08-23 19:40:17 +08:00
committed by GitHub
parent f0c16ab20d
commit 9d1f2cd8e0
5 changed files with 49 additions and 6 deletions

View File

@ -134,6 +134,8 @@ public:
// Returns the current value of the `_can_read` flag.
bool can_read() const {
    return _can_read;
}
// Whether this node can already stop producing output.
// The default implementation never allows early termination; subclasses may override.
[[nodiscard]] virtual bool can_terminate_early() {
    return false;
}
// Sink Data to ExecNode to do some stock work, both need impl with method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort

View File

@ -216,6 +216,8 @@ public:
// Readiness check used for sink operators.
// The default implementation always reports "not writable".
virtual bool can_write() {
    return false;
}
// Whether this operator can terminate early.
// The default implementation answers "no"; operators that can short-circuit override it.
[[nodiscard]] virtual bool can_terminate_early() {
    return false;
}
/**
* The main method to execute a pipeline task.
* Now it is a pull-based pipeline and operators pull data from its child by this method.
@ -341,6 +343,8 @@ public:
~StreamingOperator() override = default;
// Forwards the early-termination query to the wrapped `_node`.
[[nodiscard]] bool can_terminate_early() override {
    const bool node_can_stop = _node->can_terminate_early();
    return node_can_stop;
}
Status prepare(RuntimeState* state) override {
_node->increase_ref();
_use_projection = _node->has_output_row_descriptor();

View File

@ -238,11 +238,11 @@ Status PipelineTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
if (!_source->can_read()) {
if (!source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
if (!_sink->can_write()) {
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
@ -250,11 +250,11 @@ Status PipelineTask::execute(bool* eos) {
this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
if (!_sink->can_write()) {
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}

View File

@ -155,13 +155,48 @@ public:
return false;
}
// Forwards to the source operator's `can_read()`.
virtual bool source_can_read() {
    const bool readable = _source->can_read();
    return readable;
}
// The task may proceed on the source side when the source operator is readable,
// or when `ignore_blocking_source()` says the source should not block this task.
// `ignore_blocking_source()` is only consulted if the source is not readable.
virtual bool source_can_read() {
    if (_source->can_read()) {
        return true;
    }
    return ignore_blocking_source();
}
// Forwards to the source operator's `runtime_filters_are_ready_or_timeout()`.
virtual bool runtime_filters_are_ready_or_timeout() {
    const bool ready_or_timed_out = _source->runtime_filters_are_ready_or_timeout();
    return ready_or_timed_out;
}
// Forwards to the sink operator's `can_write()`.
virtual bool sink_can_write() {
    const bool writable = _sink->can_write();
    return writable;
}
/**
* Consider the query plan below:
*
* ExchangeSource JoinBuild1
* \ /
* JoinProbe1 (Right Outer) JoinBuild2
* \ /
* JoinProbe2 (Right Outer)
* |
* Sink
*
* Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
* because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows.
*
* Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will
* produce more data.
*
* Assume JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
* or Sink because JoinProbe2 will always produce 0 rows and terminate early.
*
* In a nutshell, we should follow the rules:
* 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
* 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
*/
// A blocked sink is ignored when `_root` reports that it can terminate early.
// NOTE(review): `_root` is presumably the last operator before the sink -- confirm.
[[nodiscard]] virtual bool ignore_blocking_sink() {
    const bool root_can_stop = _root->can_terminate_early();
    return root_can_stop;
}
// A blocked source is ignored as soon as any operator from index 1 onwards
// reports that it can terminate early.
// The scan deliberately starts at index 1, so `_operators[0]` is never consulted
// (presumably that entry is the source operator itself -- confirm).
[[nodiscard]] virtual bool ignore_blocking_source() {
    size_t pos = 1;
    while (pos < _operators.size()) {
        if (_operators[pos]->can_terminate_early()) {
            return true;
        }
        ++pos;
    }
    return false;
}
// The task may proceed on the sink side when the sink operator is writable,
// or when `ignore_blocking_sink()` says the sink should not block this task.
// `ignore_blocking_sink()` is only consulted if the sink is not writable.
virtual bool sink_can_write() {
    if (_sink->can_write()) {
        return true;
    }
    return ignore_blocking_sink();
}
virtual Status finalize();

View File

@ -74,6 +74,8 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
// Early termination is allowed exactly when `_short_circuit_for_probe` is set.
[[nodiscard]] bool can_terminate_early() override {
    const bool short_circuited = _short_circuit_for_probe;
    return short_circuited;
}
protected:
// Construct the intermediate blocks to store the results from join operation.
void _construct_mutable_join_block();