diff --git a/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.cpp b/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.cpp
index 5283fb4212..6cf1eee3b0 100644
--- a/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.cpp
+++ b/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.cpp
@@ -12,6 +12,7 @@
 
 #define USING_LOG_PREFIX SQL_ENG
 #include "sql/engine/aggregate/ob_adaptive_bypass_ctrl.h"
+#include "sql/optimizer/ob_opt_selectivity.h"
 
 namespace oceanbase
@@ -19,9 +20,12 @@ namespace oceanbase
 namespace sql
 {
-void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt, int64_t row_cnt, int64_t mem_size)
+void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt,
+                                             int64_t row_cnt,
+                                             int64_t mem_size)
 {
   int64_t min_period_cnt = MIN_PERIOD_CNT;
+  processed_cnt_ += probe_cnt;
   if (!by_pass_ctrl_enabled_) {
     // do nothing
   } else if (STATE_PROCESS_HT == state_) {
@@ -29,30 +33,63 @@ void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt, int64_t row_cnt,
   } else if (0 == probe_cnt) {
   } else if (STATE_L2_INSERT == state_) {
     // insert until exceed l2 cache
-    probe_cnt_ += probe_cnt;
     if (!in_l2_cache(row_cnt, mem_size)) {
       state_ = STATE_ANALYZE;
     }
   } else if (STATE_L3_INSERT == state_) {
     // insert until exceed l3 cache
-    probe_cnt_ += probe_cnt;
     if (!in_l3_cache(row_cnt, mem_size)) {
       state_ = STATE_ANALYZE;
     }
   } else if (STATE_ANALYZE == state_) {
-    probe_cnt_ += probe_cnt;
     double ratio = MIN_RATIO_FOR_L3;
-    if (static_cast<double> (exists_cnt_) / probe_cnt_ >=
+    probe_cnt_for_period_[round_times_ % MAX_REBUILD_TIMES] = probe_cnt;
+    ndv_cnt_for_period_[round_times_ % MAX_REBUILD_TIMES] = row_cnt;
+    ++round_times_;
+    int64_t exists_cnt = probe_cnt - row_cnt;
+    if (static_cast<double> (exists_cnt) / probe_cnt >=
         std::max(ratio, 1 - (1 / static_cast<double> (cut_ratio_)))) {
       // very good distinct rate, can expend hash map to l3 cache
       rebuild_times_ = 0;
-      if (in_l2_cache(row_cnt, mem_size)) {
+      if (in_l3_cache(row_cnt, mem_size)) {
         state_ = STATE_L3_INSERT;
         need_resize_hash_table_ = true;
       } else {
         state_ = STATE_PROCESS_HT;
       }
-    } else if (static_cast<double> (exists_cnt_) / probe_cnt_ >=
+    } else if (round_times_ >= MAX_REBUILD_TIMES) {
+      double select_rows = 0.0;
+      double ndv = 0.0;
+      for (int64_t i = 0; i < MAX_REBUILD_TIMES; ++i) {
+        select_rows += probe_cnt_for_period_[i];
+        ndv += ndv_cnt_for_period_[i];
+      }
+      ndv /= MAX_REBUILD_TIMES;
+      double rows = select_rows / MAX_REBUILD_TIMES;
+      double new_ndv = ObOptSelectivity::scale_distinct(select_rows, rows, ndv);
+      double new_ratio = 1 - new_ndv / select_rows;
+      if (new_ratio >= std::max(ratio, 1 - (1 / static_cast<double> (cut_ratio_)))) {
+        // very good distinct rate, can expend hash map to l3 cache
+        rebuild_times_ = 0;
+        if (in_l3_cache(row_cnt, mem_size)) {
+          state_ = STATE_L3_INSERT;
+          need_resize_hash_table_ = true;
+        } else {
+          state_ = STATE_PROCESS_HT;
+        }
+      } else if (new_ratio >= 1 - (1 / static_cast<double> (cut_ratio_))) {
+        // good distinct rate, reset rebuild times
+        state_ = STATE_PROCESS_HT;
+        rebuild_times_ = 0;
+      } else {
+        // distinct rate is not good
+        // prepare to release curr hash table
+        state_ = STATE_PROCESS_HT;
+      }
+      //ObTaskController::get().allow_next_syslog();
+      LOG_TRACE("adaptive groupby try redefine ratio", K(select_rows), K(rows), K(ndv),
+                K(new_ndv), K(new_ratio), K(state_));
+    } else if (static_cast<double> (exists_cnt) / probe_cnt >=
               1 - (1 / static_cast<double> (cut_ratio_))) {
       // good distinct rate, reset rebuild times
       state_ = STATE_PROCESS_HT;
@@ -62,10 +99,10 @@ void ObAdaptiveByPassCtrl::gby_process_state(int64_t probe_cnt, int64_t row_cnt,
       // prepare to release curr hash table
       state_ = STATE_PROCESS_HT;
     }
-    LOG_TRACE("get new state", K(state_), K(processed_cnt_), K(exists_cnt_),
-              K(probe_cnt_), K(rebuild_times_), K(cut_ratio_), K(mem_size), K(op_id_), K(row_cnt));
-    probe_cnt_ = 0;
-    exists_cnt_ = 0;
+    //ObTaskController::get().allow_next_syslog();
+    LOG_TRACE("adaptive groupby generate new state", K(state_), K(rebuild_times_), K(cut_ratio_),
+              K(mem_size), K(op_id_), K(row_cnt),
+              K(probe_cnt), K(exists_cnt), K(processed_cnt_));
   }
 }
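Note on the ob_adaptive_bypass_ctrl.cpp change: instead of resetting probe_cnt_/exists_cnt_ after every analysis period, the STATE_ANALYZE branch now records MAX_REBUILD_TIMES periods, averages them, and asks ObOptSelectivity::scale_distinct() to extrapolate the averaged NDV to the total probed row count before recomputing the deduplication ratio. The standalone sketch below only illustrates that arithmetic; scale_distinct_sketch() is a hypothetical stand-in for the real ObOptSelectivity::scale_distinct(), and the sample numbers are made up.

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>

static const int64_t MAX_REBUILD_TIMES = 5;

// Hypothetical stand-in for ObOptSelectivity::scale_distinct(): assume NDV grows
// sub-linearly when extrapolating from one period (rows) to all periods (select_rows).
static double scale_distinct_sketch(double select_rows, double rows, double ndv)
{
  return std::min(select_rows, ndv * (1.0 + std::log(select_rows / rows)));
}

int main()
{
  // Made-up per-period samples: rows probed and distinct groups seen in each period.
  int64_t probe_cnt_for_period[MAX_REBUILD_TIMES] = {1000, 1100, 900, 1050, 950};
  int64_t ndv_cnt_for_period[MAX_REBUILD_TIMES] = {120, 130, 110, 125, 115};
  double select_rows = 0.0;
  double ndv = 0.0;
  for (int64_t i = 0; i < MAX_REBUILD_TIMES; ++i) {
    select_rows += probe_cnt_for_period[i];
    ndv += ndv_cnt_for_period[i];
  }
  ndv /= MAX_REBUILD_TIMES;                       // average NDV per period
  double rows = select_rows / MAX_REBUILD_TIMES;  // average rows per period
  double new_ndv = scale_distinct_sketch(select_rows, rows, ndv);
  double new_ratio = 1 - new_ndv / select_rows;   // estimated dedup (aggregation) rate
  std::printf("new_ndv=%.1f new_ratio=%.3f\n", new_ndv, new_ratio);
  return 0;
}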
diff --git a/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.h b/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.h
index cd6233cabb..6a0b8d38d4 100644
--- a/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.h
+++ b/src/sql/engine/aggregate/ob_adaptive_bypass_ctrl.h
@@ -26,9 +26,7 @@ const int64_t INIT_L3_CACHE_SIZE = get_level3_cache_size();
 const int64_t MAX_L3_CACHE_SIZE = 50 *1024 *1024; //50M
 const uint64_t FORCE_GPD = 0x100;
 const int64_t MAX_REBUILD_TIMES = 5;
-constexpr const double MIN_RATIO_FOR_L3 = 0.95;
-const int64_t DISTINCT_ITEM_SIZE = 24;
-const int64_t GROUP_BY_ITEM_SIZE = 40;
+constexpr const double MIN_RATIO_FOR_L3 = 0.80;
 class ObAdaptiveByPassCtrl {
 public:
   typedef enum {
@@ -43,7 +41,8 @@ public:
   ObAdaptiveByPassCtrl () : by_pass_(false), processed_cnt_(0), state_(STATE_L2_INSERT),
     period_cnt_(MIN_PERIOD_CNT), probe_cnt_(0), exists_cnt_(0), rebuild_times_(0),
     cut_ratio_(INIT_CUT_RATIO), by_pass_ctrl_enabled_(false),
-    small_row_cnt_(0), op_id_(-1), need_resize_hash_table_(false) {}
+    small_row_cnt_(0), op_id_(-1), need_resize_hash_table_(false),
+    round_times_(0) {}
   inline void reset() {
     by_pass_ = false;
     processed_cnt_ = 0;
@@ -74,7 +73,7 @@ public:
   inline bool by_passing() { return by_pass_; }
   inline void start_by_pass() { by_pass_ = true; }
   inline void reset_rebuild_times() { rebuild_times_ = 0; }
-  inline bool rebuild_times_exceeded() { return rebuild_times_ > MAX_REBUILD_TIMES; }
+  inline bool rebuild_times_exceeded() { return rebuild_times_ >= MAX_REBUILD_TIMES; }
   inline void set_max_rebuild_times() { rebuild_times_ = MAX_REBUILD_TIMES + 1; }
   inline void open_by_pass_ctrl() { by_pass_ctrl_enabled_ = true; }
   inline void set_op_id(int64_t op_id) { op_id_ = op_id; }
@@ -92,6 +91,9 @@ public:
   int64_t small_row_cnt_; // 0 will be omit
   int64_t op_id_;
   bool need_resize_hash_table_;
+  int64_t probe_cnt_for_period_[MAX_REBUILD_TIMES];
+  int64_t ndv_cnt_for_period_[MAX_REBUILD_TIMES];
+  int64_t round_times_;
 };
 
 } // end namespace sql
diff --git a/src/sql/engine/aggregate/ob_exec_hash_struct.h b/src/sql/engine/aggregate/ob_exec_hash_struct.h
index 51b1322465..e5d5bfc50f 100644
--- a/src/sql/engine/aggregate/ob_exec_hash_struct.h
+++ b/src/sql/engine/aggregate/ob_exec_hash_struct.h
@@ -59,7 +59,8 @@ public:
     : initial_bucket_num_(0),
       size_(0),
       buckets_(NULL),
-      allocator_("ExtendHTBucket")
+      allocator_("ExtendHTBucket"),
+      probe_cnt_(0)
   {
   }
   ~ObExtendHashTable() { destroy(); }
@@ -68,12 +69,12 @@ public:
            int64_t initial_size = INITIAL_SIZE);
   bool is_inited() const { return NULL != buckets_; }
   // return the first item which equal to, NULL for none exist.
-  const Item *get(const Item &item) const;
+  const Item *get(const Item &item);
   // Link item to hash table, extend buckets if needed.
   // (Do not check item is exist or not)
   int set(Item &item);
   int64_t size() const { return size_; }
-
+  int64_t get_probe_cnt() const { return probe_cnt_; }
   void reuse()
   {
     int ret = common::OB_SUCCESS;
@@ -85,6 +86,7 @@ public:
       }
     }
     size_ = 0;
+    probe_cnt_ = 0;
   }
 
   int resize(ObIAllocator *allocator, int64_t bucket_num);
@@ -154,6 +156,7 @@ protected:
   int64_t size_;
   BucketArray *buckets_;
   common::ModulePageAllocator allocator_;
+  int64_t probe_cnt_;
 };
 
 template <typename Item>
@@ -205,9 +208,10 @@ int ObExtendHashTable<Item>::resize(ObIAllocator *allocator, int64_t bucket_num)
 }
 
 template <typename Item>
-const Item *ObExtendHashTable<Item>::get(const Item &item) const
+const Item *ObExtendHashTable<Item>::get(const Item &item)
 {
   Item *res = NULL;
+  ++probe_cnt_;
   if (NULL == buckets_) {
     // do nothing
   } else {
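Note on the ob_exec_hash_struct.h change: probe counting moves into ObExtendHashTable::get() itself, so ObHashGroupByOp can pass local_group_rows_.get_probe_cnt() to gby_process_state() instead of tracking last_batch_size / inc_processed_cnt() at every call site (see the ob_hash_groupby_op.cpp hunks below). A minimal sketch of the same idea, using std::unordered_map and hypothetical names, assuming only that the counter is cleared together with the table in reuse():

#include <cstdint>
#include <string>
#include <unordered_map>

class ProbeCountingMap {
public:
  // Non-const get(), mirroring the patch: every lookup bumps probe_cnt_ as a side effect.
  const int64_t *get(const std::string &key)
  {
    ++probe_cnt_;
    auto it = map_.find(key);
    return it == map_.end() ? nullptr : &it->second;
  }
  void set(const std::string &key, int64_t value) { map_[key] = value; }
  int64_t get_probe_cnt() const { return probe_cnt_; }
  void reuse() { map_.clear(); probe_cnt_ = 0; }

private:
  std::unordered_map<std::string, int64_t> map_;
  int64_t probe_cnt_ = 0;
};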
diff --git a/src/sql/engine/aggregate/ob_hash_groupby_op.cpp b/src/sql/engine/aggregate/ob_hash_groupby_op.cpp
index a4287f8948..e433a36de2 100644
--- a/src/sql/engine/aggregate/ob_hash_groupby_op.cpp
+++ b/src/sql/engine/aggregate/ob_hash_groupby_op.cpp
@@ -935,10 +935,8 @@ int ObHashGroupByOp::load_data()
   bool check_dump = false;
   ObGbyBloomFilter *bloom_filter = NULL;
   const ObChunkDatumStore::StoredRow *srow = NULL;
-  int64_t last_batch_size = 0;
   for (int64_t loop_cnt = 0; OB_SUCC(ret); ++loop_cnt) {
-    int64_t curr_batch_size = 0;
-    bypass_ctrl_.gby_process_state(last_batch_size,
+    bypass_ctrl_.gby_process_state(local_group_rows_.get_probe_cnt(),
                                    local_group_rows_.size(),
                                    get_actual_mem_used_size());
     if (bypass_ctrl_.processing_ht()) {
@@ -1001,8 +999,6 @@ int ObHashGroupByOp::load_data()
                                                   srow, curr_gr_item.hash_))) {
        LOG_WARN("failed to get_groupby_exprs_hash", K(ret));
      }
-     ++curr_batch_size;
-     bypass_ctrl_.inc_processed_cnt(1);
      curr_gr_item.is_expr_row_ = true;
      curr_gr_item.batch_idx_ = 0;
      LOG_DEBUG("finish calc_groupby_exprs_hash", K(curr_gr_item));
    }
    } else if ((!start_dump || bloom_filter->exist(curr_gr_item.hash()))
               && NULL != (exist_curr_gr_item = local_group_rows_.get(curr_gr_item))) {
      ++agged_row_cnt_;
-     bypass_ctrl_.inc_exists_cnt();
      if (OB_ISNULL(exist_curr_gr_item->group_row_)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("group_row is null", K(ret));
@@ -1065,7 +1060,6 @@ int ObHashGroupByOp::load_data()
      }
      has_checked = true;
    } while (!last_group && OB_SUCC(ret));
-   last_batch_size = curr_batch_size;
  }
 
  // must call reset first; otherwise, by the time automatic destruction runs,
  // the memory has already been released and problems arise
@@ -1621,7 +1615,7 @@ int ObHashGroupByOp::load_data_batch(int64_t max_row_cnt)
   int64_t last_batch_size = 0;
 
   while (OB_SUCC(ret)) {
-    bypass_ctrl_.gby_process_state(last_batch_size,
+    bypass_ctrl_.gby_process_state(local_group_rows_.get_probe_cnt(),
                                    local_group_rows_.size(),
                                    get_actual_mem_used_size());
     if (bypass_ctrl_.processing_ht()) {
@@ -1637,8 +1631,6 @@ int ObHashGroupByOp::load_data_batch(int64_t max_row_cnt)
     if (NULL != cur_part) {
       store_rows = batch_rows_from_dump_;
     }
-    last_batch_size = child_brs->size_ - child_brs->skip_->accumulate_bit_cnt(child_brs->size_);
-    bypass_ctrl_.inc_processed_cnt(last_batch_size);
     clear_evaluated_flag();
     loop_cnt += child_brs->size_;
     if (OB_FAIL(try_check_status())) {
@@ -2274,7 +2266,6 @@ int ObHashGroupByOp::group_child_batch_rows(const ObChunkDatumStore::StoredRow *
         gris_per_batch_[gri_cnt_per_batch_++] = exist_curr_gr_item;
       }
       ++agged_row_cnt_;
-      bypass_ctrl_.inc_exists_cnt();
       batch_row_gri_ptrs_[i] = exist_curr_gr_item;
       const_cast<ObGroupRowItem *>(exist_curr_gr_item)->group_row_count_in_batch_++;
       LOG_DEBUG("exist item", K(gri_cnt_per_batch_), K(*exist_curr_gr_item),
diff --git a/src/sql/engine/aggregate/ob_hash_groupby_op.h b/src/sql/engine/aggregate/ob_hash_groupby_op.h
index 2c1890176b..1ec92c04f1 100644
--- a/src/sql/engine/aggregate/ob_hash_groupby_op.h
+++ b/src/sql/engine/aggregate/ob_hash_groupby_op.h
@@ -117,7 +117,7 @@ class ObGroupRowHashTable : public ObExtendHashTable<ObGroupRowItem>
 public:
   ObGroupRowHashTable() : ObExtendHashTable(), eval_ctx_(nullptr), cmp_funcs_(nullptr) {}
 
-  OB_INLINE const ObGroupRowItem *get(const ObGroupRowItem &item) const;
+  OB_INLINE const ObGroupRowItem *get(const ObGroupRowItem &item);
   OB_INLINE void prefetch(const ObBatchRows &brs, uint64_t *hash_vals) const;
   int init(ObIAllocator *allocator,
            lib::ObMemAttr &mem_attr,
@@ -134,11 +134,12 @@ private:
   static const int64_t HASH_BUCKET_PREFETCH_MAGIC_NUM = 4 * 1024;
 };
 
-OB_INLINE const ObGroupRowItem *ObGroupRowHashTable::get(const ObGroupRowItem &item) const
+OB_INLINE const ObGroupRowItem *ObGroupRowHashTable::get(const ObGroupRowItem &item)
 {
   ObGroupRowItem *res = NULL;
   int ret = OB_SUCCESS;
   bool result = false;
+  ++probe_cnt_;
   if (OB_UNLIKELY(NULL == buckets_)) {
     // do nothing
   } else {
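Note on the ob_hash_groupby_op.h change: get() loses its const qualifier because it now increments probe_cnt_. An alternative the patch does not take would be to keep the const signature and mark the counter mutable; a sketch of that option, with hypothetical names, shown for comparison only:

#include <cstdint>

class ProbeCountingTable {
public:
  // Keeping get() const requires the counter to be mutable, since a logically
  // read-only lookup now carries a bookkeeping side effect.
  const int64_t *get(int64_t /*key*/) const
  {
    ++probe_cnt_;     // legal only because probe_cnt_ is declared mutable
    return nullptr;   // actual bucket lookup elided in this sketch
  }
  int64_t get_probe_cnt() const { return probe_cnt_; }

private:
  mutable int64_t probe_cnt_ = 0;
};

Dropping const, as the patch does, makes the side effect visible in the signature, at the cost of touching the const-qualified callers.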
diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp
index 00daa9a3da..81ed89840e 100644
--- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp
+++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp
@@ -305,7 +305,8 @@ int ObPxReceiveOp::link_ch_sets(ObPxTaskChSet &ch_set,
   } else if (OB_FAIL(dfc->reserve(ch_set.count()))) {
     LOG_WARN("fail reserve dfc channels", K(ret), K(ch_set.count()));
   } else if (ch_set.count() > 0) {
-    void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), "SqlDtlRecvChan");
+    ObMemAttr attr(ctx_.get_my_session()->get_effective_tenant_id(), "SqlDtlRecvChan");
+    void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), attr);
     if (nullptr == buf) {
       ret = OB_ALLOCATE_MEMORY_FAILED;
       LOG_WARN("malloc channel buf failed", K(ret));
diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp
index 8407e28517..0fd6b3129d 100644
--- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp
+++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp
@@ -982,7 +982,8 @@ int ObPxTransmitOp::link_ch_sets(ObPxTaskChSet &ch_set,
   } else if (OB_FAIL(dfc->reserve(ch_set.count()))) {
     LOG_WARN("fail reserve dfc channels", K(ret), K(ch_set.count()));
   } else if (ch_set.count() > 0) {
-    void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), "SqlDtlTxChan");
+    ObMemAttr attr(ctx_.get_my_session()->get_effective_tenant_id(), "SqlDtlTxChan");
+    void *buf = oceanbase::common::ob_malloc(DTL_CHANNEL_SIZE * ch_set.count(), attr);
     if (nullptr == buf) {
       ret = OB_ALLOCATE_MEMORY_FAILED;
       LOG_WARN("malloc channel buf failed", K(ret));
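Note on the ob_px_receive_op.cpp / ob_px_transmit_op.cpp changes: the DTL channel buffers are now allocated with an ObMemAttr carrying the session's effective tenant id, so the memory is accounted to the requesting tenant rather than only to the "SqlDtlRecvChan" / "SqlDtlTxChan" labels. The standalone sketch below shows the general accounting pattern with hypothetical stand-ins (MemAttr, tracked_malloc); it is not the OceanBase allocator API.

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <map>

// Hypothetical stand-ins, only to illustrate per-tenant attribution of allocations.
struct MemAttr {
  uint64_t tenant_id;
  const char *label;
};

static std::map<uint64_t, int64_t> g_tenant_usage;  // bytes charged to each tenant

static void *tracked_malloc(int64_t size, const MemAttr &attr)
{
  g_tenant_usage[attr.tenant_id] += size;  // charge the owning tenant, not a shared pool
  return std::malloc(size);
}

int main()
{
  MemAttr attr{1002, "SqlDtlRecvChan"};    // 1002: hypothetical effective tenant id
  void *buf = tracked_malloc(64 * 1024, attr);
  std::printf("tenant %llu charged %lld bytes under label %s\n",
              (unsigned long long)attr.tenant_id,
              (long long)g_tenant_usage[attr.tenant_id], attr.label);
  std::free(buf);
  return 0;
}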