[CP] adjust algorithm for adaptive group by && split sqldtl memory to user tenant

18523270951@163.com
2023-07-11 16:42:50 +00:00
committed by ob-robot
parent 27bce01acf
commit 21e7012901
7 changed files with 72 additions and 35 deletions

View File

@@ -12,6 +12,7 @@
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/aggregate/ob_adaptive_bypass_ctrl.h"
#include "sql/optimizer/ob_opt_selectivity.h"
namespace oceanbase
@@ -19,9 +20,12 @@ namespace oceanbase
namespace sql
{
void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt, int64_t row_cnt, int64_t mem_size)
void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt,
int64_t row_cnt,
int64_t mem_size)
{
int64_t min_period_cnt = MIN_PERIOD_CNT;
processed_cnt_ += probe_cnt;
if (!by_pass_ctrl_enabled_) {
// do nothing
} else if (STATE_PROCESS_HT == state_) {
@@ -29,30 +33,63 @@ void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt, int64_t row_cnt,
} else if (0 == probe_cnt) {
} else if (STATE_L2_INSERT == state_) {
// insert until exceed l2 cache
probe_cnt_ += probe_cnt;
if (!in_l2_cache(row_cnt, mem_size)) {
state_ = STATE_ANALYZE;
}
} else if (STATE_L3_INSERT == state_) {
// insert until exceed l3 cache
probe_cnt_ += probe_cnt;
if (!in_l3_cache(row_cnt, mem_size)) {
state_ = STATE_ANALYZE;
}
} else if (STATE_ANALYZE == state_) {
probe_cnt_ += probe_cnt;
double ratio = MIN_RATIO_FOR_L3;
if (static_cast<double> (exists_cnt_) / probe_cnt_ >=
probe_cnt_for_period_[round_times_ % MAX_REBUILD_TIMES] = probe_cnt;
ndv_cnt_for_period_[round_times_ % MAX_REBUILD_TIMES] = row_cnt;
++round_times_;
int64_t exists_cnt = probe_cnt - row_cnt;
if (static_cast<double> (exists_cnt) / probe_cnt >=
std::max(ratio, 1 - (1 / static_cast<double> (cut_ratio_)))) {
// very good distinct rate, can expand hash map to l3 cache
rebuild_times_ = 0;
if (in_l2_cache(row_cnt, mem_size)) {
if (in_l3_cache(row_cnt, mem_size)) {
state_ = STATE_L3_INSERT;
need_resize_hash_table_ = true;
} else {
state_ = STATE_PROCESS_HT;
}
} else if (static_cast<double> (exists_cnt_) / probe_cnt_ >=
} else if (round_times_ >= MAX_REBUILD_TIMES) {
double select_rows = 0.0;
double ndv = 0.0;
for (int64_t i = 0; i < MAX_REBUILD_TIMES; ++i) {
select_rows += probe_cnt_for_period_[i];
ndv += ndv_cnt_for_period_[i];
}
ndv /= MAX_REBUILD_TIMES;
double rows = select_rows / MAX_REBUILD_TIMES;
double new_ndv = ObOptSelectivity::scale_distinct(select_rows, rows, ndv);
double new_ratio = 1 - new_ndv / select_rows;
if (new_ratio >= std::max(ratio, 1 - (1 / static_cast<double> (cut_ratio_)))) {
// very good distinct rate, can expand hash map to l3 cache
rebuild_times_ = 0;
if (in_l3_cache(row_cnt, mem_size)) {
state_ = STATE_L3_INSERT;
need_resize_hash_table_ = true;
} else {
state_ = STATE_PROCESS_HT;
}
} else if (new_ratio >= 1 - (1 / static_cast<double> (cut_ratio_))) {
// good distinct rate, reset rebuild times
state_ = STATE_PROCESS_HT;
rebuild_times_ = 0;
} else {
// distinct rate is not good
// prepare to release curr hash table
state_ = STATE_PROCESS_HT;
}
//ObTaskController::get().allow_next_syslog();
LOG_TRACE("adaptive groupby try redefine ratio", K(select_rows), K(rows), K(ndv),
K(new_ndv), K(new_ratio), K(state_));
} else if (static_cast<double> (exists_cnt) / probe_cnt >=
1 - (1 / static_cast<double> (cut_ratio_))) {
// good distinct rate, reset rebuild times
state_ = STATE_PROCESS_HT;
@@ -62,10 +99,10 @@ void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt, int64_t row_cnt,
// prepare to release curr hash table
state_ = STATE_PROCESS_HT;
}
LOG_TRACE("get new state", K(state_), K(processed_cnt_), K(exists_cnt_),
K(probe_cnt_), K(rebuild_times_), K(cut_ratio_), K(mem_size), K(op_id_), K(row_cnt));
probe_cnt_ = 0;
exists_cnt_ = 0;
//ObTaskController::get().allow_next_syslog();
LOG_TRACE("adaptive groupby generate new state", K(state_), K(rebuild_times_), K(cut_ratio_),
K(mem_size), K(op_id_), K(row_cnt),
K(probe_cnt), K(exists_cnt), K(processed_cnt_));
}
}
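The hunk above is the core of the algorithm change: instead of comparing per-batch exists/probe counters, STATE_ANALYZE now records each round's probe count and group count in probe_cnt_for_period_/ndv_cnt_for_period_, and once MAX_REBUILD_TIMES rounds have been collected it re-estimates the overall NDV with ObOptSelectivity::scale_distinct and recomputes the duplicate ratio from that estimate. Below is a minimal standalone sketch of that decision, not the OceanBase code itself: estimate_scaled_ndv() is a hypothetical stand-in for scale_distinct(), and the per-round counts and cut_ratio value are invented for illustration.

#include <algorithm>
#include <cstdint>
#include <cstdio>

static constexpr double MIN_RATIO_FOR_L3 = 0.80;   // mirrors ob_adaptive_bypass_ctrl.h
static constexpr int64_t MAX_REBUILD_TIMES = 5;

// Hypothetical stand-in for ObOptSelectivity::scale_distinct(): estimate how many
// distinct values appear in select_rows rows when an average round of `rows` rows
// contained `ndv` of them.
static double estimate_scaled_ndv(double select_rows, double rows, double ndv)
{
  (void)select_rows;
  (void)rows;
  return ndv;  // placeholder; the real helper scales ndv up with select_rows
}

int main()
{
  // Per-round samples collected while in STATE_ANALYZE (made-up numbers).
  int64_t probe_cnt_for_period[MAX_REBUILD_TIMES] = {65536, 65536, 65536, 65536, 65536};
  int64_t ndv_cnt_for_period[MAX_REBUILD_TIMES]   = {1200, 1100, 1000, 1050, 1150};
  int64_t cut_ratio = 10;  // assumed value of cut_ratio_, for illustration only

  double select_rows = 0.0, ndv = 0.0;
  for (int64_t i = 0; i < MAX_REBUILD_TIMES; ++i) {
    select_rows += probe_cnt_for_period[i];
    ndv += ndv_cnt_for_period[i];
  }
  ndv /= MAX_REBUILD_TIMES;                       // average groups seen per round
  double rows = select_rows / MAX_REBUILD_TIMES;  // average probes per round
  double new_ndv = estimate_scaled_ndv(select_rows, rows, ndv);
  double new_ratio = 1 - new_ndv / select_rows;   // expected fraction of duplicate rows

  double l3_threshold = std::max(MIN_RATIO_FOR_L3, 1 - 1 / static_cast<double>(cut_ratio));
  if (new_ratio >= l3_threshold) {
    std::printf("very good duplicate rate %.3f: grow the hash table toward L3\n", new_ratio);
  } else if (new_ratio >= 1 - 1 / static_cast<double>(cut_ratio)) {
    std::printf("good duplicate rate %.3f: keep aggregating in the hash table\n", new_ratio);
  } else {
    std::printf("poor duplicate rate %.3f: release the hash table, bypass candidate\n", new_ratio);
  }
  return 0;
}

In the real operator the first branch additionally checks in_l3_cache() before enlarging the table and falls back to STATE_PROCESS_HT when the table would no longer fit.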

View File

@@ -26,9 +26,7 @@ const int64_t INIT_L3_CACHE_SIZE = get_level3_cache_size();
const int64_t MAX_L3_CACHE_SIZE = 50 *1024 *1024; //50M
const uint64_t FORCE_GPD = 0x100;
const int64_t MAX_REBUILD_TIMES = 5;
constexpr const double MIN_RATIO_FOR_L3 = 0.95;
const int64_t DISTINCT_ITEM_SIZE = 24;
const int64_t GROUP_BY_ITEM_SIZE = 40;
constexpr const double MIN_RATIO_FOR_L3 = 0.80;
class ObAdaptiveByPassCtrl {
public:
typedef enum {
@@ -43,7 +41,8 @@ public:
ObAdaptiveByPassCtrl () : by_pass_(false), processed_cnt_(0), state_(STATE_L2_INSERT),
period_cnt_(MIN_PERIOD_CNT), probe_cnt_(0), exists_cnt_(0),
rebuild_times_(0), cut_ratio_(INIT_CUT_RATIO), by_pass_ctrl_enabled_(false),
small_row_cnt_(0), op_id_(-1), need_resize_hash_table_(false) {}
small_row_cnt_(0), op_id_(-1), need_resize_hash_table_(false),
round_times_(0) {}
inline void reset() {
by_pass_ = false;
processed_cnt_ = 0;
@@ -74,7 +73,7 @@ public:
inline bool by_passing() { return by_pass_; }
inline void start_by_pass() { by_pass_ = true; }
inline void reset_rebuild_times() { rebuild_times_ = 0; }
inline bool rebuild_times_exceeded() { return rebuild_times_ > MAX_REBUILD_TIMES; }
inline bool rebuild_times_exceeded() { return rebuild_times_ >= MAX_REBUILD_TIMES; }
inline void set_max_rebuild_times() { rebuild_times_ = MAX_REBUILD_TIMES + 1; }
inline void open_by_pass_ctrl() { by_pass_ctrl_enabled_ = true; }
inline void set_op_id(int64_t op_id) { op_id_ = op_id; }
@@ -92,6 +91,9 @@ public:
int64_t small_row_cnt_; // 0 will be omitted
int64_t op_id_;
bool need_resize_hash_table_;
int64_t probe_cnt_for_period_[MAX_REBUILD_TIMES];
int64_t ndv_cnt_for_period_[MAX_REBUILD_TIMES];
int64_t round_times_;
};
} // end namespace sql

View File

@@ -59,7 +59,8 @@ public:
: initial_bucket_num_(0),
size_(0),
buckets_(NULL),
allocator_("ExtendHTBucket")
allocator_("ExtendHTBucket"),
probe_cnt_(0)
{
}
~ObExtendHashTable() { destroy(); }
@@ -68,12 +69,12 @@ public:
int64_t initial_size = INITIAL_SIZE);
bool is_inited() const { return NULL != buckets_; }
// return the first item equal to `item`, NULL if none exists.
const Item *get(const Item &item) const;
const Item *get(const Item &item);
// Link item to hash table, extend buckets if needed.
// (does not check whether the item already exists)
int set(Item &item);
int64_t size() const { return size_; }
int64_t get_probe_cnt() const { return probe_cnt_; }
void reuse()
{
int ret = common::OB_SUCCESS;
@@ -85,6 +86,7 @@ public:
}
}
size_ = 0;
probe_cnt_ = 0;
}
int resize(ObIAllocator *allocator, int64_t bucket_num);
@@ -154,6 +156,7 @@ protected:
int64_t size_;
BucketArray *buckets_;
common::ModulePageAllocator allocator_;
int64_t probe_cnt_;
};
template <typename Item>
@@ -205,9 +208,10 @@ int ObExtendHashTable<Item>::resize(ObIAllocator *allocator, int64_t bucket_num)
}
template <typename Item>
const Item *ObExtendHashTable<Item>::get(const Item &item) const
const Item *ObExtendHashTable<Item>::get(const Item &item)
{
Item *res = NULL;
++probe_cnt_;
if (NULL == buckets_) {
// do nothing
} else {

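The two hunks above change how probe statistics are gathered: ObExtendHashTable::get() now bumps a probe_cnt_ member on every lookup (so get() is no longer const), reuse() clears it, and get_probe_cnt() exposes it. The hash-group-by operator in the next file then feeds local_group_rows_.get_probe_cnt() and local_group_rows_.size() straight into gby_process_state(), and the controller derives the hit count as probe_cnt - row_cnt, so the operator no longer needs its own last_batch_size / exists counters. A minimal sketch of the pattern, not the OceanBase class:

#include <cstdint>
#include <unordered_map>

// Toy hash table that counts its own lookups, so the caller can read the probe
// count and the current group count (size) from one place.
template <typename K, typename V>
class ProbeCountingTable {
public:
  const V *get(const K &key)           // non-const: every lookup bumps the counter
  {
    ++probe_cnt_;
    auto it = map_.find(key);
    return it == map_.end() ? nullptr : &it->second;
  }
  void set(const K &key, const V &val) { map_.emplace(key, val); }
  int64_t size() const { return static_cast<int64_t>(map_.size()); }
  int64_t get_probe_cnt() const { return probe_cnt_; }
  void reuse() { map_.clear(); probe_cnt_ = 0; }   // reset both, like reuse() above
private:
  std::unordered_map<K, V> map_;
  int64_t probe_cnt_ = 0;
};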
View File

@@ -935,10 +935,8 @@ int ObHashGroupByOp::load_data()
bool check_dump = false;
ObGbyBloomFilter *bloom_filter = NULL;
const ObChunkDatumStore::StoredRow *srow = NULL;
int64_t last_batch_size = 0;
for (int64_t loop_cnt = 0; OB_SUCC(ret); ++loop_cnt) {
int64_t curr_batch_size = 0;
bypass_ctrl_.gby_process_state(last_batch_size,
bypass_ctrl_.gby_process_state(local_group_rows_.get_probe_cnt(),
local_group_rows_.size(),
get_actual_mem_used_size());
if (bypass_ctrl_.processing_ht()) {
@@ -1001,8 +999,6 @@ int ObHashGroupByOp::load_data()
srow, curr_gr_item.hash_))) {
LOG_WARN("failed to get_groupby_exprs_hash", K(ret));
}
++curr_batch_size;
bypass_ctrl_.inc_processed_cnt(1);
curr_gr_item.is_expr_row_ = true;
curr_gr_item.batch_idx_ = 0;
LOG_DEBUG("finish calc_groupby_exprs_hash", K(curr_gr_item));
@@ -1010,7 +1006,6 @@ int ObHashGroupByOp::load_data()
} else if ((!start_dump || bloom_filter->exist(curr_gr_item.hash()))
&& NULL != (exist_curr_gr_item = local_group_rows_.get(curr_gr_item))) {
++agged_row_cnt_;
bypass_ctrl_.inc_exists_cnt();
if (OB_ISNULL(exist_curr_gr_item->group_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("group_row is null", K(ret));
@@ -1065,7 +1060,6 @@ int ObHashGroupByOp::load_data()
}
has_checked = true;
} while (!last_group && OB_SUCC(ret));
last_batch_size = curr_batch_size;
}
// Must reset first; otherwise the memory has already been freed by the time automatic destruction runs, which causes problems
@@ -1621,7 +1615,7 @@ int ObHashGroupByOp::load_data_batch(int64_t max_row_cnt)
int64_t last_batch_size = 0;
while (OB_SUCC(ret)) {
bypass_ctrl_.gby_process_state(last_batch_size,
bypass_ctrl_.gby_process_state(local_group_rows_.get_probe_cnt(),
local_group_rows_.size(),
get_actual_mem_used_size());
if (bypass_ctrl_.processing_ht()) {
@@ -1637,8 +1631,6 @@ int ObHashGroupByOp::load_data_batch(int64_t max_row_cnt)
if (NULL != cur_part) {
store_rows = batch_rows_from_dump_;
}
last_batch_size = child_brs->size_ - child_brs->skip_->accumulate_bit_cnt(child_brs->size_);
bypass_ctrl_.inc_processed_cnt(last_batch_size);
clear_evaluated_flag();
loop_cnt += child_brs->size_;
if (OB_FAIL(try_check_status())) {
@@ -2274,7 +2266,6 @@ int ObHashGroupByOp::group_child_batch_rows(const ObChunkDatumStore::StoredRow *
gris_per_batch_[gri_cnt_per_batch_++] = exist_curr_gr_item;
}
++agged_row_cnt_;
bypass_ctrl_.inc_exists_cnt();
batch_row_gri_ptrs_[i] = exist_curr_gr_item;
const_cast<ObGroupRowItem *>(exist_curr_gr_item)->group_row_count_in_batch_++;
LOG_DEBUG("exist item", K(gri_cnt_per_batch_), K(*exist_curr_gr_item),

View File

@@ -117,7 +117,7 @@ class ObGroupRowHashTable : public ObExtendHashTable<ObGroupRowItem>
public:
ObGroupRowHashTable() : ObExtendHashTable(), eval_ctx_(nullptr), cmp_funcs_(nullptr) {}
OB_INLINE const ObGroupRowItem *get(const ObGroupRowItem &item) const;
OB_INLINE const ObGroupRowItem *get(const ObGroupRowItem &item);
OB_INLINE void prefetch(const ObBatchRows &brs, uint64_t *hash_vals) const;
int init(ObIAllocator *allocator,
lib::ObMemAttr &mem_attr,
@@ -134,11 +134,12 @@ private:
static const int64_t HASH_BUCKET_PREFETCH_MAGIC_NUM = 4 * 1024;
};
OB_INLINE const ObGroupRowItem *ObGroupRowHashTable::get(const ObGroupRowItem &item) const
OB_INLINE const ObGroupRowItem *ObGroupRowHashTable::get(const ObGroupRowItem &item)
{
ObGroupRowItem *res = NULL;
int ret = OB_SUCCESS;
bool result = false;
++probe_cnt_;
if (OB_UNLIKELY(NULL == buckets_)) {
// do nothing
} else {

View File

@@ -305,7 +305,8 @@ int ObPxReceiveOp::link_ch_sets(ObPxTaskChSet &ch_set,
} else if (OB_FAIL(dfc->reserve(ch_set.count()))) {
LOG_WARN("fail reserve dfc channels", K(ret), K(ch_set.count()));
} else if (ch_set.count() > 0) {
void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), "SqlDtlRecvChan");
ObMemAttr attr(ctx_.get_my_session()->get_effective_tenant_id(), "SqlDtlRecvChan");
void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), attr);
if (nullptr == buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("malloc channel buf failed", K(ret));

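This hunk, together with the matching one in the transmit operator below, covers the second half of the commit title: the DTL channel buffers were previously allocated with only a label, so the memory was presumably charged to the default server tenant, whereas now an ObMemAttr built from the session's effective tenant id is passed to ob_malloc, so the channel memory is accounted against the user tenant and counted toward its memory limit. A short fragment of the pattern, assuming OceanBase's allocator headers; tenant_id and alloc_size are placeholders for the values used above:

// ob_malloc/ob_free and ObMemAttr are the allocator interfaces already used in this diff.
ObMemAttr attr(tenant_id, "SqlDtlRecvChan");         // account the buffer to the user tenant
void *buf = oceanbase::common::ob_malloc(alloc_size, attr);
if (nullptr == buf) {
  // allocation failed: the user tenant's memory limit now applies to this buffer
} else {
  // ... construct the DTL channels in buf ...
  oceanbase::common::ob_free(buf);                   // freed later when the channels are unlinked
}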
View File

@@ -982,7 +982,8 @@ int ObPxTransmitOp::link_ch_sets(ObPxTaskChSet &ch_set,
} else if (OB_FAIL(dfc->reserve(ch_set.count()))) {
LOG_WARN("fail reserve dfc channels", K(ret), K(ch_set.count()));
} else if (ch_set.count() > 0) {
void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), "SqlDtlTxChan");
ObMemAttr attr(ctx_.get_my_session()->get_effective_tenant_id(), "SqlDtlTxChan");
void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), attr);
if (nullptr == buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("malloc channel buf failed", K(ret));