[fix](vectorized) Fix core dump of mutable block different of block (#8280)
This commit is contained in:
@ -835,6 +835,15 @@ doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, MemPool
|
||||
return dst;
|
||||
}
|
||||
|
||||
MutableBlock::MutableBlock(const std::vector<TupleDescriptor *>& tuple_descs) {
|
||||
for (auto tuple_desc : tuple_descs) {
|
||||
for (auto slot_desc : tuple_desc->slots()) {
|
||||
_data_types.emplace_back(slot_desc->get_data_type_ptr());
|
||||
_columns.emplace_back(_data_types.back()->create_column());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t MutableBlock::rows() const {
|
||||
for (const auto& column : _columns) {
|
||||
if (column) {
|
||||
|
||||
@ -306,11 +306,7 @@ public:
|
||||
MutableBlock() = default;
|
||||
~MutableBlock() = default;
|
||||
|
||||
MutableBlock(DataTypes data_types) : _columns(data_types.size()), _data_types(std::move(data_types)) {
|
||||
for (int i = 0; i < _data_types.size(); ++i) {
|
||||
_columns[i] = _data_types[i]->create_column();
|
||||
}
|
||||
}
|
||||
MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs);
|
||||
|
||||
MutableBlock(Block* block)
|
||||
: _columns(block->mutate_columns()), _data_types(block->get_data_types()) {}
|
||||
@ -348,6 +344,7 @@ public:
|
||||
}
|
||||
}
|
||||
} else {
|
||||
DCHECK_EQ(_columns.size(), block.columns());
|
||||
for (int i = 0; i < _columns.size(); ++i) {
|
||||
if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
|
||||
DCHECK(_data_types[i]->is_nullable());
|
||||
|
||||
@ -187,11 +187,14 @@ struct ProcessHashTableProbe {
|
||||
int current_offset = 0;
|
||||
|
||||
_items_counts.resize(_probe_rows);
|
||||
_build_block_offsets.resize(_batch_size);
|
||||
_build_block_rows.resize(_batch_size);
|
||||
memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows);
|
||||
|
||||
constexpr auto is_right_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
|
||||
constexpr auto need_to_set_visited = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN;
|
||||
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN;
|
||||
|
||||
constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN;
|
||||
@ -230,7 +233,7 @@ struct ProcessHashTableProbe {
|
||||
// TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
|
||||
// We should rethink whether to use this iterator mode in the future. Now just opt the one row case
|
||||
if (mapped.get_row_count() == 1) {
|
||||
if constexpr (is_right_join)
|
||||
if constexpr (need_to_set_visited)
|
||||
mapped.visited = true;
|
||||
|
||||
if constexpr (!is_right_semi_anti_join) {
|
||||
@ -254,7 +257,7 @@ struct ProcessHashTableProbe {
|
||||
}
|
||||
++current_offset;
|
||||
}
|
||||
if constexpr (is_right_join)
|
||||
if constexpr (need_to_set_visited)
|
||||
it->visited = true;
|
||||
}
|
||||
}
|
||||
@ -294,7 +297,7 @@ struct ProcessHashTableProbe {
|
||||
mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]);
|
||||
}
|
||||
} else {
|
||||
auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
|
||||
auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
|
||||
mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]);
|
||||
}
|
||||
}
|
||||
@ -369,14 +372,14 @@ struct ProcessHashTableProbe {
|
||||
for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
|
||||
same_to_prev.emplace_back(true);
|
||||
}
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
|
||||
++current_offset;
|
||||
same_to_prev.emplace_back(false);
|
||||
visited_map.emplace_back(nullptr);
|
||||
// only full outer / left outer need insert the data of right table
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
|
||||
for (size_t j = 0; j < right_col_len; ++j) {
|
||||
DCHECK(mcol[j + right_col_idx]->is_nullable());
|
||||
@ -414,7 +417,7 @@ struct ProcessHashTableProbe {
|
||||
(*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id);
|
||||
|
||||
auto column = output_block->get_by_position(result_column_id).column;
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
|
||||
auto new_filter_column = ColumnVector<UInt8>::create();
|
||||
auto& filter_map = new_filter_column->get_data();
|
||||
@ -492,7 +495,7 @@ struct ProcessHashTableProbe {
|
||||
|
||||
output_block->get_by_position(result_column_id).column =
|
||||
std::move(new_filter_column);
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
|
||||
} else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
|
||||
for (int i = 0; i < column->size(); ++i) {
|
||||
DCHECK(visited_map[i]);
|
||||
@ -502,7 +505,7 @@ struct ProcessHashTableProbe {
|
||||
// inner join do nothing
|
||||
}
|
||||
|
||||
if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
|
||||
if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
|
||||
output_block->clear();
|
||||
} else {
|
||||
@ -526,6 +529,7 @@ struct ProcessHashTableProbe {
|
||||
|
||||
auto& iter = hash_table_ctx.iter;
|
||||
auto block_size = 0;
|
||||
|
||||
auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
|
||||
block_size++;
|
||||
for (size_t j = 0; j < right_col_len; ++j) {
|
||||
@ -548,9 +552,9 @@ struct ProcessHashTableProbe {
|
||||
}
|
||||
|
||||
// right outer join / full join need insert data of left table
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
|
||||
if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
|
||||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
|
||||
for (int i = 0; i < right_col_idx; ++i) {
|
||||
for (int j = 0; j < block_size; ++j) {
|
||||
@ -880,7 +884,7 @@ Status HashJoinNode::open(RuntimeState* state) {
|
||||
Status HashJoinNode::_hash_table_build(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(child(1)->open(state));
|
||||
SCOPED_TIMER(_build_timer);
|
||||
MutableBlock mutable_block(_right_table_data_types);
|
||||
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
|
||||
|
||||
uint8_t index = 0;
|
||||
int64_t last_mem_used = 0;
|
||||
@ -907,7 +911,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
|
||||
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
|
||||
|
||||
mutable_block = MutableBlock(_right_table_data_types);
|
||||
mutable_block = MutableBlock();
|
||||
++index;
|
||||
last_mem_used = _mem_used;
|
||||
}
|
||||
@ -942,7 +946,10 @@ Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map,
|
||||
RETURN_IF_ERROR(_build_expr_ctxs[i]->execute(&block, &result_col_id));
|
||||
}
|
||||
|
||||
// MutableBlock assume no const column in build block
|
||||
// TODO: opt the column is const
|
||||
block.get_by_position(result_col_id).column =
|
||||
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
|
||||
|
||||
if (_is_null_safe_eq_join[i]) {
|
||||
raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
|
||||
} else {
|
||||
|
||||
@ -228,13 +228,13 @@ void VSetOperationNode::hash_table_init() {
|
||||
Status VSetOperationNode::hash_table_build(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(child(0)->open(state));
|
||||
Block block;
|
||||
MutableBlock mutable_block(_left_table_data_types);
|
||||
MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
|
||||
|
||||
uint8_t index = 0;
|
||||
int64_t last_mem_used = 0;
|
||||
bool eos = false;
|
||||
while (!eos) {
|
||||
block.clear();
|
||||
block.clear_column_data();
|
||||
SCOPED_TIMER(_build_timer);
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(child(0)->get_next(state, &block, &eos));
|
||||
@ -254,7 +254,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
|
||||
// which is better.
|
||||
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
|
||||
RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
|
||||
mutable_block = MutableBlock(_left_table_data_types);
|
||||
mutable_block = MutableBlock();
|
||||
++index;
|
||||
last_mem_used = _mem_used;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user