// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "vec/exec/join/vhash_join_node.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/object_pool.h" #include "exec/exec_node.h" #include "exprs/bloom_filter_func.h" #include "exprs/runtime_filter.h" #include "exprs/runtime_filter_slots.h" #include "gutil/strings/substitute.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/query_context.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/defer_op.h" #include "util/telemetry/telemetry.h" #include "util/uid_util.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" #include "vec/common/hash_table/hash_map.h" #include "vec/common/uint128.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" #include "vec/exec/join/join_op.h" #include "vec/exec/join/process_hash_table_probe.h" #include "vec/exec/join/vjoin_node_base.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/runtime/shared_hash_table_controller.h" #include "vec/utils/template_helpers.hpp" #include "vec/utils/util.hpp" namespace doris::vectorized { static constexpr int PREFETCH_STEP = HashJoinNode::PREFETCH_STEP; template Status HashJoinNode::_extract_join_column( Block&, COW::mutable_ptr>&, std::vector>&, std::vector> const&); template Status HashJoinNode::_extract_join_column( Block&, COW::mutable_ptr>&, std::vector>&, std::vector> const&); using ProfileCounter = RuntimeProfile::Counter; template struct Overload : Callables... { using Callables::operator()...; }; template Overload(Callables&&... callables) -> Overload; template struct ProcessHashTableBuild { ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, HashJoinNode* join_node, int batch_size, uint8_t offset) : _rows(rows), _skip_rows(0), _acquired_block(acquired_block), _build_raw_ptrs(build_raw_ptrs), _join_node(join_node), _batch_size(batch_size), _offset(offset), _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {} template Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { using KeyGetter = typename HashTableContext::State; using Mapped = typename HashTableContext::Mapped; Defer defer {[&]() { int64_t bucket_size = hash_table_ctx.hash_table.get_buffer_size_in_cells(); int64_t filled_bucket_size = hash_table_ctx.hash_table.size(); int64_t bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); COUNTER_SET(_join_node->_hash_table_memory_usage, bucket_bytes); COUNTER_SET(_join_node->_build_buckets_counter, bucket_size); COUNTER_SET(_join_node->_build_buckets_fill_counter, filled_bucket_size); auto hash_table_buckets = hash_table_ctx.hash_table.get_buffer_sizes_in_cells(); std::string hash_table_buckets_info; for (auto bucket_count : hash_table_buckets) { hash_table_buckets_info += std::to_string(bucket_count) + ", "; } _join_node->add_hash_buckets_info(hash_table_buckets_info); auto hash_table_sizes = hash_table_ctx.hash_table.sizes(); hash_table_buckets_info.clear(); for (auto table_size : hash_table_sizes) { hash_table_buckets_info += std::to_string(table_size) + ", "; } _join_node->add_hash_buckets_filled_info(hash_table_buckets_info); }}; KeyGetter key_getter(_build_raw_ptrs, _join_node->_build_key_sz, nullptr); SCOPED_TIMER(_join_node->_build_table_insert_timer); hash_table_ctx.hash_table.reset_resize_timer(); // only not build_unique, we need expanse hash table before insert data // 1. There are fewer duplicate keys, reducing the number of resize hash tables // can improve performance to a certain extent, about 2%-5% // 2. There are many duplicate keys, and the hash table filled bucket is far less than // the hash table build bucket, which may waste a lot of memory. // TODO, use the NDV expansion of the key column in the optimizer statistics if (!_join_node->_build_unique) { RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table.expanse_for_add_elem( std::min(_rows, config::hash_table_pre_expanse_max_rows))); } vector& inserted_rows = _join_node->_inserted_rows[&_acquired_block]; bool has_runtime_filter = !_join_node->_runtime_filter_descs.empty(); if (has_runtime_filter) { inserted_rows.reserve(_batch_size); } _build_side_hash_values.resize(_rows); auto& arena = *(_join_node->_arena); auto old_build_arena_memory = arena.size(); { SCOPED_TIMER(_build_side_compute_hash_timer); if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { auto old_keys_memory = hash_table_ctx.keys_memory_usage; hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows); key_getter.set_serialized_keys(hash_table_ctx.keys.data()); _join_node->_build_arena_memory_usage->add(hash_table_ctx.keys_memory_usage - old_keys_memory); } for (size_t k = 0; k < _rows; ++k) { if constexpr (ignore_null) { if ((*null_map)[k]) { continue; } } // If apply short circuit strategy for null value (e.g. join operator is // NULL_AWARE_LEFT_ANTI_JOIN), we build hash table until we meet a null value. if constexpr (short_circuit_for_null) { if ((*null_map)[k]) { DCHECK(has_null_key); *has_null_key = true; return Status::OK(); } } if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< KeyGetter>::value) { _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key); } else { _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena)); } } } bool build_unique = _join_node->_build_unique; #define EMPLACE_IMPL(stmt) \ for (size_t k = 0; k < _rows; ++k) { \ if constexpr (ignore_null) { \ if ((*null_map)[k]) { \ continue; \ } \ } \ auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, \ _build_side_hash_values[k], k, arena); \ if (k + PREFETCH_STEP < _rows) { \ key_getter.template prefetch_by_hash( \ hash_table_ctx.hash_table, _build_side_hash_values[k + PREFETCH_STEP]); \ } \ stmt; \ } if (has_runtime_filter && build_unique) { EMPLACE_IMPL( if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); inserted_rows.push_back(k); _join_node->_build_bf_cardinality++; } else { _skip_rows++; }); } else if (has_runtime_filter && !build_unique) { EMPLACE_IMPL( if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); inserted_rows.push_back(k); _join_node->_build_bf_cardinality++; } else { emplace_result.get_mapped().insert({k, _offset}, *(_join_node->_arena)); inserted_rows.push_back(k); }); } else if (!has_runtime_filter && build_unique) { EMPLACE_IMPL( if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); } else { _skip_rows++; }); } else { EMPLACE_IMPL( if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); } else { emplace_result.get_mapped().insert({k, _offset}, *(_join_node->_arena)); }); } #undef EMPLACE_IMPL _join_node->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory); COUNTER_UPDATE(_join_node->_build_table_expanse_timer, hash_table_ctx.hash_table.get_resize_timer_value()); COUNTER_UPDATE(_join_node->_build_table_convert_timer, hash_table_ctx.hash_table.get_convert_timer_value()); return Status::OK(); } private: const int _rows; int _skip_rows; Block& _acquired_block; ColumnRawPtrs& _build_raw_ptrs; HashJoinNode* _join_node; int _batch_size; uint8_t _offset; ProfileCounter* _build_side_compute_hash_timer; std::vector _build_side_hash_values; }; template struct ProcessRuntimeFilterBuild { ProcessRuntimeFilterBuild(HashJoinNode* join_node) : _join_node(join_node) {} Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx) { if (_join_node->_runtime_filter_descs.empty()) { return Status::OK(); } _join_node->_runtime_filter_slots = _join_node->_pool->add( new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs, _join_node->_runtime_filter_descs)); RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init( state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality)); if (!_join_node->_runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) { { SCOPED_TIMER(_join_node->_push_compute_timer); _join_node->_runtime_filter_slots->insert(_join_node->_inserted_rows); } } { SCOPED_TIMER(_join_node->_push_down_timer); _join_node->_runtime_filter_slots->publish(); } return Status::OK(); } private: HashJoinNode* _join_node; }; HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VJoinNodeBase(pool, tnode, descs), _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && tnode.hash_join_node.is_broadcast_join), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}), _build_block_idx(0), _build_side_mem_used(0), _build_side_last_mem_used(0) { _runtime_filter_descs = tnode.runtime_filters; _arena = std::make_shared(); _hash_table_variants = std::make_shared(); _process_hashtable_ctx_variants = std::make_unique(); _build_blocks.reset(new std::vector()); // avoid vector expand change block address. // one block can store 4g data, _build_blocks can store 128*4g data. // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. _build_blocks->reserve(_MAX_BUILD_BLOCK_COUNT); } Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN; const bool probe_dispose_null = _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN; const std::vector& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; std::vector probe_not_ignore_null(eq_join_conjuncts.size()); size_t conjuncts_index = 0; for (const auto& eq_join_conjunct : eq_join_conjuncts) { VExprContext* ctx = nullptr; RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, &ctx)); _probe_expr_ctxs.push_back(ctx); RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.right, &ctx)); _build_expr_ctxs.push_back(ctx); bool null_aware = eq_join_conjunct.__isset.opcode && eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL; _is_null_safe_eq_join.push_back(null_aware); // if is null aware, build join column and probe join column both need dispose null value _store_null_in_hash_table.emplace_back( null_aware || (_build_expr_ctxs.back()->root()->is_nullable() && build_stores_null)); probe_not_ignore_null[conjuncts_index] = null_aware || (_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null); conjuncts_index++; } for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { _probe_ignore_null |= !probe_not_ignore_null[i]; } _probe_column_disguise_null.reserve(eq_join_conjuncts.size()); if (tnode.hash_join_node.__isset.vother_join_conjunct) { _vother_join_conjunct_ptr.reset(new VExprContext*); RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, tnode.hash_join_node.vother_join_conjunct, _vother_join_conjunct_ptr.get())); // If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate, // build table should not be deduplicated. DCHECK(!_build_unique); DCHECK(_have_other_join_conjunct); } _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options(), -1, _probe_expr_ctxs.size() == 1)); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); } // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need // insert to output block of hash join. // _left_output_slots_flags : column of left table need to output set flag = true // _rgiht_output_slots_flags : column of right table need to output set flag = true // if _hash_output_slot_ids is empty, means all column of left/right table need to output. auto init_output_slots_flags = [this](auto& tuple_descs, auto& output_slot_flags) { for (const auto& tuple_desc : tuple_descs) { for (const auto& slot_desc : tuple_desc->slots()) { output_slot_flags.emplace_back( _hash_output_slot_ids.empty() || std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(), slot_desc->id()) != _hash_output_slot_ids.end()); } } }; init_output_slots_flags(child(0)->row_desc().tuple_descriptors(), _left_output_slot_flags); init_output_slots_flags(child(1)->row_desc().tuple_descriptors(), _right_output_slot_flags); return Status::OK(); } Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); auto* memory_usage = runtime_profile()->create_child("PeakMemoryUsage", true, true); runtime_profile()->add_child(memory_usage, false, nullptr); _build_blocks_memory_usage = ADD_COUNTER(memory_usage, "BuildBlocks", TUnit::BYTES); _hash_table_memory_usage = ADD_COUNTER(memory_usage, "HashTable", TUnit::BYTES); _build_arena_memory_usage = memory_usage->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES); _probe_arena_memory_usage = memory_usage->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES); // Build phase _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); runtime_profile()->add_child(_build_phase_profile, false, nullptr); _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime"); _build_table_timer = ADD_TIMER(_build_phase_profile, "BuildTableTime"); _build_side_merge_block_timer = ADD_TIMER(_build_phase_profile, "BuildSideMergeBlockTime"); _build_table_insert_timer = ADD_TIMER(_build_phase_profile, "BuildTableInsertTime"); _build_expr_call_timer = ADD_TIMER(_build_phase_profile, "BuildExprCallTime"); _build_table_expanse_timer = ADD_TIMER(_build_phase_profile, "BuildTableExpanseTime"); _build_table_convert_timer = ADD_TIMER(_build_phase_profile, "BuildTableConvertToPartitionedTime"); _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT); _build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, "BuildSideHashComputingTime"); _build_runtime_filter_timer = ADD_TIMER(_build_phase_profile, "BuildRuntimeFilterTime"); // Probe phase auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); _probe_timer = ADD_TIMER(probe_phase_profile, "ProbeTime"); _probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(probe_phase_profile, "ProbeExprCallTime"); _probe_rows_counter = ADD_COUNTER(probe_phase_profile, "ProbeRows", TUnit::UNIT); _search_hashtable_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenSearchHashTableTime"); _build_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenBuildSideOutputTime"); _probe_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenProbeSideOutputTime"); _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer"); _push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT); _should_build_hash_table = true; if (_is_broadcast_join) { runtime_profile()->add_info_string("BroadcastJoin", "true"); if (state->enable_share_hash_table_for_broadcast_join()) { runtime_profile()->add_info_string("ShareHashTableEnabled", "true"); _shared_hashtable_controller = state->get_query_ctx()->get_shared_hash_table_controller(); _shared_hash_table_context = _shared_hashtable_controller->get_context(id()); _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table( state->fragment_instance_id(), id()); } else { runtime_profile()->add_info_string("ShareHashTableEnabled", "false"); } } RETURN_IF_ERROR(VExpr::prepare(_build_expr_ctxs, state, child(1)->row_desc())); RETURN_IF_ERROR(VExpr::prepare(_probe_expr_ctxs, state, child(0)->row_desc())); // _vother_join_conjuncts are evaluated in the context of the rows produced by this node if (_vother_join_conjunct_ptr) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, *_intermediate_row_desc)); } RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc()); // Hash Table Init _hash_table_init(state); _construct_mutable_join_block(); return Status::OK(); } void HashJoinNode::add_hash_buckets_info(const std::string& info) { runtime_profile()->add_info_string("HashTableBuckets", info); } void HashJoinNode::add_hash_buckets_filled_info(const std::string& info) { runtime_profile()->add_info_string("HashTableFilledBuckets", info); } Status HashJoinNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } return VJoinNodeBase::close(state); } bool HashJoinNode::need_more_input_data() const { return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos && !_short_circuit_for_null_in_probe_side; } void HashJoinNode::prepare_for_next() { _probe_index = 0; _prepare_probe_block(); } Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { SCOPED_TIMER(_probe_timer); if (_short_circuit_for_null_in_probe_side) { // If we use a short-circuit strategy for null value in build side (e.g. if join operator is // NULL_AWARE_LEFT_ANTI_JOIN), we should return empty block directly. *eos = true; return Status::OK(); } _join_block.clear_column_data(); MutableBlock mutable_join_block(&_join_block); Block temp_block; Status st; if (_probe_index < _probe_block.rows()) { DCHECK(_has_set_need_null_map_for_probe); RETURN_IF_CATCH_EXCEPTION({ std::visit( [&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe, auto ignore_null) { using HashTableProbeType = std::decay_t; if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { if (_have_other_join_conjunct) { st = process_hashtable_ctx .template do_process_with_other_join_conjuncts< need_null_map_for_probe, ignore_null>( arg, need_null_map_for_probe ? &_null_map_column->get_data() : nullptr, mutable_join_block, &temp_block, _probe_block.rows(), _is_mark_join); } else { st = process_hashtable_ctx.template do_process< need_null_map_for_probe, ignore_null>( arg, need_null_map_for_probe ? &_null_map_column->get_data() : nullptr, mutable_join_block, &temp_block, _probe_block.rows(), _is_mark_join); } } else { LOG(FATAL) << "FATAL: uninited hash table"; } } else { LOG(FATAL) << "FATAL: uninited hash table probe"; } }, *_hash_table_variants, *_process_hashtable_ctx_variants, make_bool_variant(_need_null_map_for_probe), make_bool_variant(_probe_ignore_null)); }); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { std::visit( [&](auto&& arg, auto&& process_hashtable_ctx) { using HashTableProbeType = std::decay_t; if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { st = process_hashtable_ctx.process_data_in_hashtable( arg, mutable_join_block, &temp_block, eos); } else { LOG(FATAL) << "FATAL: uninited hash table"; } } else { LOG(FATAL) << "FATAL: uninited hash table probe"; } }, *_hash_table_variants, *_process_hashtable_ctx_variants); } else { *eos = true; return Status::OK(); } } else { return Status::OK(); } if (!st) { return st; } if (_is_outer_join) { _add_tuple_is_null_column(&temp_block); } auto output_rows = temp_block.rows(); DCHECK(output_rows <= state->batch_size()); { SCOPED_TIMER(_join_filter_timer); RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx_ptr, &temp_block, temp_block.columns())); } RETURN_IF_ERROR(_build_output_block(&temp_block, output_block)); _reset_tuple_is_null_column(); reached_limit(output_block, eos); return Status::OK(); } Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block, bool eos) { _probe_eos = eos; if (input_block->rows() > 0) { COUNTER_UPDATE(_probe_rows_counter, input_block->rows()); int probe_expr_ctxs_sz = _probe_expr_ctxs.size(); _probe_columns.resize(probe_expr_ctxs_sz); std::vector res_col_ids(probe_expr_ctxs_sz); RETURN_IF_ERROR( _do_evaluate(*input_block, _probe_expr_ctxs, *_probe_expr_call_timer, res_col_ids)); if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { _probe_column_convert_to_null = _convert_block_to_null(*input_block); } // TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc` // so we have to initialize this flag by the first probe block. if (!_has_set_need_null_map_for_probe) { _has_set_need_null_map_for_probe = true; _need_null_map_for_probe = _need_probe_null_map(*input_block, res_col_ids); } if (_need_null_map_for_probe) { if (_null_map_column == nullptr) { _null_map_column = ColumnUInt8::create(); } _null_map_column->get_data().assign(input_block->rows(), (uint8_t)0); } RETURN_IF_ERROR(_extract_join_column(*input_block, _null_map_column, _probe_columns, res_col_ids)); if (&_probe_block != input_block) { input_block->swap(_probe_block); } } return Status::OK(); } Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "HashJoinNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); if (_short_circuit_for_null_in_probe_side) { // If we use a short-circuit strategy for null value in build side (e.g. if join operator is // NULL_AWARE_LEFT_ANTI_JOIN), we should return empty block directly. *eos = true; return Status::OK(); } if (_join_op == TJoinOp::RIGHT_OUTER_JOIN) { const auto hash_table_empty = std::visit( Overload {[&](std::monostate&) -> bool { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); }, [&](auto&& arg) -> bool { return arg.hash_table.size() == 0; }}, *_hash_table_variants); if (hash_table_empty) { *eos = true; return Status::OK(); } } while (need_more_input_data()) { prepare_for_next(); SCOPED_TIMER(_probe_next_timer); RETURN_IF_ERROR_AND_CHECK_SPAN( child(0)->get_next_after_projects( state, &_probe_block, &_probe_eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), child(0)->get_next_span(), _probe_eos); RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos)); } return pull(state, output_block, eos); } void HashJoinNode::_add_tuple_is_null_column(Block* block) { DCHECK(_is_outer_join); auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); auto& left_null_map = reinterpret_cast(*p0); auto& right_null_map = reinterpret_cast(*p1); auto left_size = left_null_map.size(); auto right_size = right_null_map.size(); if (left_size == 0) { DCHECK_EQ(right_size, block->rows()); left_null_map.get_data().resize_fill(right_size, 0); } if (right_size == 0) { DCHECK_EQ(left_size, block->rows()); right_null_map.get_data().resize_fill(left_size, 0); } block->insert( {std::move(p0), std::make_shared(), "left_tuples_is_null"}); block->insert( {std::move(p1), std::make_shared(), "right_tuples_is_null"}); } void HashJoinNode::_prepare_probe_block() { // clear_column_data of _probe_block if (!_probe_column_disguise_null.empty()) { for (int i = 0; i < _probe_column_disguise_null.size(); ++i) { auto column_to_erase = _probe_column_disguise_null[i]; _probe_block.erase(column_to_erase - i); } _probe_column_disguise_null.clear(); } // remove add nullmap of probe columns for (auto index : _probe_column_convert_to_null) { auto& column_type = _probe_block.safe_get_by_position(index); DCHECK(column_type.column->is_nullable() || is_column_const(*(column_type.column.get()))); DCHECK(column_type.type->is_nullable()); column_type.column = remove_nullable(column_type.column); column_type.type = remove_nullable(column_type.type); } release_block_memory(_probe_block); } Status HashJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::open(state)); RETURN_IF_CANCELLED(state); return Status::OK(); } Status HashJoinNode::alloc_resource(doris::RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state)); SCOPED_TIMER(_runtime_profile->total_time_counter()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { if (auto bf = _runtime_filters[i]->get_bloomfilter()) { RETURN_IF_ERROR(bf->init_with_fixed_length()); } } RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state)); RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state)); if (_vother_join_conjunct_ptr) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state)); } return Status::OK(); } void HashJoinNode::release_resource(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::release_resources"); VExpr::close(_build_expr_ctxs, state); VExpr::close(_probe_expr_ctxs, state); if (_vother_join_conjunct_ptr) { (*_vother_join_conjunct_ptr)->close(state); } _release_mem(); VJoinNodeBase::release_resource(state); } Status HashJoinNode::_materialize_build_side(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); if (_should_build_hash_table) { bool eos = false; Block block; // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from data. while (!eos && !_short_circuit_for_null_in_probe_side) { block.clear_column_data(); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR_AND_CHECK_SPAN( child(1)->get_next_after_projects( state, &block, &eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & ExecNode::get_next, _children[1], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), child(1)->get_next_span(), eos); RETURN_IF_ERROR(sink(state, &block, eos)); } RETURN_IF_ERROR(child(1)->close(state)); } else { RETURN_IF_ERROR(child(1)->close(state)); RETURN_IF_ERROR(sink(state, nullptr, true)); } return Status::OK(); } Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { SCOPED_TIMER(_build_timer); // make one block for each 4 gigabytes constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; if (_short_circuit_for_null_in_probe_side) { // TODO: if _short_circuit_for_null_in_probe_side is true we should finish current pipeline task. DCHECK(state->enable_pipeline_exec()); return Status::OK(); } if (_should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. _build_side_mem_used += in_block->allocated_bytes(); if (in_block->rows() != 0) { SCOPED_TIMER(_build_side_merge_block_timer); RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block)); } if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > BUILD_BLOCK_MAX_SIZE)) { if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { return Status::NotSupported( strings::Substitute("data size of right table in hash join > $0", BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); } _build_blocks->emplace_back(_build_side_mutable_block.to_block()); COUNTER_UPDATE(_build_blocks_memory_usage, (*_build_blocks)[_build_block_idx].bytes()); // TODO:: Rethink may we should do the process after we receive all build blocks ? // which is better. RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[_build_block_idx], _build_block_idx)); _build_side_mutable_block = MutableBlock(); ++_build_block_idx; _build_side_last_mem_used = _build_side_mem_used; } } if (_should_build_hash_table && eos) { // For pipeline engine, children should be closed once this pipeline task is finished. if (!_build_side_mutable_block.empty()) { if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { return Status::NotSupported( strings::Substitute("data size of right table in hash join > $0", BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); } _build_blocks->emplace_back(_build_side_mutable_block.to_block()); COUNTER_UPDATE(_build_blocks_memory_usage, (*_build_blocks)[_build_block_idx].bytes()); RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[_build_block_idx], _build_block_idx)); } auto ret = std::visit(Overload {[&](std::monostate&) -> Status { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); }, [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t; ProcessRuntimeFilterBuild runtime_filter_build_process(this); return runtime_filter_build_process(state, arg); }}, *_hash_table_variants); if (!ret.ok()) { if (_shared_hashtable_controller) { _shared_hash_table_context->status = ret; _shared_hashtable_controller->signal(id()); } return ret; } if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); // arena will be shared with other instances. _shared_hash_table_context->arena = _arena; _shared_hash_table_context->blocks = _build_blocks; _shared_hash_table_context->hash_table_variants = _hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = _short_circuit_for_null_in_probe_side; if (_runtime_filter_slots) { _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); } _shared_hashtable_controller->signal(id()); } } else if (!_should_build_hash_table && (eos || !state->enable_pipeline_exec())) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); SCOPED_TIMER(wait_timer); RETURN_IF_ERROR( _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); _build_phase_profile->add_info_string( "SharedHashTableFrom", print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); _short_circuit_for_null_in_probe_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; _hash_table_variants = std::static_pointer_cast( _shared_hash_table_context->hash_table_variants); _build_blocks = _shared_hash_table_context->blocks; if (!_shared_hash_table_context->runtime_filters.empty()) { auto ret = std::visit( Overload {[&](std::monostate&) -> Status { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); }, [&](auto&& arg) -> Status { if (_runtime_filter_descs.empty()) { return Status::OK(); } _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( _probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs)); RETURN_IF_ERROR(_runtime_filter_slots->init( state, arg.hash_table.get_size(), 0)); RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); _runtime_filter_slots->publish(); return Status::OK(); }}, *_hash_table_variants); RETURN_IF_ERROR(ret); } } if (eos || (!_should_build_hash_table && !state->enable_pipeline_exec())) { _process_hashtable_ctx_variants_init(state); } // Since the comparison of null values is meaningless, null aware left anti join should not output null // when the build side is not empty. if (!_build_blocks->empty() && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { _probe_ignore_null = true; } return Status::OK(); } void HashJoinNode::debug_string(int indentation_level, std::stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << "HashJoin(need_more_input_data=" << (need_more_input_data() ? "true" : "false") << " _probe_block.rows()=" << _probe_block.rows() << " _probe_index=" << _probe_index << " _probe_eos=" << _probe_eos << " _short_circuit_for_null_in_probe_side=" << _short_circuit_for_null_in_probe_side; *out << ")\n children=("; ExecNode::debug_string(indentation_level, out); *out << ")"; } template Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map, ColumnRawPtrs& raw_ptrs, const std::vector& res_col_ids) { DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size()); for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { if (_is_null_safe_eq_join[i]) { raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get(); } else { auto column = block.get_by_position(res_col_ids[i]).column.get(); if (auto* nullable = check_and_get_column(*column)) { auto& col_nested = nullable->get_nested_column(); auto& col_nullmap = nullable->get_null_map_data(); if constexpr (!BuildSide) { DCHECK(null_map != nullptr); VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap); } if (_store_null_in_hash_table[i]) { raw_ptrs[i] = nullable; } else { if constexpr (BuildSide) { DCHECK(null_map != nullptr); VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap); } raw_ptrs[i] = &col_nested; } } else { raw_ptrs[i] = column; } } } return Status::OK(); } Status HashJoinNode::_do_evaluate(Block& block, std::vector& exprs, RuntimeProfile::Counter& expr_call_timer, std::vector& res_col_ids) { for (size_t i = 0; i < exprs.size(); ++i) { int result_col_id = -1; // execute build column { SCOPED_TIMER(&expr_call_timer); RETURN_IF_ERROR(exprs[i]->execute(&block, &result_col_id)); } // TODO: opt the column is const block.get_by_position(result_col_id).column = block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); res_col_ids[i] = result_col_id; } return Status::OK(); } bool HashJoinNode::_need_probe_null_map(Block& block, const std::vector& res_col_ids) { DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size()); for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { if (!_is_null_safe_eq_join[i]) { auto column = block.get_by_position(res_col_ids[i]).column.get(); if (check_and_get_column(*column)) { return true; } } } return false; } void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector& res_col_ids) { DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size()); for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { if (!_is_null_safe_eq_join[i]) { auto column = block.get_by_position(res_col_ids[i]).column.get(); if (check_and_get_column(*column)) { _build_side_ignore_null |= (_join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_store_null_in_hash_table[i]); } } } } Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uint8_t offset) { SCOPED_TIMER(_build_table_timer); size_t rows = block.rows(); if (UNLIKELY(rows == 0)) { return Status::OK(); } COUNTER_UPDATE(_build_rows_counter, rows); ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size()); ColumnUInt8::MutablePtr null_map_val; std::vector res_col_ids(_build_expr_ctxs.size()); RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids)); if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { _convert_block_to_null(block); } // TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc` // so we have to initialize this flag by the first build block. if (!_has_set_need_null_map_for_build) { _has_set_need_null_map_for_build = true; _set_build_ignore_flag(block, res_col_ids); } if (_short_circuit_for_null_in_build_side || _build_side_ignore_null) { null_map_val = ColumnUInt8::create(); null_map_val->get_data().assign(rows, (uint8_t)0); } // Get the key column that needs to be built Status st = _extract_join_column(block, null_map_val, raw_ptrs, res_col_ids); st = std::visit( Overload { [&](std::monostate& arg, auto has_null_value, auto short_circuit_for_null_in_build_side) -> Status { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); return Status::OK(); }, [&](auto&& arg, auto has_null_value, auto short_circuit_for_null_in_build_side) -> Status { using HashTableCtxType = std::decay_t; ProcessHashTableBuild hash_table_build_process( rows, block, raw_ptrs, this, state->batch_size(), offset); return hash_table_build_process .template run( arg, has_null_value || short_circuit_for_null_in_build_side ? &null_map_val->get_data() : nullptr, &_short_circuit_for_null_in_probe_side); }}, *_hash_table_variants, make_bool_variant(_build_side_ignore_null), make_bool_variant(_short_circuit_for_null_in_build_side)); return st; } void HashJoinNode::_hash_table_init(RuntimeState* state) { std::visit( [&](auto&& join_op_variants, auto have_other_join_conjunct) { using JoinOpType = std::decay_t; using RowRefListType = std::conditional_t< have_other_join_conjunct, RowRefListWithFlags, std::conditional_t>; _probe_row_match_iter.emplace>(); _outer_join_pull_visited_iter.emplace>(); if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) { // Single column optimization switch (_build_expr_ctxs[0]->root()->result_type()) { case TYPE_BOOLEAN: case TYPE_TINYINT: _hash_table_variants->emplace>(); break; case TYPE_SMALLINT: _hash_table_variants->emplace>(); break; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: _hash_table_variants->emplace>(); break; case TYPE_BIGINT: case TYPE_DOUBLE: case TYPE_DATETIME: case TYPE_DATE: case TYPE_DATETIMEV2: _hash_table_variants->emplace>(); break; case TYPE_LARGEINT: case TYPE_DECIMALV2: case TYPE_DECIMAL32: case TYPE_DECIMAL64: case TYPE_DECIMAL128I: { DataTypePtr& type_ptr = _build_expr_ctxs[0]->root()->data_type(); TypeIndex idx = _build_expr_ctxs[0]->root()->is_nullable() ? assert_cast(*type_ptr) .get_nested_type() ->get_type_id() : type_ptr->get_type_id(); WhichDataType which(idx); if (which.is_decimal32()) { _hash_table_variants->emplace>(); } else if (which.is_decimal64()) { _hash_table_variants->emplace>(); } else { _hash_table_variants->emplace>(); } break; } default: _hash_table_variants->emplace>(); } return; } bool use_fixed_key = true; bool has_null = false; int key_byte_size = 0; _probe_key_sz.resize(_probe_expr_ctxs.size()); _build_key_sz.resize(_build_expr_ctxs.size()); for (int i = 0; i < _build_expr_ctxs.size(); ++i) { const auto vexpr = _build_expr_ctxs[i]->root(); const auto& data_type = vexpr->data_type(); if (!data_type->have_maximum_size_of_value()) { use_fixed_key = false; break; } auto is_null = data_type->is_nullable(); has_null |= is_null; _build_key_sz[i] = data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0); _probe_key_sz[i] = _build_key_sz[i]; key_byte_size += _probe_key_sz[i]; } if (std::tuple_size>::value + key_byte_size > sizeof(UInt256)) { use_fixed_key = false; } if (use_fixed_key) { // TODO: may we should support uint256 in the future if (has_null) { if (std::tuple_size>::value + key_byte_size <= sizeof(UInt64)) { _hash_table_variants ->emplace>(); } else if (std::tuple_size>::value + key_byte_size <= sizeof(UInt128)) { _hash_table_variants ->emplace>(); } else { _hash_table_variants ->emplace>(); } } else { if (key_byte_size <= sizeof(UInt64)) { _hash_table_variants ->emplace>(); } else if (key_byte_size <= sizeof(UInt128)) { _hash_table_variants->emplace< I128FixedKeyHashTableContext>(); } else { _hash_table_variants->emplace< I256FixedKeyHashTableContext>(); } } } else { _hash_table_variants->emplace>(); } }, _join_op_variants, make_bool_variant(_have_other_join_conjunct)); DCHECK(!std::holds_alternative(*_hash_table_variants)); std::visit(Overload {[&](std::monostate& arg) { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); }, [&](auto&& arg) { arg.hash_table.set_partitioned_threshold( state->partitioned_hash_join_rows_threshold()); }}, *_hash_table_variants); } void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) { std::visit( [&](auto&& join_op_variants) { using JoinOpType = std::decay_t; _process_hashtable_ctx_variants->emplace>( this, state->batch_size()); }, _join_op_variants); } std::vector HashJoinNode::_convert_block_to_null(Block& block) { std::vector results; for (int i = 0; i < block.columns(); ++i) { if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) { DCHECK(!column_type.column->is_nullable()); column_type.column = make_nullable(column_type.column); column_type.type = make_nullable(column_type.type); results.emplace_back(i); } } return results; } HashJoinNode::~HashJoinNode() { if (_shared_hashtable_controller && _should_build_hash_table) { // signal at here is abnormal _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor")); } } void HashJoinNode::_release_mem() { _arena = nullptr; _hash_table_variants = nullptr; _process_hashtable_ctx_variants = nullptr; _null_map_column = nullptr; _tuple_is_null_left_flag_column = nullptr; _tuple_is_null_right_flag_column = nullptr; _shared_hash_table_context = nullptr; _probe_block.clear(); } } // namespace doris::vectorized