[Improvement](join) Support nested loop outer join (#13965)

This commit is contained in:
Gabriel
2022-11-10 19:50:46 +08:00
committed by GitHub
parent 6c13126e5c
commit 1ef85ae1f2
25 changed files with 1903 additions and 1351 deletions

View File

@ -17,10 +17,9 @@
#include "vec/exec/join/vhash_join_node.h"
#include "exprs/runtime_filter_slots.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_filter_mgr.h"
#include "util/defer_op.h"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vexpr.h"
@ -867,32 +866,15 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_join_op(tnode.hash_join_node.join_op),
: VJoinNodeBase(pool, tnode, descs),
_mem_used(0),
_have_other_join_conjunct(tnode.hash_join_node.__isset.vother_join_conjunct),
_match_all_probe(_join_op == TJoinOp::LEFT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN),
_match_all_build(_join_op == TJoinOp::RIGHT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN),
_build_unique(!_have_other_join_conjunct &&
(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_join_op == TJoinOp::LEFT_ANTI_JOIN ||
_join_op == TJoinOp::LEFT_SEMI_JOIN)),
_is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
_join_op == TJoinOp::RIGHT_SEMI_JOIN),
_is_outer_join(_match_all_build || _match_all_probe),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join),
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
? tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}),
_intermediate_row_desc(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())),
_output_row_desc(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}) {
: std::vector<SlotId> {}) {
_runtime_filter_descs = tnode.runtime_filters;
init_join_op();
_init_join_op();
// avoid vector expand change block address.
// one block can store 4g data, _build_blocks can store 128*4g data.
@ -902,22 +884,8 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
HashJoinNode::~HashJoinNode() = default;
void HashJoinNode::init_join_op() {
switch (_join_op) {
#define M(NAME) \
case TJoinOp::NAME: \
_join_op_variants.emplace<std::integral_constant<TJoinOp::type, TJoinOp::NAME>>(); \
break;
APPLY_FOR_JOINOP_VARIANTS(M);
#undef M
default:
//do nothing
break;
}
}
Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);
const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
@ -968,13 +936,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
DCHECK(_have_other_join_conjunct);
}
const auto& output_exprs = tnode.hash_join_node.srcExprList;
for (const auto& expr : output_exprs) {
VExprContext* ctx = nullptr;
RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx));
_output_expr_ctxs.push_back(ctx);
}
_runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
@ -1010,25 +971,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
Status HashJoinNode::prepare(RuntimeState* state) {
DCHECK(_runtime_profile.get() != nullptr);
_rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
_mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
_runtime_profile.get());
if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _intermediate_row_desc));
}
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
}
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// Build phase
auto build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
runtime_profile()->add_child(build_phase_profile, false, nullptr);
@ -1069,7 +1013,6 @@ Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, _intermediate_row_desc));
}
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, _intermediate_row_desc));
// right table data types
_right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc());
@ -1093,14 +1036,12 @@ Status HashJoinNode::close(RuntimeState* state) {
_shared_hashtable_controller->wait_for_closable(state, id());
}
START_AND_SCOPE_SPAN(state->get_tracer(), span, "ashJoinNode::close");
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::close");
VExpr::close(_build_expr_ctxs, state);
VExpr::close(_probe_expr_ctxs, state);
if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
VExpr::close(_output_expr_ctxs, state);
return ExecNode::close(state);
return VJoinNodeBase::close(state);
}
Status HashJoinNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
@ -1291,16 +1232,6 @@ void HashJoinNode::_prepare_probe_block() {
release_block_memory(_probe_block);
}
void HashJoinNode::_construct_mutable_join_block() {
const auto& mutable_block_desc = _intermediate_row_desc;
for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) {
for (const auto slot_desc : tuple_desc->slots()) {
auto type_ptr = slot_desc->get_data_type_ptr();
_join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()});
}
}
}
Status HashJoinNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
@ -1309,43 +1240,18 @@ Status HashJoinNode::open(RuntimeState* state) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
}
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state));
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
if (_vother_join_conjunct_ptr) {
RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state));
}
RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state));
std::promise<Status> thread_status;
std::thread([this, state, thread_status_p = &thread_status,
parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
OpentelemetryScope scope {parent_span};
this->_probe_side_open_thread(state, thread_status_p);
}).detach();
// Open the probe-side child so that it may perform any initialisation in parallel.
// Don't exit even if we see an error, we still need to wait for the build thread
// to finish.
// ISSUE-1247, check open_status after buildThread execute.
// If this return first, build thread will use 'thread_status'
// which is already destructor and then coredump.
Status status = _hash_table_build(state);
RETURN_IF_ERROR(thread_status.get_future().get());
return status;
RETURN_IF_ERROR(VJoinNodeBase::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
return Status::OK();
}
void HashJoinNode::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread");
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(child(0)->open(state));
}
Status HashJoinNode::_hash_table_build(RuntimeState* state) {
Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->open(state));
SCOPED_TIMER(_build_timer);
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
@ -1744,53 +1650,6 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) {
return results;
}
Status HashJoinNode::_build_output_block(Block* origin_block, Block* output_block) {
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse ? MutableBlock(output_block)
: MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
_output_row_desc));
auto rows = origin_block->rows();
// TODO: After FE plan support same nullable of output expr and origin block and mutable column
// we should replace `insert_column_datas` by `insert_range_from`
auto insert_column_datas = [](auto& to, const auto& from, size_t rows) {
if (to->is_nullable() && !from.is_nullable()) {
auto& null_column = reinterpret_cast<ColumnNullable&>(*to);
null_column.get_nested_column().insert_range_from(from, 0, rows);
null_column.get_null_map_column().get_data().resize_fill(rows, 0);
} else {
to->insert_range_from(from, 0, rows);
}
};
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
if (_output_expr_ctxs.empty()) {
DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots());
for (int i = 0; i < mutable_columns.size(); ++i) {
insert_column_datas(mutable_columns[i], *origin_block->get_by_position(i).column,
rows);
}
} else {
DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots());
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
insert_column_datas(mutable_columns[i], *column_ptr, rows);
}
}
if (!is_mem_reuse) {
output_block->swap(mutable_block.to_block());
}
DCHECK(output_block->rows() == rows);
}
return Status::OK();
}
void HashJoinNode::_add_tuple_is_null_column(Block* block) {
if (_is_outer_join) {
auto p0 = _tuple_is_null_left_flag_column->assume_mutable();