[FEAT MERGE]

Co-authored-by: cqliang1995 <cq.liang@outlook.com>
Co-authored-by: qingsuijiu <642782632@qq.com>
Co-authored-by: yaojing624 <jingfeng.jf@oceanbase.com>
This commit is contained in:
obdev
2024-04-16 10:51:16 +00:00
committed by ob-robot
parent 49b1cfbe6b
commit 037fe7d9bb
89 changed files with 4987 additions and 1511 deletions

View File

@ -81,7 +81,7 @@ enum BucketState {
};
/*
| |extra: aggr row + next ptr| |
| |extra: aggr row | |
compact row
*/
class ObGroupRowItemVec : public ObCompactRow
@ -93,18 +93,7 @@ public:
{
return static_cast<char *>(this->get_extra_payload(row_meta));
}
// Reads the link to the following item. The link is kept as an int64_t in the
// last get_sizeof_next_ptr() bytes of this row's extra payload (whose total
// length is row_meta.extra_size_) and is reinterpreted as an item pointer.
ObGroupRowItemVec *next(const RowMeta &row_meta)
{
  char *payload = static_cast<char *>(this->get_extra_payload(row_meta));
  char *link_slot = payload + row_meta.extra_size_ - get_sizeof_next_ptr();
  int64_t *stored_link = reinterpret_cast<int64_t *>(link_slot);
  return reinterpret_cast<ObGroupRowItemVec *>(*stored_link);
}
// Stores `next` as the link to the following item. The pointer value is written
// as an int64_t into the last get_sizeof_next_ptr() bytes of this row's extra
// payload (whose total length is row_meta.extra_size_).
void set_next(const ObGroupRowItemVec *next, const RowMeta &row_meta)
{
  char *payload = static_cast<char *>(this->get_extra_payload(row_meta));
  int64_t *link_slot = reinterpret_cast<int64_t *>(
      payload + row_meta.extra_size_ - get_sizeof_next_ptr());
  *link_slot = reinterpret_cast<int64_t>(next);
}
// Returns this item itself, viewed through a pointer to const ObCompactRow.
const ObCompactRow *get_groupby_row() const { return this; }
// Size in bytes of an ObGroupRowItemVec pointer; next()/set_next() use it as
// the width of the link slot at the tail of the extra payload.
static int64_t get_sizeof_next_ptr() { return sizeof(ObGroupRowItemVec *); }
};
static_assert(sizeof(ObGroupRowItemVec) == sizeof(RowHeader),
@ -159,7 +148,7 @@ struct ObGroupRowBucketInline : public ObGroupRowBucketBase
}
// Copy assignment: copies hash_ and info_ from `other`, then copies
// INLINE_SIZE bytes of other.buffer_ into buffer_, and returns *this.
// NOTE(review): there is no self-assignment guard, so `a = a` would call
// MEMCPY with identical source and destination — confirm MEMCPY tolerates
// that or that self-assignment never happens.
OB_INLINE ObGroupRowBucketInline &operator =(const ObGroupRowBucketInline &other)
{
hash_ = other.hash_;
info_ = other.info_;
MEMCPY(buffer_, other.buffer_, INLINE_SIZE);
return *this;
}
@ -562,7 +551,6 @@ public:
change_valid_idx_(),
change_valid_idx_cnt_(0),
srows_(nullptr),
likely_equal_function_(nullptr),
op_id_(-1),
sstr_aggr_(group_store_)
{
@ -596,6 +584,7 @@ public:
} else if (OB_UNLIKELY(!inited_)) {
inited_ = true;
curr_bkt_idx_ = 0;
scan_cnt_ = 0;
while (curr_bkt_idx_ < hash_set_.buckets_->count() && !hash_set_.buckets_->at(curr_bkt_idx_).is_valid()) {
++curr_bkt_idx_;
}
@ -611,21 +600,14 @@ public:
}
if (OB_SUCC(ret)) {
for (int64_t i = read_idx; !iter_end_ && i < max_rows; ++i) {
if (OB_LIKELY(nullptr == curr_item_->next(meta_))) {
do {
++curr_bkt_idx_;
if (curr_bkt_idx_ >= hash_set_.buckets_->count()) {
iter_end_ = true;
}
} while(!iter_end_ && !hash_set_.buckets_->at(curr_bkt_idx_).is_valid());
if (!iter_end_) {
rows[i] = &hash_set_.buckets_->at(curr_bkt_idx_).get_item();
curr_item_ = const_cast<ObGroupRowItemVec *> (static_cast<const ObGroupRowItemVec *> (rows[i]));
++scan_cnt_;
++read_rows;
do {
++curr_bkt_idx_;
if (curr_bkt_idx_ >= hash_set_.buckets_->count()) {
iter_end_ = true;
}
} else {
rows[i] = curr_item_->next(meta_);
} while(!iter_end_ && !hash_set_.buckets_->at(curr_bkt_idx_).is_valid());
if (!iter_end_) {
rows[i] = &hash_set_.buckets_->at(curr_bkt_idx_).get_item();
curr_item_ = const_cast<ObGroupRowItemVec *> (static_cast<const ObGroupRowItemVec *> (rows[i]));
++scan_cnt_;
++read_rows;
@ -633,7 +615,9 @@ public:
}
if (iter_end_ && scan_cnt_ != hash_set_.size()) {
ret = OB_ERR_UNEXPECTED;
SQL_ENG_LOG(WARN, "scan cnt is not match", K(ret), K(scan_cnt_), K(hash_set_.size()));
SQL_ENG_LOG(WARN, "scan cnt is not match", K(ret), K(read_rows), K(scan_cnt_),
K(hash_set_.size()), K(curr_bkt_idx_),
K(hash_set_.get_bucket_num()));
} else if (!iter_end_ && read_rows != max_rows) {
ret = OB_ERR_UNEXPECTED;
SQL_ENG_LOG(WARN, "read rows is not match", K(ret), K(read_rows), K(max_rows));
@ -668,10 +652,8 @@ public:
bool iter_end_;
};
friend class ObExtendHashTableVec::Iterator;
// Computes the per-row extra payload size from the aggregate row size.
// NOTE(review): two definitions with the same signature appear back to back
// here; the first reserves room for one ObGroupRowItemVec pointer on top of the
// aggregate row, the second returns the aggregate row size alone. Presumably
// only one of them belongs in the file — confirm.
// NOTE(review): the result is returned as int although the input is int64_t;
// confirm sizes stay within int range.
static int calc_extra_size(int64_t aggr_row_size) { return aggr_row_size + sizeof(ObGroupRowItemVec *); }
static int calc_extra_size(int64_t aggr_row_size) { return aggr_row_size; }
// True when sstr_aggr_ reports itself valid or buckets_ is non-null.
bool is_inited() const { return sstr_aggr_.is_valid() || NULL != buckets_; }
// Return the first item that is equal, or NULL if none exists.
int get(const int64_t batch_idx, uint64_t hash_val, char *&aggr_row);
int init(ObIAllocator *allocator,
lib::ObMemAttr &mem_attr,
@ -686,12 +668,6 @@ public:
int64_t aggr_row_size,
int64_t initial_size,
bool auto_extend);
int append(const common::ObIArray<ObExpr *> &gby_exprs,
const int64_t batch_idx,
const uint64_t hash_value,
const common::ObIArray<int64_t> &lengths,
char *&get_row,
bool need_reinit_vectors = false);
int append_batch(const common::ObIArray<ObExpr *> &gby_exprs,
const ObBatchRows &child_brs,
const bool *is_dumped,
@ -717,6 +693,19 @@ public:
int64_t &agg_group_cnt,
BatchAggrRowsTable *batch_aggr_rows,
bool need_reinit_vectors);
int inner_process_batch(const common::ObIArray<ObExpr *> &gby_exprs,
const ObBatchRows &child_brs,
const bool *is_dumped,
const uint64_t *hash_values,
const common::ObIArray<int64_t> &lengths,
const bool can_append_batch,
const ObGbyBloomFilterVec *bloom_filter,
char **batch_old_rows,
char **batch_new_rows,
int64_t &agg_row_cnt,
int64_t &agg_group_cnt,
BatchAggrRowsTable *batch_aggr_rows,
bool need_reinit_vectors);
void prefetch(const ObBatchRows &brs, uint64_t *hash_vals) const;
// Link item to hash table, extend buckets if needed.
// (Does not check whether the item already exists.)
@ -833,52 +822,61 @@ public:
}
return ret;
}
int set_distinct(const RowMeta &row_meta,
const uint16_t batch_idx,
uint64_t hash_value,
StoreRowFunc sf);
int set_distinct_batch(const RowMeta &row_meta,
const int64_t batch_size,
const ObBitVector *child_skip,
ObBitVector &my_skip,
uint64_t *hash_values,
StoreRowFunc sf);
int inner_process_batch(const RowMeta &row_meta,
const int64_t batch_size,
const ObBitVector *child_skip,
ObBitVector &my_skip,
uint64_t *hash_values,
StoreRowFunc sf);
int get(const RowMeta &row_meta,
const int64_t batch_idx,
uint64_t hash_val,
const RowItemType *&item);
typedef int (ObExtendHashTableVec::*CMP_FUNC)(const RowMeta &row_meta, const ObCompactRow &left, const int64_t right_idx, bool &result) const;
int likely_equal(const RowMeta &row_meta,
const ObCompactRow &left,
const int64_t right_idx,
bool &result) const;
int likely_equal_nullable(const RowMeta &row_meta,
const ObCompactRow &left,
const int64_t right_idx,
bool &result) const;
int likely_equal_fixed64(const RowMeta &row_meta,
const ObCompactRow &left,
const int64_t right_idx,
bool &result) const;
int likely_equal_fixed64_nullable(const RowMeta &row_meta,
const ObCompactRow &left,
const int64_t right_idx,
bool &result) const;
int extend(const int64_t new_bucket_num);
// Read-only accessor for the buckets_ pointer; the pointer is returned as-is
// and may be null (is_inited() tests buckets_ against NULL).
const BucketArray *get_buckets() const { return buckets_; }
protected:
// Locate the bucket with the same hash value, or empty bucket if not found.
// The returned empty bucket is the insert position for the %hash_val
OB_INLINE const GroupRowBucket &locate_bucket(const BucketArray &buckets,
const uint64_t hash_val) const
// Linear-probe step over `buckets` for `hash_val`.
//   curr_pos < 0  : start a new probe at slot (masked hash & (count - 1)).
//   curr_pos >= 0 : resume at the slot after curr_pos.
// Probing advances one slot at a time until it reaches a bucket whose
// check_hash() accepts the masked hash or a bucket that is not valid; that
// bucket is returned. curr_pos is updated to the probe position; after the
// initial assignment it is only incremented, and it is masked with
// (count - 1) each time it is used as an index.
// NOTE(review): masking with (count - 1) presumes the bucket count is a power
// of two — confirm against the allocation/extend code.
OB_INLINE const GroupRowBucket &locate_next_bucket(const BucketArray &buckets,
const uint64_t hash_val,
int64_t &curr_pos) const
{
  const int64_t bucket_cnt = buckets.count();
  const int64_t slot_mask = bucket_cnt - 1;
  uint64_t probe_hash = (hash_val & ObGroupRowBucketBase::HASH_VAL_MASK);
  if (OB_LIKELY(curr_pos < 0)) {
    // fresh lookup: begin at the home slot of this hash
    curr_pos = probe_hash & slot_mask;
  } else {
    // continued lookup: step past the bucket returned last time
    ++curr_pos;
  }
  const GroupRowBucket *candidate = &buckets.at(curr_pos & slot_mask);
  for (;;) {
    if (candidate->check_hash(probe_hash) || !candidate->is_valid()) {
      break;
    }
    ++curr_pos;
    candidate = &buckets.at(curr_pos & slot_mask);
  }
  return *candidate;
}
// used for extend
// Linear-probes `buckets` for a slot for `hash_val`, starting at
// (masked hash & (count - 1)) and advancing one slot at a time.
// NOTE(review): this span appears to interleave two variants of the probe loop
// (one indexed by `pos` that also stops on a check_hash() match, one indexed by
// `curr_pos` that re-declares `bucket` and stops only on a non-valid bucket),
// and the braces do not balance as written — confirm against the actual file.
OB_INLINE const GroupRowBucket &locate_empty_bucket(const BucketArray &buckets,
const uint64_t hash_val) const
{
const int64_t cnt = buckets.count();
uint64_t mask_hash = (hash_val & ObGroupRowBucketBase::HASH_VAL_MASK);
int64_t pos = mask_hash & (cnt - 1);
const GroupRowBucket *bucket = &buckets.at(pos);
// The extend logic makes sure the buckets are never full, so the loop count is always less than %cnt
while (!bucket->check_hash(mask_hash) && bucket->is_valid()) {
bucket = &buckets.at((++pos) & (cnt - 1));
int64_t curr_pos = mask_hash & (cnt - 1);
const GroupRowBucket * bucket = &buckets.at(curr_pos);
while (bucket->is_valid()) {
bucket = &buckets.at((++curr_pos) & (cnt - 1));
}
return *bucket;
}
@ -912,7 +910,6 @@ protected:
common::ObFixedArray<uint16_t, common::ObIAllocator> change_valid_idx_;
int64_t change_valid_idx_cnt_;
ObCompactRow **srows_;
CMP_FUNC likely_equal_function_;
int64_t op_id_;
ShortStringAggregator sstr_aggr_;
};
@ -988,7 +985,7 @@ int ObExtendHashTableVec<GroupRowBucket>::extend(const int64_t new_bucket_num)
for (int64_t i = 0; i < size; i++) {
const GroupRowBucket &old = buckets_->at(i);
if (old.is_valid()) {
const_cast<GroupRowBucket &>(locate_bucket(*new_buckets, old.get_hash())) = old;
const_cast<GroupRowBucket &>(locate_empty_bucket(*new_buckets, old.get_hash())) = old;
} else if (old.is_occupyed()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("extend is prepare allocated", K(old.get_hash()));
@ -1042,17 +1039,20 @@ int ObExtendHashTableVec<GroupRowBucket>::get(const RowMeta &row_meta,
if (OB_UNLIKELY(NULL == buckets_)) {
// do nothing
} else {
bucket = const_cast<GroupRowBucket *> (&locate_bucket(*buckets_, hash_val));
if (bucket->is_valid()) {
RowItemType *it = &(bucket->get_item());
while (OB_SUCC(ret) && nullptr != it) {
int64_t curr_pos = -1;
bool find_bkt = false;
while (OB_SUCC(ret) && !find_bkt) {
bucket = const_cast<GroupRowBucket *> (&locate_next_bucket(*buckets_, hash_val, curr_pos));
if (!bucket->is_valid()) {
find_bkt = true;
} else {
RowItemType *it = &(bucket->get_item());
if (OB_FAIL(likely_equal_nullable(row_meta, static_cast<ObCompactRow&>(*it), batch_idx, result))) {
LOG_WARN("failed to cmp", K(ret));
} else if (result) {
item = it;
break;
find_bkt = true;
}
it = it->next(row_meta);
}
}
}