Fix: defragment_sys_variable has memory allocation failure wild pointer problem
This commit is contained in:
@ -91,7 +91,9 @@ ObBasicSessionInfo::ObBasicSessionInfo()
|
|||||||
trans_flags_(),
|
trans_flags_(),
|
||||||
sql_scope_flags_(),
|
sql_scope_flags_(),
|
||||||
base_sys_var_alloc_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
|
base_sys_var_alloc_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
|
||||||
inc_sys_var_alloc_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
|
inc_sys_var_alloc1_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
|
||||||
|
inc_sys_var_alloc2_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
|
||||||
|
current_buf_index_(0),
|
||||||
bucket_allocator_wrapper_(&block_allocator_),
|
bucket_allocator_wrapper_(&block_allocator_),
|
||||||
user_var_val_map_(SMALL_BLOCK_SIZE, ObWrapperAllocator(&block_allocator_)),
|
user_var_val_map_(SMALL_BLOCK_SIZE, ObWrapperAllocator(&block_allocator_)),
|
||||||
influence_plan_var_indexs_(),
|
influence_plan_var_indexs_(),
|
||||||
@ -151,6 +153,8 @@ ObBasicSessionInfo::ObBasicSessionInfo()
|
|||||||
CHAR_CARRAY_INIT(trace_id_buff_);
|
CHAR_CARRAY_INIT(trace_id_buff_);
|
||||||
ssl_cipher_buff_[0] = '\0';
|
ssl_cipher_buff_[0] = '\0';
|
||||||
sess_bt_buff_[0] = '\0';
|
sess_bt_buff_[0] = '\0';
|
||||||
|
inc_sys_var_alloc_[0] = &inc_sys_var_alloc1_;
|
||||||
|
inc_sys_var_alloc_[1] = &inc_sys_var_alloc2_;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObBasicSessionInfo::~ObBasicSessionInfo()
|
ObBasicSessionInfo::~ObBasicSessionInfo()
|
||||||
@ -411,7 +415,9 @@ void ObBasicSessionInfo::reset(bool skip_sys_var)
|
|||||||
// 最后再重置所有allocator
|
// 最后再重置所有allocator
|
||||||
// 否则thread_data_.user_name_之类的属性会有野指针,在session_mgr的foreach接口遍历时可能core掉。
|
// 否则thread_data_.user_name_之类的属性会有野指针,在session_mgr的foreach接口遍历时可能core掉。
|
||||||
name_pool_.reset();
|
name_pool_.reset();
|
||||||
inc_sys_var_alloc_.reset();
|
inc_sys_var_alloc1_.reset();
|
||||||
|
inc_sys_var_alloc2_.reset();
|
||||||
|
current_buf_index_ = 0;
|
||||||
if (!skip_sys_var) {
|
if (!skip_sys_var) {
|
||||||
base_sys_var_alloc_.reset();
|
base_sys_var_alloc_.reset();
|
||||||
sys_var_fac_.destroy();
|
sys_var_fac_.destroy();
|
||||||
@ -1741,7 +1747,7 @@ int ObBasicSessionInfo::deep_copy_sys_variable(ObBasicSysVar &sys_var,
|
|||||||
sys_var.set_value(dest_val);
|
sys_var.set_value(dest_val);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(inc_sys_var_alloc_.write_obj(src_val, &dest_val))) {
|
if (OB_FAIL(inc_sys_var_alloc_[current_buf_index_]->write_obj(src_val, &dest_val))) {
|
||||||
LOG_WARN("fail to write obj", K(src_val), K(ret));
|
LOG_WARN("fail to write obj", K(src_val), K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (ob_is_string_type(src_val.get_type())) {
|
if (ob_is_string_type(src_val.get_type())) {
|
||||||
@ -1755,23 +1761,30 @@ int ObBasicSessionInfo::deep_copy_sys_variable(ObBasicSysVar &sys_var,
|
|||||||
|
|
||||||
// defragment.
|
// defragment.
|
||||||
// https://work.aone.alibaba-inc.com/issue/39958139
|
// https://work.aone.alibaba-inc.com/issue/39958139
|
||||||
|
// Double buffer optimization
|
||||||
|
// https://work.aone.alibaba-inc.com/issue/47795403
|
||||||
|
// Double buffer optimization uses two buffers to cyclically store
|
||||||
|
// system variable values. If the currently used buffer reaches the
|
||||||
|
// upper limit, the variable value will be defragmented and stored
|
||||||
|
// in another buffer, which can avoid the failure of using one buffer
|
||||||
|
// to store in the temporary buffer and cause wild pointer problem.
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (inc_sys_var_alloc_.used() > next_frag_mem_point_) {
|
if (inc_sys_var_alloc_[current_buf_index_]->used() > next_frag_mem_point_) {
|
||||||
// Note: this defrag impl. algrothim is not efficient.
|
// Double buffer optimization
|
||||||
// howerver, considering it is not the common path, we don't expect to be here very often.
|
// tmp_value storage tmp dest_val.
|
||||||
// for an efficient impl. please refer https://yuque.antfin-inc.com/xiaochu.yh/doc/oa9az6
|
ObArray<std::pair<int64_t, ObObj>> tmp_value;
|
||||||
common::ObStringBuf tmp_buf(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE);
|
if (OB_FAIL(defragment_sys_variable_from(tmp_value))) {
|
||||||
if (OB_FAIL(defragment_sys_variable_to(tmp_buf))) {
|
|
||||||
LOG_WARN("fail to defrag sys variable memory to temp alloc", K(ret));
|
LOG_WARN("fail to defrag sys variable memory to temp alloc", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
int32_t next_index = (current_buf_index_ == 0 ? 1 : 0);
|
||||||
LOG_INFO("Too much memory used for system variable values. do defragment",
|
LOG_INFO("Too much memory used for system variable values. do defragment",
|
||||||
"before", inc_sys_var_alloc_.used(), "after", tmp_buf.used());
|
"before", inc_sys_var_alloc_[current_buf_index_]->used(),
|
||||||
inc_sys_var_alloc_.reset();
|
"after", inc_sys_var_alloc_[next_index]->used());
|
||||||
if (OB_FAIL(defragment_sys_variable_to(inc_sys_var_alloc_))) {
|
defragment_sys_variable_to(tmp_value);
|
||||||
LOG_WARN("fail to defrag sys variable memory to base sys var alloc", K(ret));
|
inc_sys_var_alloc_[current_buf_index_]->reset();
|
||||||
} else {
|
current_buf_index_ = next_index;
|
||||||
next_frag_mem_point_ = std::max(2 * inc_sys_var_alloc_.used(), OB_MALLOC_NORMAL_BLOCK_SIZE);
|
next_frag_mem_point_ = std::max(2 * inc_sys_var_alloc_[next_index]->used(),
|
||||||
}
|
OB_MALLOC_NORMAL_BLOCK_SIZE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1779,11 +1792,13 @@ int ObBasicSessionInfo::deep_copy_sys_variable(ObBasicSysVar &sys_var,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObBasicSessionInfo::defragment_sys_variable_to(common::ObStringBuf &allocator)
|
// buf2 write_obj for defragment_sys_variable and push dest_val into tmp_value.
|
||||||
|
int ObBasicSessionInfo::defragment_sys_variable_from(ObArray<std::pair<int64_t, ObObj>> &tmp_value)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const SysVarIds &all_sys_var_ids = sys_var_inc_info_.get_all_sys_var_ids();
|
const SysVarIds &all_sys_var_ids = sys_var_inc_info_.get_all_sys_var_ids();
|
||||||
for (int i = 0; OB_SUCC(ret) && i < all_sys_var_ids.count(); i++) {
|
int32_t next_index = (current_buf_index_ == 0 ? 1 : 0);
|
||||||
|
for (int64_t i = 0; OB_SUCC(ret) && i < all_sys_var_ids.count(); i++) {
|
||||||
int64_t store_idx = -1;
|
int64_t store_idx = -1;
|
||||||
ObSysVarClassType sys_var_id = all_sys_var_ids.at(i);
|
ObSysVarClassType sys_var_id = all_sys_var_ids.at(i);
|
||||||
OZ (ObSysVarFactory::calc_sys_var_store_idx(sys_var_id, store_idx));
|
OZ (ObSysVarFactory::calc_sys_var_store_idx(sys_var_id, store_idx));
|
||||||
@ -1793,17 +1808,32 @@ int ObBasicSessionInfo::defragment_sys_variable_to(common::ObStringBuf &allocato
|
|||||||
const ObObj &src_val = sys_vars_[store_idx]->get_value();
|
const ObObj &src_val = sys_vars_[store_idx]->get_value();
|
||||||
if (ob_is_string_type(src_val.get_type()) || ob_is_number_tc(src_val.get_type())) {
|
if (ob_is_string_type(src_val.get_type()) || ob_is_number_tc(src_val.get_type())) {
|
||||||
ObObj dest_val;
|
ObObj dest_val;
|
||||||
if (OB_FAIL(allocator.write_obj(src_val, &dest_val))) {
|
if (OB_FAIL(inc_sys_var_alloc_[next_index]->write_obj(src_val, &dest_val))) {
|
||||||
LOG_WARN("fail to write obj", K(src_val), K(ret));
|
LOG_WARN("fail to write obj", K(src_val), K(ret));
|
||||||
|
} else if (OB_FAIL(tmp_value.push_back(std::pair<int64_t, ObObj>(store_idx, dest_val)))){
|
||||||
|
LOG_WARN("fail to push back tmp_value", K(ret));
|
||||||
} else {
|
} else {
|
||||||
sys_vars_[store_idx]->set_value(dest_val);
|
LOG_DEBUG("success to push back tmp value", K(ret), K(store_idx), K(dest_val));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
inc_sys_var_alloc_[next_index]->reset();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sys_vars_ set value.
|
||||||
|
void ObBasicSessionInfo::defragment_sys_variable_to(ObArray<std::pair<int64_t, ObObj>> &tmp_value)
|
||||||
|
{
|
||||||
|
|
||||||
|
for (int64_t i = 0; i < tmp_value.count(); i++) {
|
||||||
|
sys_vars_[tmp_value.at(i).first]->set_value(tmp_value.at(i).second);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
int ObBasicSessionInfo::update_session_sys_variable(ObExecContext &ctx,
|
int ObBasicSessionInfo::update_session_sys_variable(ObExecContext &ctx,
|
||||||
const ObString &var,
|
const ObString &var,
|
||||||
const ObObj &val)
|
const ObObj &val)
|
||||||
@ -4423,7 +4453,9 @@ int ObBasicSessionInfo::clean_all_sys_vars()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
OX (base_sys_var_alloc_.reset());
|
OX (base_sys_var_alloc_.reset());
|
||||||
OX (inc_sys_var_alloc_.reset());
|
OX (inc_sys_var_alloc1_.reset());
|
||||||
|
OX (inc_sys_var_alloc2_.reset());
|
||||||
|
current_buf_index_ = 0;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1327,7 +1327,8 @@ private:
|
|||||||
int deep_copy_sys_variable(share::ObBasicSysVar &sys_var,
|
int deep_copy_sys_variable(share::ObBasicSysVar &sys_var,
|
||||||
const share::ObSysVarClassType sys_var_id,
|
const share::ObSysVarClassType sys_var_id,
|
||||||
const common::ObObj &src_val);
|
const common::ObObj &src_val);
|
||||||
int defragment_sys_variable_to(common::ObStringBuf &allocator);
|
int defragment_sys_variable_from(ObArray<std::pair<int64_t, ObObj>> &tmp_value);
|
||||||
|
void defragment_sys_variable_to(ObArray<std::pair<int64_t, ObObj>> &tmp_value);
|
||||||
int deep_copy_trace_id_var(const common::ObObj &src_val,
|
int deep_copy_trace_id_var(const common::ObObj &src_val,
|
||||||
common::ObObj *dest_val_ptr);
|
common::ObObj *dest_val_ptr);
|
||||||
inline int store_query_string_(const ObString &stmt);
|
inline int store_query_string_(const ObString &stmt);
|
||||||
@ -1933,7 +1934,12 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
common::ObStringBuf base_sys_var_alloc_; // for variables names and statement names
|
common::ObStringBuf base_sys_var_alloc_; // for variables names and statement names
|
||||||
common::ObStringBuf inc_sys_var_alloc_; // for values in sys variables update
|
// Double buffer optimization
|
||||||
|
common::ObStringBuf *inc_sys_var_alloc_[2]; // for values in sys variables update
|
||||||
|
common::ObStringBuf inc_sys_var_alloc1_; // for values in sys variables update
|
||||||
|
common::ObStringBuf inc_sys_var_alloc2_; // for values in sys variables update
|
||||||
|
int32_t current_buf_index_; // for record current buf index
|
||||||
|
// Double buffer optimization end.
|
||||||
common::ObWrapperAllocator bucket_allocator_wrapper_;
|
common::ObWrapperAllocator bucket_allocator_wrapper_;
|
||||||
ObSessionValMap user_var_val_map_; // user variables
|
ObSessionValMap user_var_val_map_; // user variables
|
||||||
share::ObBasicSysVar *sys_vars_[share::ObSysVarFactory::ALL_SYS_VARS_COUNT]; // system variables
|
share::ObBasicSysVar *sys_vars_[share::ObSysVarFactory::ALL_SYS_VARS_COUNT]; // system variables
|
||||||
|
|||||||
Reference in New Issue
Block a user