[pipelineX](fix) fix correctness problem (#23823)

This commit is contained in:
Gabriel
2023-09-04 15:04:49 +08:00
committed by GitHub
parent 9bca52d002
commit ff92978392
7 changed files with 28 additions and 29 deletions

View File

@ -186,8 +186,8 @@ Status HashJoinProbeOperatorX::get_block(RuntimeState* state, vectorized::Block*
local_state.init_for_probe(state);
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data();
RETURN_IF_ERROR(_child_x->get_block(state, local_state._child_block.get(),
local_state._child_source_state));
RETURN_IF_ERROR(_child_x->get_next_after_projects(state, local_state._child_block.get(),
local_state._child_source_state));
source_state = local_state._child_source_state;
if (local_state._child_block->rows() == 0 &&
local_state._child_source_state != SourceState::FINISHED) {

View File

@ -90,7 +90,8 @@ private:
vectorized::ColumnUInt8::MutablePtr _null_map_column;
// for cases when a probe row matches more than batch size build rows.
bool _is_any_probe_match_row_output = false;
std::unique_ptr<vectorized::HashTableCtxVariants> _process_hashtable_ctx_variants = nullptr;
std::unique_ptr<vectorized::HashTableCtxVariants> _process_hashtable_ctx_variants =
std::make_unique<vectorized::HashTableCtxVariants>();
RuntimeProfile::Counter* _probe_expr_call_timer;
RuntimeProfile::Counter* _probe_next_timer;

View File

@ -61,7 +61,7 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo&
}
Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) {
_left_block.clear();
_child_block->clear();
vectorized::Blocks tmp_build_blocks;
_shared_state->build_blocks.swap(tmp_build_blocks);
@ -137,14 +137,14 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
// We should try to join rows if there still are some rows from probe side.
while (_join_block.rows() < state->batch_size()) {
while (_current_build_pos == _shared_state->build_blocks.size() ||
_left_block_pos == _left_block.rows()) {
_left_block_pos == _child_block->rows()) {
// if left block is empty(), do not need disprocess the left block rows
if (_left_block.rows() > _left_block_pos) {
if (_child_block->rows() > _left_block_pos) {
_left_side_process_count++;
}
_reset_with_next_probe_row();
if (_left_block_pos < _left_block.rows()) {
if (_left_block_pos < _child_block->rows()) {
if constexpr (set_probe_side_flag) {
_probe_offset_stack.push(mutable_join_block.rows());
}
@ -305,14 +305,14 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
} else {
if (!p._is_mark_join) {
auto new_size = column_size;
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
if (_cur_probe_row_visited_flags[j] == IsSemi) {
new_size++;
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
const vectorized::ColumnWithTypeAndName src_column =
_left_block.get_by_position(i);
_child_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
DCHECK(p._join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
@ -341,13 +341,13 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
*dst_columns[dst_columns.size() - 1])
.get_data();
mark_data.reserve(mark_data.size() + _left_side_process_count);
DCHECK_LT(_left_block_pos, _left_block.rows());
DCHECK_LT(_left_block_pos, _child_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]);
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
const vectorized::ColumnWithTypeAndName src_column =
_left_block.get_by_position(i);
_child_block->get_by_position(i);
DCHECK(p._join_op != TJoinOp::FULL_OUTER_JOIN);
dst_columns[i]->insert_from(*src_column.column, j);
}
@ -362,7 +362,7 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null(
auto& dst_columns = mutable_block.mutable_columns();
DCHECK(p._is_mark_join);
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
const vectorized::ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
const vectorized::ColumnWithTypeAndName& src_column = _child_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN ||
@ -397,7 +397,7 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block(
auto& dst_columns = mutable_block.mutable_columns();
const int max_added_rows = now_process_build_block.rows();
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
const vectorized::ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
const vectorized::ColumnWithTypeAndName& src_column = _child_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN ||
@ -484,7 +484,7 @@ bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) con
}
Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
SourceState source_state) const {
auto& local_state = state->get_local_state(id())->cast<NestedLoopJoinProbeLocalState>();
COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
local_state._cur_probe_row_visited_flags.resize(block->rows());
@ -514,8 +514,8 @@ Status NestedLoopJoinProbeOperatorX::get_block(RuntimeState* state, vectorized::
auto& local_state = state->get_local_state(id())->cast<NestedLoopJoinProbeLocalState>();
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data();
RETURN_IF_ERROR(_child_x->get_block(state, local_state._child_block.get(),
local_state._child_source_state));
RETURN_IF_ERROR(_child_x->get_next_after_projects(state, local_state._child_block.get(),
local_state._child_source_state));
source_state = local_state._child_source_state;
if (local_state._child_block->rows() == 0 &&
local_state._child_source_state != SourceState::FINISHED) {
@ -538,10 +538,10 @@ Status NestedLoopJoinProbeOperatorX::get_block(RuntimeState* state, vectorized::
}
Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
SourceState& source_state) const {
auto& local_state = state->get_local_state(id())->cast<NestedLoopJoinProbeLocalState>();
if (_is_output_left_side_only) {
RETURN_IF_ERROR(local_state._build_output_block(&local_state._left_block, block));
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
source_state =
local_state._shared_state->left_side_eos ? SourceState::FINISHED : source_state;
local_state._need_more_input_data = !local_state._shared_state->left_side_eos;

View File

@ -103,8 +103,8 @@ private:
}
if constexpr (SetProbeSideFlag) {
int end = filter.size();
for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1
: _left_block_pos;
for (int i = _left_block_pos == _child_block->rows() ? _left_block_pos - 1
: _left_block_pos;
i >= _left_block_start_pos; i--) {
int offset = 0;
if (!_probe_offset_stack.empty()) {
@ -193,10 +193,6 @@ private:
// Visited flags for current row in probe side.
std::vector<int8_t> _cur_probe_row_visited_flags;
size_t _current_build_pos = 0;
// _left_block must be cleared before calling get_next(). The child node
// does not initialize all tuple ptrs in the row, only the ones that it
// is responsible for.
vectorized::Block _left_block;
vectorized::MutableColumns _dst_columns;
std::stack<uint16_t> _build_offset_stack;
std::stack<uint16_t> _probe_offset_stack;
@ -217,9 +213,10 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state);
Status push(RuntimeState* state, vectorized::Block* input_block,
SourceState source_state) const;
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
SourceState& source_state);
SourceState& source_state) const;
const RowDescriptor& intermediate_row_desc() const override {
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}

View File

@ -220,7 +220,8 @@ struct HashJoinSharedState : public JoinSharedState {
vectorized::Sizes probe_key_sz;
const std::vector<TupleDescriptor*> build_side_child_desc;
size_t build_exprs_size = 0;
std::shared_ptr<std::vector<vectorized::Block>> build_blocks;
std::shared_ptr<std::vector<vectorized::Block>> build_blocks =
std::make_shared<std::vector<vectorized::Block>>();
};
class HashJoinDependency final : public Dependency {

View File

@ -112,7 +112,7 @@ void PipelineXLocalStateBase::clear_origin_block() {
}
Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
vectorized::Block* output_block) {
vectorized::Block* output_block) const {
auto local_state = state->get_local_state(id());
SCOPED_TIMER(local_state->_projection_timer);
using namespace vectorized;

View File

@ -225,7 +225,7 @@ public:
/// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc
Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
vectorized::Block* output_block);
vectorized::Block* output_block) const;
protected:
template <typename Dependency>