[Bug](nljoin) Keep compatibility for nljoin (#14182)

This commit is contained in:
Gabriel
2022-11-11 15:54:55 +08:00
committed by GitHub
parent a162dab40a
commit fe2944d56d
5 changed files with 59 additions and 38 deletions

View File

@ -1031,9 +1031,9 @@ Status HashJoinNode::prepare(RuntimeState* state) {
// _vother_join_conjuncts are evaluated in the context of the rows produced by this node
if (_vother_join_conjunct_ptr) {
RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, _intermediate_row_desc));
RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, *_intermediate_row_desc));
}
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc));
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
// right table data types
_right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc());

View File

@ -28,7 +28,9 @@ namespace doris::vectorized {
VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op
: tnode.nested_loop_join_node.join_op),
: (tnode.__isset.nested_loop_join_node
? tnode.nested_loop_join_node.join_op
: TJoinOp::CROSS_JOIN)),
_have_other_join_conjunct(tnode.__isset.hash_join_node
? tnode.hash_join_node.__isset.vother_join_conjunct
: false),
@ -42,22 +44,24 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
_join_op == TJoinOp::LEFT_SEMI_JOIN)),
_is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
_join_op == TJoinOp::RIGHT_SEMI_JOIN),
_is_outer_join(_match_all_build || _match_all_probe),
_output_row_desc(
descs,
{tnode.__isset.hash_join_node ? tnode.hash_join_node.voutput_tuple_id
: tnode.nested_loop_join_node.voutput_tuple_id},
{false}),
_intermediate_row_desc(
descs,
tnode.__isset.hash_join_node
? tnode.hash_join_node.vintermediate_tuple_id_list
: tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(
tnode.__isset.hash_join_node
? tnode.hash_join_node.vintermediate_tuple_id_list.size()
: tnode.nested_loop_join_node.vintermediate_tuple_id_list
.size())) {}
_is_outer_join(_match_all_build || _match_all_probe) {
if (tnode.__isset.hash_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
} else if (tnode.__isset.nested_loop_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
} else {
// Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN.
DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN);
}
}
Status VJoinNodeBase::close(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::close");
@ -66,7 +70,7 @@ Status VJoinNodeBase::close(RuntimeState* state) {
}
void VJoinNodeBase::_construct_mutable_join_block() {
const auto& mutable_block_desc = _intermediate_row_desc;
const auto& mutable_block_desc = intermediate_row_desc();
for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) {
for (const auto slot_desc : tuple_desc->slots()) {
auto type_ptr = slot_desc->get_data_type_ptr();
@ -78,9 +82,9 @@ void VJoinNodeBase::_construct_mutable_join_block() {
Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) {
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse ? MutableBlock(output_block)
: MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
_output_row_desc));
is_mem_reuse
? MutableBlock(output_block)
: MutableBlock(VectorizedUtils::create_empty_columnswithtypename(row_desc()));
auto rows = origin_block->rows();
// TODO: After FE plan support same nullable of output expr and origin block and mutable column
// we should replace `insert_column_datas` by `insert_range_from`
@ -97,13 +101,13 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
if (_output_expr_ctxs.empty()) {
DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots());
DCHECK(mutable_columns.size() == row_desc().num_materialized_slots());
for (int i = 0; i < mutable_columns.size(); ++i) {
insert_column_datas(mutable_columns[i], *origin_block->get_by_position(i).column,
rows);
}
} else {
DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots());
DCHECK(mutable_columns.size() == row_desc().num_materialized_slots());
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id));
@ -123,13 +127,15 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo
}
Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) {
const auto& output_exprs = tnode.__isset.hash_join_node
? tnode.hash_join_node.srcExprList
: tnode.nested_loop_join_node.srcExprList;
for (const auto& expr : output_exprs) {
VExprContext* ctx = nullptr;
RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx));
_output_expr_ctxs.push_back(ctx);
if (tnode.__isset.hash_join_node || tnode.__isset.nested_loop_join_node) {
const auto& output_exprs = tnode.__isset.hash_join_node
? tnode.hash_join_node.srcExprList
: tnode.nested_loop_join_node.srcExprList;
for (const auto& expr : output_exprs) {
VExprContext* ctx = nullptr;
RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx));
_output_expr_ctxs.push_back(ctx);
}
}
return ExecNode::init(tnode, state);
}

View File

@ -47,9 +47,11 @@ public:
virtual Status open(RuntimeState* state) override;
const RowDescriptor& row_desc() const override { return _output_row_desc; }
virtual const RowDescriptor& row_desc() const override { return *_output_row_desc; }
const RowDescriptor& intermediate_row_desc() const override { return _intermediate_row_desc; }
virtual const RowDescriptor& intermediate_row_desc() const override {
return *_intermediate_row_desc;
}
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
@ -80,8 +82,8 @@ protected:
const bool _is_right_semi_anti;
const bool _is_outer_join;
RowDescriptor _output_row_desc;
RowDescriptor _intermediate_row_desc;
std::unique_ptr<RowDescriptor> _output_row_desc;
std::unique_ptr<RowDescriptor> _intermediate_row_desc;
// output expr
std::vector<VExprContext*> _output_expr_ctxs;

View File

@ -39,7 +39,8 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod
_cur_probe_row_visited_flags(false),
_matched_rows_done(false),
_left_block_pos(0),
_left_side_eos(false) {}
_left_side_eos(false),
_old_version_flag(!tnode.__isset.nested_loop_join_node) {}
Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
DCHECK(_join_op == TJoinOp::CROSS_JOIN || _join_op == TJoinOp::INNER_JOIN ||
@ -65,7 +66,7 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
_num_probe_side_columns = child(0)->row_desc().num_materialized_slots();
_num_build_side_columns = child(1)->row_desc().num_materialized_slots();
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc));
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
_construct_mutable_join_block();
return Status::OK();
}

View File

@ -48,6 +48,16 @@ public:
void debug_string(int indentation_level, std::stringstream* out) const override;
const RowDescriptor& intermediate_row_desc() const override {
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
}
private:
Status _materialize_build_side(RuntimeState* state) override;
@ -89,6 +99,8 @@ private:
int _left_block_pos; // current scan pos in _left_block
bool _left_side_eos; // if true, left child has no more rows to process
bool _old_version_flag;
};
} // namespace doris::vectorized