[Improvement](join) optimize join probing phase (#13357)

This commit is contained in:
Gabriel
2022-10-18 12:37:17 +08:00
committed by GitHub
parent 18f2db6064
commit cd3450bd9d
18 changed files with 722 additions and 575 deletions

View File

@ -428,7 +428,13 @@ public:
*/
virtual Ptr replicate(const Offsets& offsets) const = 0;
virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
/** Copies each element according offsets parameter.
* (i-th element should be copied counts[i] times.)
* If `begin` and `count_sz` specified, it means elements in range [`begin`, `begin` + `count_sz`) will be replicated.
* If `count_sz` is -1, `begin` must be 0.
*/
virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin = 0, int count_sz = -1) const {
LOG(FATAL) << "not support";
};

View File

@ -528,17 +528,19 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) cons
return replicate_generic(replicate_offsets);
}
void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
size_t col_size = size();
void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t col_size = count_sz < 0 ? size() : count_sz;
if (col_size == 0) {
return;
}
IColumn::Offsets replicate_offsets(col_size);
size_t cur_offset = 0;
for (size_t i = 0; i < col_size; ++i) {
size_t end = begin + col_size;
for (size_t i = begin; i < end; ++i) {
cur_offset += counts[i];
replicate_offsets[i] = cur_offset;
replicate_offsets[i - begin] = cur_offset;
}
if (cur_offset != target_size) {
LOG(WARNING) << "ColumnArray replicate input target_size:" << target_size

View File

@ -120,7 +120,8 @@ public:
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
ColumnPtr convert_to_full_column_if_const() const override;
void get_extremes(Field& min, Field& max) const override {
LOG(FATAL) << "get_extremes not implemented";

View File

@ -236,7 +236,8 @@ public:
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
[[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
const IColumn::Selector& selector) const override {
@ -348,16 +349,17 @@ ColumnPtr ColumnComplexType<T>::replicate(const IColumn::Offsets& offsets) const
}
template <typename T>
void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
size_t size = data.size();
void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (0 == size) return;
auto& res = reinterpret_cast<ColumnComplexType<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);
for (size_t i = 0; i < size; ++i) {
size_t end = size + begin;
for (size_t i = begin; i < end; ++i) {
size_t size_to_replicate = counts[i];
for (size_t j = 0; j < size_to_replicate; ++j) {
res_data.push_back(data[i]);

View File

@ -67,7 +67,8 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) const {
return ColumnConst::create(data, replicated_size);
}
void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
if (s == 0) return;
auto& res = reinterpret_cast<ColumnConst&>(column);
res.s = s;

View File

@ -138,7 +138,8 @@ public:
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override;
ColumnPtr replicate(const Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
// ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,

View File

@ -347,16 +347,17 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets& offsets) const {
}
template <typename T>
void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
size_t size = data.size();
void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (0 == size) return;
auto& res = reinterpret_cast<ColumnDecimal<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);
for (size_t i = 0; i < size; ++i) {
size_t end = size + begin;
for (size_t i = begin; i < end; ++i) {
res_data.add_num_element_without_reserve(data[i], counts[i]);
}
}

View File

@ -191,7 +191,8 @@ public:
ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void get_extremes(Field& min, Field& max) const override;

View File

@ -332,8 +332,9 @@ ColumnPtr ColumnJsonb::replicate(const Offsets& replicate_offsets) const {
return res;
}
void ColumnJsonb::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
size_t col_size = size();
void ColumnJsonb::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t col_size = count_sz < 0 ? size() : count_sz;
if (0 == col_size) return;
auto& res = reinterpret_cast<ColumnJsonb&>(column);
@ -343,10 +344,12 @@ void ColumnJsonb::replicate(const uint32_t* counts, size_t target_size, IColumn&
res_chars.reserve(chars.size() / col_size * target_size);
res_offsets.reserve(target_size);
Offset prev_json_offset = 0;
size_t base = begin > 0 ? offset_at(begin - 1) : 0;
Offset prev_json_offset = 0 + base;
Offset current_new_offset = 0;
for (size_t i = 0; i < col_size; ++i) {
size_t end = begin + col_size;
for (size_t i = begin; i < end; ++i) {
size_t size_to_replicate = counts[i];
size_t json_size = offsets[i] - prev_json_offset;

View File

@ -252,7 +252,8 @@ public:
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override {
return scatter_impl<ColumnJsonb>(num_columns, selector);

View File

@ -530,10 +530,12 @@ ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const {
return ColumnNullable::create(replicated_data, replicated_null_map);
}
void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
auto& res = reinterpret_cast<ColumnNullable&>(column);
get_nested_column().replicate(counts, target_size, res.get_nested_column());
get_null_map_column().replicate(counts, target_size, res.get_null_map_column());
get_nested_column().replicate(counts, target_size, res.get_nested_column(), begin, count_sz);
get_null_map_column().replicate(counts, target_size, res.get_null_map_column(), begin,
count_sz);
}
template <bool negative>

View File

@ -162,7 +162,8 @@ public:
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const override;

View File

@ -402,8 +402,9 @@ ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const {
return res;
}
void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
size_t col_size = size();
void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t col_size = count_sz < 0 ? size() : count_sz;
if (0 == col_size) {
return;
}
@ -415,10 +416,12 @@ void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn
res_chars.reserve(chars.size() / col_size * target_size);
res_offsets.reserve(target_size);
Offset prev_string_offset = 0;
size_t base = begin > 0 ? offsets[begin - 1] : 0;
Offset prev_string_offset = 0 + base;
Offset current_new_offset = 0;
for (size_t i = 0; i < col_size; ++i) {
size_t end = begin + col_size;
for (size_t i = begin; i < end; ++i) {
size_t size_to_replicate = counts[i];
size_t string_size = offsets[i] - prev_string_offset;

View File

@ -377,7 +377,8 @@ public:
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override {
return scatter_impl<ColumnString>(num_columns, selector);

View File

@ -478,15 +478,17 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets& offsets) const {
}
template <typename T>
void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const {
size_t size = data.size();
void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column,
size_t begin, int count_sz) const {
size_t size = count_sz < 0 ? data.size() : count_sz;
if (size == 0) return;
auto& res = reinterpret_cast<ColumnVector<T>&>(column);
typename Self::Container& res_data = res.get_data();
res_data.reserve(target_size);
for (size_t i = 0; i < size; ++i) {
size_t end = begin + size;
for (size_t i = begin; i < end; ++i) {
res_data.add_num_element_without_reserve(data[i], counts[i]);
}
}

View File

@ -339,7 +339,8 @@ public:
ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
void get_extremes(Field& min, Field& max) const override;

File diff suppressed because it is too large Load Diff

View File

@ -151,6 +151,101 @@ using JoinOpVariants =
M(NULL_AWARE_LEFT_ANTI_JOIN)
class VExprContext;
class HashJoinNode;
template <class JoinOpType, bool ignore_null>
struct ProcessHashTableProbe {
ProcessHashTableProbe(HashJoinNode* join_node, int batch_size);
// output build side result column
template <bool have_other_join_conjunct = false>
void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length,
const std::vector<bool>& output_slot_flags, int size);
template <bool have_other_join_conjunct = false>
void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags,
int size, int last_probe_index, size_t probe_size,
bool all_match_one);
// Only process the join with no other join conjunt, because of no other join conjunt
// the output block struct is same with mutable block. we can do more opt on it and simplify
// the logic of probe
// TODO: opt the visited here to reduce the size of hash table
template <typename HashTableType>
Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
MutableBlock& mutable_block, Block* output_block, size_t probe_rows);
// In the presence of other join conjunt, the process of join become more complicated.
// each matching join column need to be processed by other join conjunt. so the sturct of mutable block
// and output block may be different
// The output result is determined by the other join conjunt result and same_to_prev struct
template <typename HashTableType>
Status do_process_with_other_join_conjunts(HashTableType& hash_table_ctx,
ConstNullMapPtr null_map,
MutableBlock& mutable_block, Block* output_block,
size_t probe_rows);
// Process full outer join/ right join / right semi/anti join to output the join result
// in hash table
template <typename HashTableType>
Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block,
Block* output_block, bool* eos);
vectorized::HashJoinNode* _join_node;
const int _batch_size;
const std::vector<Block>& _build_blocks;
Arena _arena;
std::vector<uint32_t> _items_counts;
std::vector<int8_t> _build_block_offsets;
std::vector<int> _build_block_rows;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container& _tuple_is_null_left_flags;
// only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container& _tuple_is_null_right_flags;
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _search_hashtable_timer;
RuntimeProfile::Counter* _build_side_output_timer;
RuntimeProfile::Counter* _probe_side_output_timer;
static constexpr int PROBE_SIDE_EXPLODE_RATE = 3;
};
using HashTableCtxVariants = std::variant<
std::monostate,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>, true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>, true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::LEFT_ANTI_JOIN>, true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::LEFT_OUTER_JOIN>,
true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::FULL_OUTER_JOIN>,
true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_OUTER_JOIN>,
true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::CROSS_JOIN>, true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>,
true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>,
true>,
ProcessHashTableProbe<
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>, true>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>, false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>,
false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::LEFT_ANTI_JOIN>,
false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::LEFT_OUTER_JOIN>,
false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::FULL_OUTER_JOIN>,
false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_OUTER_JOIN>,
false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::CROSS_JOIN>, false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>,
false>,
ProcessHashTableProbe<std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>,
false>,
ProcessHashTableProbe<
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>, false>>;
class HashJoinNode : public ::doris::ExecNode {
public:
@ -224,6 +319,8 @@ private:
Arena _arena;
HashTableVariants _hash_table_variants;
HashTableCtxVariants _process_hashtable_ctx_variants;
std::vector<Block> _build_blocks;
Block _probe_block;
ColumnRawPtrs _probe_columns;
@ -246,10 +343,6 @@ private:
Block _join_block;
std::vector<uint32_t> _items_counts;
std::vector<int8_t> _build_block_offsets;
std::vector<int> _build_block_rows;
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
@ -260,7 +353,6 @@ private:
MutableColumnPtr _tuple_is_null_left_flag_column;
MutableColumnPtr _tuple_is_null_right_flag_column;
private:
void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status);
Status _hash_table_build(RuntimeState* state);
@ -271,9 +363,10 @@ private:
bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
RuntimeProfile::Counter& expr_call_timer);
void _hash_table_init();
void _process_hashtable_ctx_variants_init(RuntimeState* state);
static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128;
@ -294,7 +387,7 @@ private:
template <class HashTableContext>
friend struct ProcessHashTableBuild;
template <class HashTableContext, class JoinOpType, bool ignore_null>
template <class JoinOpType, bool ignore_null>
friend struct ProcessHashTableProbe;
template <class HashTableContext>