diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index db6bae55b2..92763f9f27 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -410,7 +410,6 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st _build_expr_ctxs.push_back(ctx); const auto vexpr = _build_expr_ctxs.back()->root(); - const auto& data_type = vexpr->data_type(); bool null_aware = eq_join_conjunct.__isset.opcode && eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL; @@ -421,7 +420,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st _store_null_in_hash_table.emplace_back( null_aware || (_build_expr_ctxs.back()->root()->is_nullable() && build_stores_null)); + } + for (const auto& expr : _build_expr_ctxs) { + const auto& data_type = expr->root()->data_type(); if (!data_type->have_maximum_size_of_value()) { break; } @@ -589,6 +591,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.init_short_circuit_for_probe(); if (source_state == SourceState::FINISHED) { + // Since the comparison of null values is meaningless, null aware left anti join should not output null + // when the build side is not empty. + if (!local_state._shared_state->build_blocks->empty() && + _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + local_state._shared_state->probe_ignore_null = true; + } local_state._dependency->set_ready_for_read(); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 34f8e6102a..0a0438720a 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -34,7 +34,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); - _probe_ignore_null = p._probe_ignore_null; + _shared_state->probe_ignore_null = p._probe_ignore_null; _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); @@ -43,11 +43,6 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) for (size_t i = 0; i < _other_join_conjuncts.size(); i++) { RETURN_IF_ERROR(p._other_join_conjuncts[i]->clone(state, _other_join_conjuncts[i])); } - // Since the comparison of null values is meaningless, null aware left anti join should not output null - // when the build side is not empty. - if (!_shared_state->build_blocks->empty() && p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - _probe_ignore_null = true; - } _construct_mutable_join_block(); _probe_column_disguise_null.reserve(_probe_expr_ctxs.size()); _probe_arena_memory_usage = @@ -189,6 +184,42 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc local_state.init_for_probe(state); SCOPED_TIMER(local_state._probe_timer); if (local_state._shared_state->short_circuit_for_probe) { + /// If `_short_circuit_for_probe` is true, this indicates no rows + /// match the join condition, and this is 'mark join', so we need to create a column as mark + /// with all rows set to 0. + if (_is_mark_join) { + auto block_rows = local_state._probe_block.rows(); + if (block_rows == 0) { + if (local_state._probe_eos) { + source_state = SourceState::FINISHED; + } + return Status::OK(); + } + + vectorized::Block temp_block; + //get probe side output column + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + if (_left_output_slot_flags[i]) { + temp_block.insert(local_state._probe_block.get_by_position(i)); + } + } + auto mark_column = vectorized::ColumnUInt8::create(block_rows, 0); + temp_block.insert( + {std::move(mark_column), std::make_shared(), ""}); + + { + SCOPED_TIMER(local_state._join_filter_timer); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block( + local_state._conjuncts, &temp_block, temp_block.columns())); + } + + RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false)); + temp_block.clear(); + local_state._probe_block.clear_column_data( + _child_x->row_desc().num_materialized_slots()); + local_state.reached_limit(output_block, source_state); + return Status::OK(); + } // If we use a short-circuit strategy, should return empty block directly. source_state = SourceState::FINISHED; return Status::OK(); @@ -241,7 +272,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc *local_state._shared_state->hash_table_variants, *local_state._process_hashtable_ctx_variants, vectorized::make_bool_variant(local_state._need_null_map_for_probe), - vectorized::make_bool_variant(local_state._probe_ignore_null)); + vectorized::make_bool_variant(local_state._shared_state->probe_ignore_null)); }); } else if (local_state._probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { @@ -299,7 +330,8 @@ bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { auto& local_state = state->get_local_state(id())->cast(); return (local_state._probe_block.rows() == 0 || local_state._probe_index == local_state._probe_block.rows()) && - !local_state._probe_eos && !local_state._shared_state->short_circuit_for_probe; + !local_state._probe_eos && + (!local_state._shared_state->short_circuit_for_probe || _is_mark_join); } Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index bac12a004a..5c451b9fde 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -85,7 +85,6 @@ private: bool _need_null_map_for_probe = false; bool _has_set_need_null_map_for_probe = false; - bool _probe_ignore_null = false; std::unique_ptr _probe_context; vectorized::ColumnUInt8::MutablePtr _null_map_column; // for cases when a probe row matches more than batch size build rows. diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 5f8bccea2b..ecac7c94dd 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -59,6 +59,8 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state, _join_conjuncts[i])); } _construct_mutable_join_block(); + + _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin"); return Status::OK(); } @@ -349,7 +351,7 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { - mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]); + mark_data.emplace_back(IsSemi == _cur_probe_row_visited_flags[j]); } for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName src_column = @@ -562,6 +564,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block set_build_side_flag, set_probe_side_flag>( state, join_op_variants); }; + SCOPED_TIMER(local_state._loop_join_timer); RETURN_IF_ERROR(std::visit( func, local_state._shared_state->join_op_variants, vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti), diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index e754ae585f..8ad39451b0 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -198,6 +198,8 @@ private: std::stack _probe_offset_stack; uint64_t _output_null_idx_build_side = 0; vectorized::VExprContextSPtrs _join_conjuncts; + + RuntimeProfile::Counter* _loop_join_timer; }; class NestedLoopJoinProbeOperatorX final diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 928f05c272..ef9f54bff7 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -194,7 +194,7 @@ template class ScanLocalState : public ScanLocalStateBase { ENABLE_FACTORY_CREATOR(ScanLocalState); ScanLocalState(RuntimeState* state, OperatorXBase* parent); - virtual ~ScanLocalState() = default; + ~ScanLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index f374e43b00..9c720c3ddc 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -530,6 +530,7 @@ struct HashJoinSharedState : public JoinSharedState { size_t build_exprs_size = 0; std::shared_ptr> build_blocks = std::make_shared>(); + bool probe_ignore_null = false; }; class HashJoinDependency final : public WriteDependency { diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp index 5c9cdd3c15..bd93cb8226 100644 --- a/be/src/vec/exprs/vcase_expr.cpp +++ b/be/src/vec/exprs/vcase_expr.cpp @@ -79,8 +79,13 @@ Status VCaseExpr::prepare(RuntimeState* state, const RowDescriptor& desc, VExprC Status VCaseExpr::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - RETURN_IF_ERROR(VExpr::open(state, context, scope)); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + if (scope == FunctionContext::FRAGMENT_LOCAL) { + RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); + } return Status::OK(); } diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index ef4dd08115..361833120b 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -81,8 +81,13 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - RETURN_IF_ERROR(VExpr::open(state, context, scope)); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + if (scope == FunctionContext::FRAGMENT_LOCAL) { + RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); + } return Status::OK(); } diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index 1843d732a2..0d6a0d9d10 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -121,8 +121,13 @@ Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - RETURN_IF_ERROR(VExpr::open(state, context, scope)); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + if (scope == FunctionContext::FRAGMENT_LOCAL) { + RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); + } return Status::OK(); } diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index a19dafe439..b2e3b39c5a 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -201,6 +201,9 @@ Status VExpr::open(RuntimeState* state, VExprContext* context, for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->open(state, context, scope)); } + if (scope == FunctionContext::FRAGMENT_LOCAL) { + RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); + } return Status::OK(); } @@ -466,6 +469,7 @@ Status VExpr::get_const_col(VExprContext* context, } if (_constant_col != nullptr) { + DCHECK(column_wrapper != nullptr); *column_wrapper = _constant_col; return Status::OK(); } @@ -479,7 +483,10 @@ Status VExpr::get_const_col(VExprContext* context, DCHECK(result != -1); const auto& column = block.get_by_position(result).column; _constant_col = std::make_shared(column); - *column_wrapper = _constant_col; + if (column_wrapper != nullptr) { + *column_wrapper = _constant_col; + } + return Status::OK(); } diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp index 83e6bd6320..55e999af47 100644 --- a/be/src/vec/exprs/vin_predicate.cpp +++ b/be/src/vec/exprs/vin_predicate.cpp @@ -78,8 +78,13 @@ Status VInPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, Status VInPredicate::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - RETURN_IF_ERROR(VExpr::open(state, context, scope)); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); + if (scope == FunctionContext::FRAGMENT_LOCAL) { + RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); + } return Status::OK(); } diff --git a/be/src/vec/exprs/vmatch_predicate.cpp b/be/src/vec/exprs/vmatch_predicate.cpp index 2a21aba578..cca6389fff 100644 --- a/be/src/vec/exprs/vmatch_predicate.cpp +++ b/be/src/vec/exprs/vmatch_predicate.cpp @@ -92,11 +92,16 @@ Status VMatchPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, Status VMatchPredicate::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - RETURN_IF_ERROR(VExpr::open(state, context, scope)); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); if (scope == FunctionContext::THREAD_LOCAL || scope == FunctionContext::FRAGMENT_LOCAL) { context->fn_context(_fn_context_index)->set_function_state(scope, _inverted_index_ctx); } + if (scope == FunctionContext::FRAGMENT_LOCAL) { + RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); + } return Status::OK(); } diff --git a/be/src/vec/functions/function_fake.h b/be/src/vec/functions/function_fake.h index 456672e7a2..0efca39c9a 100644 --- a/be/src/vec/functions/function_fake.h +++ b/be/src/vec/functions/function_fake.h @@ -54,6 +54,7 @@ public: } bool use_default_implementation_for_nulls() const override { return true; } + bool use_default_implementation_for_constants() const override { return false; } Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count) override {