fix auto mem mgr use arena alloc leads dynamic leak
This commit is contained in:
committed by
ob-robot
parent
37716d6d43
commit
7ba9ad135e
@ -47,7 +47,8 @@ ObHashDistinctOp::ObHashDistinctOp(ObExecContext &exec_ctx, const ObOpSpec &spec
|
|||||||
hash_values_for_batch_(nullptr),
|
hash_values_for_batch_(nullptr),
|
||||||
build_distinct_data_func_(&ObHashDistinctOp::build_distinct_data),
|
build_distinct_data_func_(&ObHashDistinctOp::build_distinct_data),
|
||||||
build_distinct_data_batch_func_(&ObHashDistinctOp::build_distinct_data_for_batch),
|
build_distinct_data_batch_func_(&ObHashDistinctOp::build_distinct_data_for_batch),
|
||||||
bypass_ctrl_()
|
bypass_ctrl_(),
|
||||||
|
mem_context_(NULL)
|
||||||
{
|
{
|
||||||
enable_sql_dumped_ = GCONF.is_sql_operator_dump_enabled();
|
enable_sql_dumped_ = GCONF.is_sql_operator_dump_enabled();
|
||||||
}
|
}
|
||||||
@ -65,6 +66,8 @@ int ObHashDistinctOp::inner_open()
|
|||||||
->get_sys_variable(share::SYS_VAR__GROUPBY_NOPUSHDOWN_CUT_RATIO,
|
->get_sys_variable(share::SYS_VAR__GROUPBY_NOPUSHDOWN_CUT_RATIO,
|
||||||
bypass_ctrl_.cut_ratio_))) {
|
bypass_ctrl_.cut_ratio_))) {
|
||||||
LOG_WARN("failed to get no pushdown cut ratio", K(ret));
|
LOG_WARN("failed to get no pushdown cut ratio", K(ret));
|
||||||
|
} else if (OB_FAIL(init_mem_context())) {
|
||||||
|
LOG_WARN("failed to init mem context", K(ret));
|
||||||
} else {
|
} else {
|
||||||
first_got_row_ = true;
|
first_got_row_ = true;
|
||||||
tenant_id_ = ctx_.get_my_session()->get_effective_tenant_id();
|
tenant_id_ = ctx_.get_my_session()->get_effective_tenant_id();
|
||||||
@ -134,6 +137,10 @@ void ObHashDistinctOp::destroy()
|
|||||||
{
|
{
|
||||||
sql_mem_processor_.unregister_profile_if_necessary();
|
sql_mem_processor_.unregister_profile_if_necessary();
|
||||||
hp_infras_.~ObHashPartInfrastructure();
|
hp_infras_.~ObHashPartInfrastructure();
|
||||||
|
if (OB_LIKELY(NULL != mem_context_)) {
|
||||||
|
DESTROY_CONTEXT(mem_context_);
|
||||||
|
mem_context_ = NULL;
|
||||||
|
}
|
||||||
ObOperator::destroy();
|
ObOperator::destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +152,7 @@ int ObHashDistinctOp::init_hash_partition_infras()
|
|||||||
&ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) {
|
&ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) {
|
||||||
LOG_WARN("failed to get px size", K(ret));
|
LOG_WARN("failed to get px size", K(ret));
|
||||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||||
&ctx_.get_allocator(),
|
&mem_context_->get_malloc_allocator(),
|
||||||
tenant_id_,
|
tenant_id_,
|
||||||
est_rows * MY_SPEC.width_,
|
est_rows * MY_SPEC.width_,
|
||||||
MY_SPEC.type_,
|
MY_SPEC.type_,
|
||||||
@ -782,5 +789,20 @@ int ObHashDistinctOp::process_state(int64_t probe_cnt, bool &can_insert)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObHashDistinctOp::init_mem_context()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (NULL == mem_context_) {
|
||||||
|
lib::ContextParam param;
|
||||||
|
param.set_mem_attr(ctx_.get_my_session()->get_effective_tenant_id(),
|
||||||
|
"ObHashDistRows",
|
||||||
|
ObCtxIds::WORK_AREA);
|
||||||
|
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
|
||||||
|
LOG_WARN("memory entity create failed", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // end namespace sql
|
} // end namespace sql
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|||||||
@ -65,6 +65,7 @@ private:
|
|||||||
int build_distinct_data_for_batch_by_pass(const int64_t batch_size, bool is_block);
|
int build_distinct_data_for_batch_by_pass(const int64_t batch_size, bool is_block);
|
||||||
int by_pass_get_next_batch(const int64_t batch_size);
|
int by_pass_get_next_batch(const int64_t batch_size);
|
||||||
int process_state(int64_t probe_cnt, bool &can_insert);
|
int process_state(int64_t probe_cnt, bool &can_insert);
|
||||||
|
int init_mem_context();
|
||||||
private:
|
private:
|
||||||
const int64_t EXTEND_BKT_NUM_PUSH_DOWN = INIT_L3_CACHE_SIZE / sizeof(ObHashPartCols);
|
const int64_t EXTEND_BKT_NUM_PUSH_DOWN = INIT_L3_CACHE_SIZE / sizeof(ObHashPartCols);
|
||||||
typedef int (ObHashDistinctOp::*GetNextRowFunc)();
|
typedef int (ObHashDistinctOp::*GetNextRowFunc)();
|
||||||
@ -88,6 +89,7 @@ private:
|
|||||||
Build_distinct_data_func build_distinct_data_func_;
|
Build_distinct_data_func build_distinct_data_func_;
|
||||||
Build_distinct_data_batch_func build_distinct_data_batch_func_;
|
Build_distinct_data_batch_func build_distinct_data_batch_func_;
|
||||||
ObAdaptiveByPassCtrl bypass_ctrl_;
|
ObAdaptiveByPassCtrl bypass_ctrl_;
|
||||||
|
lib::MemoryContext mem_context_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace sql
|
} // end namespace sql
|
||||||
|
|||||||
@ -659,7 +659,7 @@ int ObHashGroupByOp::init_distinct_info(bool is_part)
|
|||||||
&ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) {
|
&ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) {
|
||||||
LOG_WARN("failed to get px size", K(ret));
|
LOG_WARN("failed to get px size", K(ret));
|
||||||
} else if (OB_FAIL(distinct_sql_mem_processor_.init(
|
} else if (OB_FAIL(distinct_sql_mem_processor_.init(
|
||||||
&ctx_.get_allocator(),
|
&mem_context_->get_malloc_allocator(),
|
||||||
tenant_id,
|
tenant_id,
|
||||||
est_size,
|
est_size,
|
||||||
MY_SPEC.type_,
|
MY_SPEC.type_,
|
||||||
|
|||||||
@ -39,7 +39,8 @@ ObHashSetOp::ObHashSetOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInpu
|
|||||||
hp_infras_(),
|
hp_infras_(),
|
||||||
hash_values_for_batch_(nullptr),
|
hash_values_for_batch_(nullptr),
|
||||||
need_init_(true),
|
need_init_(true),
|
||||||
left_brs_(nullptr)
|
left_brs_(nullptr),
|
||||||
|
mem_context_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,6 +52,8 @@ int ObHashSetOp::inner_open()
|
|||||||
LOG_WARN("unexpected status: left or right is null", K(ret), K(left_), K(right_));
|
LOG_WARN("unexpected status: left or right is null", K(ret), K(left_), K(right_));
|
||||||
} else if (OB_FAIL(ObOperator::inner_open())) {
|
} else if (OB_FAIL(ObOperator::inner_open())) {
|
||||||
LOG_WARN("failed to inner open", K(ret));
|
LOG_WARN("failed to inner open", K(ret));
|
||||||
|
} else if (OB_FAIL(init_mem_context())) {
|
||||||
|
LOG_WARN("failed to init mem context", K(ret));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -90,6 +93,10 @@ void ObHashSetOp::destroy()
|
|||||||
{
|
{
|
||||||
sql_mem_processor_.unregister_profile_if_necessary();
|
sql_mem_processor_.unregister_profile_if_necessary();
|
||||||
hp_infras_.~ObHashPartInfrastructure();
|
hp_infras_.~ObHashPartInfrastructure();
|
||||||
|
if (OB_LIKELY(NULL != mem_context_)) {
|
||||||
|
DESTROY_CONTEXT(mem_context_);
|
||||||
|
mem_context_ = NULL;
|
||||||
|
}
|
||||||
ObOperator::destroy();
|
ObOperator::destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,7 +169,7 @@ int ObHashSetOp::build_hash_table(bool from_child)
|
|||||||
hp_infras_.get_cur_part_row_cnt(InputSide::RIGHT)))) {
|
hp_infras_.get_cur_part_row_cnt(InputSide::RIGHT)))) {
|
||||||
LOG_WARN("failed to init hash table", K(ret));
|
LOG_WARN("failed to init hash table", K(ret));
|
||||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||||
&ctx_.get_allocator(),
|
&mem_context_->get_malloc_allocator(),
|
||||||
ctx_.get_my_session()->get_effective_tenant_id(),
|
ctx_.get_my_session()->get_effective_tenant_id(),
|
||||||
hp_infras_.get_cur_part_file_size(InputSide::RIGHT),
|
hp_infras_.get_cur_part_file_size(InputSide::RIGHT),
|
||||||
spec_.type_,
|
spec_.type_,
|
||||||
@ -215,7 +222,7 @@ int ObHashSetOp::build_hash_table_from_left(bool from_child)
|
|||||||
hp_infras_.get_cur_part_row_cnt(InputSide::LEFT)))) {
|
hp_infras_.get_cur_part_row_cnt(InputSide::LEFT)))) {
|
||||||
LOG_WARN("failed to init hash table", K(ret));
|
LOG_WARN("failed to init hash table", K(ret));
|
||||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||||
&ctx_.get_allocator(),
|
&mem_context_->get_malloc_allocator(),
|
||||||
ctx_.get_my_session()->get_effective_tenant_id(),
|
ctx_.get_my_session()->get_effective_tenant_id(),
|
||||||
hp_infras_.get_cur_part_file_size(InputSide::LEFT),
|
hp_infras_.get_cur_part_file_size(InputSide::LEFT),
|
||||||
spec_.type_,
|
spec_.type_,
|
||||||
@ -267,7 +274,7 @@ int ObHashSetOp::build_hash_table_from_left_batch(bool from_child, const int64_t
|
|||||||
hp_infras_.get_cur_part_row_cnt(InputSide::LEFT)))) {
|
hp_infras_.get_cur_part_row_cnt(InputSide::LEFT)))) {
|
||||||
LOG_WARN("failed to init hash table", K(ret));
|
LOG_WARN("failed to init hash table", K(ret));
|
||||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||||
&ctx_.get_allocator(),
|
&mem_context_->get_malloc_allocator(),
|
||||||
ctx_.get_my_session()->get_effective_tenant_id(),
|
ctx_.get_my_session()->get_effective_tenant_id(),
|
||||||
hp_infras_.get_cur_part_file_size(InputSide::LEFT),
|
hp_infras_.get_cur_part_file_size(InputSide::LEFT),
|
||||||
spec_.type_,
|
spec_.type_,
|
||||||
@ -327,7 +334,7 @@ int ObHashSetOp::init_hash_partition_infras()
|
|||||||
&ctx_, get_spec().px_est_size_factor_, est_rows, est_rows))) {
|
&ctx_, get_spec().px_est_size_factor_, est_rows, est_rows))) {
|
||||||
LOG_WARN("failed to get px size", K(ret));
|
LOG_WARN("failed to get px size", K(ret));
|
||||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||||
&ctx_.get_allocator(),
|
&mem_context_->get_malloc_allocator(),
|
||||||
ctx_.get_my_session()->get_effective_tenant_id(),
|
ctx_.get_my_session()->get_effective_tenant_id(),
|
||||||
est_rows * get_spec().width_,
|
est_rows * get_spec().width_,
|
||||||
get_spec().type_,
|
get_spec().type_,
|
||||||
@ -433,5 +440,20 @@ int ObHashSetOp::init_hash_partition_infras_for_batch()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObHashSetOp::init_mem_context()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (NULL == mem_context_) {
|
||||||
|
lib::ContextParam param;
|
||||||
|
param.set_mem_attr(ctx_.get_my_session()->get_effective_tenant_id(),
|
||||||
|
"ObHashSetRows",
|
||||||
|
ObCtxIds::WORK_AREA);
|
||||||
|
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
|
||||||
|
LOG_WARN("memory entity create failed", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // end namespace sql
|
} // end namespace sql
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|||||||
@ -62,6 +62,7 @@ protected:
|
|||||||
const common::ObIArray<ObExpr*> &dst_exprs,
|
const common::ObIArray<ObExpr*> &dst_exprs,
|
||||||
const int64_t batch_size,
|
const int64_t batch_size,
|
||||||
const ObBitVector &skip);
|
const ObBitVector &skip);
|
||||||
|
int init_mem_context();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
//used by intersect and except
|
//used by intersect and except
|
||||||
@ -74,6 +75,7 @@ protected:
|
|||||||
//for batch array init, not reset in rescan
|
//for batch array init, not reset in rescan
|
||||||
bool need_init_;
|
bool need_init_;
|
||||||
const ObBatchRows *left_brs_;
|
const ObBatchRows *left_brs_;
|
||||||
|
lib::MemoryContext mem_context_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace sql
|
} // end namespace sql
|
||||||
|
|||||||
Reference in New Issue
Block a user