[CP] bug fix: core dumps during the SQL audit logging stage.

This commit is contained in:
GongYusen 2024-06-17 12:54:16 +00:00 committed by ob-robot
parent 1756434624
commit 9af8acc82a
8 changed files with 92 additions and 73 deletions

View File

@ -254,7 +254,7 @@ int ObAllVirtualPsItemInfo::get_next_row_from_specified_tenant(uint64_t tenant_i
// still need to deref_stmt_item even though ret is not OB_SUCCESS
if (OB_NOT_NULL(stmt_item)) {
stmt_item->dec_ref_count_check_erase();
stmt_item->dec_ref_count();
stmt_item = NULL;
}

View File

@ -1160,7 +1160,7 @@ int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
ObPsStmtId inner_stmt_id = ps_stmt_item->get_ps_stmt_id();
ps_cache->deref_stmt_info(inner_stmt_id); //需要决定是否摘除
}
ps_stmt_item->dec_ref_count_check_erase();
ps_stmt_item->dec_ref_count();
}
}
}
@ -1903,7 +1903,7 @@ int ObSql::handle_ps_prepare(const ObString &stmt,
|| need_do_real_prepare
|| duplicate_prepare) {
if (NULL != stmt_item) {
stmt_item->dec_ref_count_check_erase();
stmt_item->dec_ref_count();
}
if (NULL != stmt_info) {
ps_cache->deref_stmt_info(inner_stmt_id); //需要决定是否摘除

View File

@ -158,7 +158,7 @@ bool ObPsStmtItem::check_erase_inc_ref_count()
return need_erase;
}
void ObPsStmtItem::dec_ref_count_check_erase()
void ObPsStmtItem::dec_ref_count()
{
int ret = OB_SUCCESS;
LOG_TRACE("ps item dec ref count", K(*this));
@ -606,9 +606,8 @@ bool ObPsStmtInfo::check_erase_inc_ref_count()
return need_erase;
}
bool ObPsStmtInfo::dec_ref_count_check_erase()
void ObPsStmtInfo::dec_ref_count()
{
bool need_erase = false;
LOG_TRACE("ps info dec ref count", K(*this));
int64_t ref_count = ATOMIC_SAF(&ref_count_, 1);
if (ref_count > 0) {
@ -617,12 +616,11 @@ bool ObPsStmtInfo::dec_ref_count_check_erase()
}
LOG_TRACE("ps info dec ref count", K(ref_count), K(*this));
} else if (0 == ref_count) {
need_erase = true;
LOG_INFO("free ps info", K(ref_count), K(*this), K(need_erase));
LOG_INFO("free ps info", K(ref_count), K(*this));
} else if (ref_count < 0) {
BACKTRACE_RET(ERROR, OB_ERR_UNEXPECTED, true, "ObPsStmtInfo %p ref count < 0, ref_count = %ld", this, ref_count);
}
return need_erase;
return;
}
int64_t ObPsStmtInfo::to_string(char *buf, const int64_t buf_len) const

View File

@ -115,7 +115,7 @@ public:
ObPsStmtId get_ps_stmt_id() const { return stmt_id_; }
bool check_erase_inc_ref_count();
void dec_ref_count_check_erase();
void dec_ref_count();
int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
int get_convert_size(int64_t &cv_size) const;
@ -205,7 +205,7 @@ public:
bool is_valid() const;
bool check_erase_inc_ref_count();
bool dec_ref_count_check_erase();
void dec_ref_count();
int deep_copy(const ObPsStmtInfo &other);
int add_param_field(const common::ObField &param);
int add_column_field(const common::ObField &column);
@ -251,6 +251,7 @@ public:
void set_is_expired() { ATOMIC_STORE(&is_expired_, true); }
bool is_expired() { return ATOMIC_LOAD(&is_expired_); }
bool *get_is_expired_evicted_ptr() { return &is_expired_evicted_; }
bool try_erase() { return 1 == ATOMIC_VCAS(&ref_count_, 1, 0); }
DECLARE_VIRTUAL_TO_STRING;

View File

@ -136,37 +136,24 @@ int ObPsCache::init(const int64_t hash_bucket,
return ret;
}
//for close a session explicitly
int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item/*=false*/)
//only for close a session explicitly
int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id)
{
int ret = OB_SUCCESS;
LOG_DEBUG("close ps stmt", K(stmt_id));
ObPsStmtInfoGuard guard;
if (OB_FAIL(get_stmt_info_guard(stmt_id, guard))) {
ObPsStmtInfo *ps_info = nullptr;
if (OB_FAIL(stmt_info_map_.get_refactored(stmt_id, ps_info))) {
LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id));
} else {
ObPsStmtInfo *ps_info = guard.get_stmt_info();
const ObPsSqlKey ps_sql_key = ps_info->get_sql_key();
int tmp_ret = OB_SUCCESS;
if (erase_item) { // dec cached ref
if (OB_FAIL(erase_stmt_item(stmt_id, ps_sql_key))) {
LOG_WARN("fail to erase stmt", K(ret));
}
} else { // dec session ref
if (OB_ISNULL(ps_info->get_ps_item())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(*ps_info));
} else {
ps_info->get_ps_item()->dec_ref_count_check_erase();
}
}
if (OB_SUCCESS != (tmp_ret = deref_stmt_info(stmt_id))) {
ret = tmp_ret; //previous ret ignore
LOG_WARN("deref stmt info failed", K(ret), K(stmt_id), K(ps_sql_key));
if (OB_ISNULL(ps_info->get_ps_item())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(*ps_info));
} else {
LOG_TRACE("deref stmt info success", K(stmt_id), K(ps_sql_key), K(ret));
ps_info->get_ps_item()->dec_ref_count();
}
ps_info->dec_ref_count();
}
return ret;
}
@ -363,18 +350,6 @@ int ObPsCache::inner_ref_stmt_item(const ObPsSqlKey &ps_sql_key,
return ret;
}
int ObPsCache::deref_stmt_item(const ObPsSqlKey &ps_sql_key)
{
int ret = OB_SUCCESS;
ObPsStmtItemDerefAtomicOp op;
if (OB_FAIL(stmt_id_map_.read_atomic(ps_sql_key, op))) {
LOG_WARN("deref stmt item failed", K(ps_sql_key), K(ret));
} else if (op.get_ret() != OB_SUCCESS) {
ret = OB_ERR_UNEXPECTED;
}
return ret;
}
int ObPsCache::ref_stmt_item(const ObPsSqlKey &ps_sql_key, ObPsStmtItem *&stmt_item)
{
int ret = OB_SUCCESS;
@ -485,7 +460,7 @@ int ObPsCache::erase_stmt_item(ObPsStmtId stmt_id, const ObPsSqlKey &ps_key)
LOG_WARN("fail to erase stmt info", K(ps_key), K(ret));
}
} else {
ps_item->dec_ref_count_check_erase();
ps_item->dec_ref_count();
}
}
return ret;
@ -692,21 +667,6 @@ int ObPsCache::deref_stmt_info(const ObPsStmtId stmt_id)
} else if (op.get_ret() != OB_SUCCESS) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("deref stmt info failed", K(ret));
} else if (op.is_erase()) {
if (OB_FAIL(stmt_info_map_.erase_refactored(stmt_id, &ps_info))) {
LOG_WARN("fail to erase stmt info", K(stmt_id), K(ret));
} else if (OB_ISNULL(ps_info)) {
LOG_INFO("ps info is null in map", K(stmt_id));
// do nothing
} else if (OB_ISNULL(allocator = ps_info->get_external_allocator())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(allocator), K(ret));
} else {
LOG_INFO("free ps info", K(*ps_info), K(stmt_id));
ps_info->~ObPsStmtInfo();
allocator->free(ps_info);
}
LOG_INFO("erase stmt info", K(stmt_id), K(ret));
}
return ret;
}
@ -788,7 +748,7 @@ int ObPsCache::inner_cache_evict(bool is_evict_all)
// evict expired ps
for (int64_t i = 0; i < expired_stmt_ids.count(); ++i) { //ignore ret
LOG_TRACE("ps close time", K(i), K(expired_stmt_ids.at(i).first), K(expired_stmt_ids.at(i).second));
if (OB_FAIL(deref_ps_stmt(expired_stmt_ids.at(i).first, true/*erase_stmt*/))) {
if (OB_FAIL(destroy_cached_ps(expired_stmt_ids.at(i).first))) {
LOG_WARN("fail to evict ps stmt", K(ret),
K(expired_stmt_ids.at(i).first), K(expired_stmt_ids.count()));
}
@ -796,7 +756,7 @@ int ObPsCache::inner_cache_evict(bool is_evict_all)
if (is_evict_all) {
for (int64_t i = 0; i < closed_stmt_ids.count(); ++i) { //ignore ret
LOG_TRACE("ps close time", K(i), K(closed_stmt_ids.at(i).first), K(closed_stmt_ids.at(i).second));
if (OB_FAIL(deref_ps_stmt(closed_stmt_ids.at(i).first, true/*erase_stmt*/))) {
if (OB_FAIL(destroy_cached_ps(closed_stmt_ids.at(i).first))) {
LOG_WARN("fail to evict ps stmt", K(ret),
K(closed_stmt_ids.at(i).first), K(closed_stmt_ids.count()));
}
@ -806,7 +766,7 @@ int ObPsCache::inner_cache_evict(bool is_evict_all)
std::sort(closed_stmt_ids.begin(), closed_stmt_ids.end(), PsTimeCmp());
for (int64_t i = 0; i < closed_stmt_ids.count() / 2; ++i) { //ignore ret
LOG_TRACE("ps close time", K(i), K(closed_stmt_ids.at(i).first), K(closed_stmt_ids.at(i).second));
if (OB_FAIL(deref_ps_stmt(closed_stmt_ids.at(i).first, true/*erase_stmt*/))) {
if (OB_FAIL(destroy_cached_ps(closed_stmt_ids.at(i).first))) {
LOG_WARN("fail to evict ps stmt", K(ret),
K(closed_stmt_ids.at(i).first), K(closed_stmt_ids.count()));
}
@ -819,6 +779,45 @@ int ObPsCache::inner_cache_evict(bool is_evict_all)
return ret;
}
int ObPsCache::destroy_cached_ps(const ObPsStmtId inner_stmt_id)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObPsStmtInfo *stmt_info = nullptr;
ObPsStmtInfoDestroyAtomicOp op;
LOG_INFO("start to evict ps from cache", K(inner_stmt_id));
if (OB_FAIL(stmt_info_map_.atomic_refactored(inner_stmt_id, op))) {
if (ret == OB_HASH_NOT_EXIST) {
ret = OB_SUCCESS;
LOG_INFO("ps stmt info has been erase by others", K(inner_stmt_id));
} else {
LOG_WARN("failed to get ps stmt info from stmt_info_map_", K(ret), K(inner_stmt_id));
}
} else if (OB_FAIL(op.get_ret())) {
LOG_WARN("failed to get ps stmt info from stmt_info_map_", K(ret), K(inner_stmt_id));
} else if (!op.marked_erase()) {
LOG_TRACE("can not be destroy", K(inner_stmt_id));
} else {
if (OB_FAIL(stmt_info_map_.erase_refactored(inner_stmt_id, &stmt_info))) {
LOG_WARN("failed to erase ps stmt info from stmt_info_map_", K(ret), K(inner_stmt_id));
} else if (OB_ISNULL(stmt_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ps stmt info is NULL", K(ret), K(inner_stmt_id));
} else if (OB_SUCCESS != (tmp_ret = erase_stmt_item(inner_stmt_id, stmt_info->get_sql_key()))){
LOG_WARN("failed to erase stmt item", K(tmp_ret), K(inner_stmt_id));
}
if (OB_SUCC(ret)) {
ObIAllocator *info_alloc = nullptr;
if (OB_ISNULL(info_alloc = stmt_info->get_external_allocator())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("allocator of ps stmt info is NULL", K(ret), K(inner_stmt_id));
} else {
info_alloc->free(stmt_info);
}
}
}
return ret;
}
void ObPsCache::dump_ps_cache()
{

View File

@ -94,7 +94,7 @@ public:
int deref_all_ps_stmt(const ObIArray<ObPsStmtId> &ps_stmt_ids);
int inner_ref_stmt_item(const ObPsSqlKey &ps_sql_key, ObPsStmtItem *&ps_stmt_item);
int deref_stmt_item(const ObPsSqlKey &ps_sql_key);
int deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item = false);
int deref_ps_stmt(const ObPsStmtId stmt_id);
int get_or_add_stmt_item(const ObPsSqlKey &ps_key,
const bool is_contain_tmp_tbl,
ObPsStmtItem *&ps_item_value);
@ -129,6 +129,7 @@ public:
int erase_stmt_item(ObPsStmtId stmt_id, const ObPsSqlKey &ps_key);
private:
int destroy_cached_ps(const ObPsStmtId inner_stmt_id);
int inner_cache_evict(bool is_evict_all);
int fill_ps_stmt_info(const ObResultSet &result,
int64_t param_cnt,

View File

@ -55,7 +55,7 @@ void ObPsStmtItemDerefAtomicOp::operator()(const PsStmtIdKV &entry)
ret_ = OB_HASH_NOT_EXIST;
LOG_WARN_RET(ret_, "entry not exist", K_(ret));
} else {
entry.second->dec_ref_count_check_erase();
entry.second->dec_ref_count();
}
}
@ -106,10 +106,17 @@ void ObPsStmtInfoDerefAtomicOp::operator()(const PsStmtInfoKV &entry)
ret_ = OB_HASH_NOT_EXIST;
LOG_WARN_RET(ret_, "entry not exist", K_(ret));
} else {
is_erase_ = entry.second->dec_ref_count_check_erase();
if (is_erase_) {
LOG_INFO("erase stmt info", "stmt_info", *entry.second, "stmt_id", entry.first);
}
entry.second->dec_ref_count();
}
}
void ObPsStmtInfoDestroyAtomicOp::operator()(const PsStmtInfoKV &entry)
{
if (OB_ISNULL(entry.second)) {
ret_ = OB_ERR_UNEXPECTED;
LOG_WARN_RET(ret_, "ps stmt info is NULL", K_(ret));
} else {
marked_erase_ = entry.second->try_erase();
}
}

View File

@ -195,17 +195,30 @@ class ObPsStmtInfoDerefAtomicOp
{
typedef common::hash::HashMapPair<ObPsStmtId, ObPsStmtInfo *> PsStmtInfoKV;
public:
ObPsStmtInfoDerefAtomicOp() : ret_(common::OB_SUCCESS), is_erase_(false) {}
ObPsStmtInfoDerefAtomicOp() : ret_(common::OB_SUCCESS) {}
virtual ~ObPsStmtInfoDerefAtomicOp() {}
void operator()(const PsStmtInfoKV &entry);
int get_ret() const { return ret_; }
inline bool is_erase() const { return is_erase_; }
private:
int ret_;
bool is_erase_;
DISALLOW_COPY_AND_ASSIGN(ObPsStmtInfoDerefAtomicOp);
};
class ObPsStmtInfoDestroyAtomicOp
{
typedef common::hash::HashMapPair<ObPsStmtId, ObPsStmtInfo *> PsStmtInfoKV;
public:
ObPsStmtInfoDestroyAtomicOp() : ret_(common::OB_SUCCESS), marked_erase_(false) {}
virtual ~ObPsStmtInfoDestroyAtomicOp() {}
void operator()(const PsStmtInfoKV &entry);
int get_ret() const { return ret_; }
bool marked_erase() const { return marked_erase_; }
private:
int ret_;
bool marked_erase_;
DISALLOW_COPY_AND_ASSIGN(ObPsStmtInfoDestroyAtomicOp);
};
class ObPsPCVSetAtomicOp
{