[CP] [CP] Fix returning rowid error when different versions of the client are run together

This commit is contained in:
obdev 2023-11-13 13:09:17 +00:00 committed by ob-robot
parent 47645971e3
commit 0e2f1802c0
6 changed files with 75 additions and 43 deletions

View File

@ -239,8 +239,6 @@ int ObAllVirtualPsItemInfo::get_next_row_from_specified_tenant(uint64_t tenant_i
} else if (OB_ISNULL(stmt_item)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "stmt_item is NULL", K(ret));
} else {
// done
}
SERVER_LOG(DEBUG, "all setup", K(ret), K(tmp_ret), KP(stmt_info), KP(stmt_item));

View File

@ -919,9 +919,12 @@ int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
ObPsStmtItem *ps_stmt_item = NULL;
ObPsStmtInfo *ref_stmt_info = NULL;
bool duplicate_prepare = false;
ObPsSqlKey ps_key;
ps_key.db_id_ = db_id;
ps_key.ps_sql_ = info_ctx.normalized_sql_;
ps_key.is_client_return_hidden_rowid_ = session.is_client_return_rowid();
// add stmt item
if (OB_FAIL(ps_cache->get_or_add_stmt_item(db_id,
info_ctx.normalized_sql_,
if (OB_FAIL(ps_cache->get_or_add_stmt_item(ps_key,
is_contain_tmp_tbl,
ps_stmt_item))) {
LOG_WARN("get or create stmt item faield", K(ret), K(db_id), K(info_ctx.normalized_sql_));
@ -1638,6 +1641,10 @@ int ObSql::handle_ps_prepare(const ObString &stmt,
bool need_do_real_prepare = false;
uint64_t db_id = OB_INVALID_ID;
(void)session.get_database_id(db_id);
ObPsSqlKey ps_key;
ps_key.db_id_ = db_id;
ps_key.ps_sql_ = stmt;
ps_key.is_client_return_hidden_rowid_ = session.is_client_return_rowid();
ObPsStmtId inner_stmt_id = OB_INVALID_STMT_ID;
ObPsStmtId client_stmt_id = OB_INVALID_STMT_ID;
ObPsStmtInfo *stmt_info = NULL;
@ -1665,7 +1672,7 @@ int ObSql::handle_ps_prepare(const ObString &stmt,
K(db_id), K(stmt), K(need_do_real_prepare), K(context.secondary_namespace_),
K(result.is_simple_ps_protocol()));
}
} else if (OB_FAIL(ps_cache->ref_stmt_item(db_id, stmt, stmt_item))) {
} else if (OB_FAIL(ps_cache->ref_stmt_item(ps_key, stmt_item))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
need_do_real_prepare = true;
@ -1700,7 +1707,7 @@ int ObSql::handle_ps_prepare(const ObString &stmt,
LOG_WARN("fail to check schema version", K(ret));
} else if (is_expired) {
stmt_info->set_is_expired();
if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, stmt_info->get_sql_key()))) {
if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, ps_key))) {
LOG_WARN("fail to erase stmt item", K(ret), K(*stmt_info));
}
need_do_real_prepare = true;

View File

@ -27,6 +27,7 @@ namespace sql
int ObPsSqlKey::deep_copy(const ObPsSqlKey &other, common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
flag_ = other.flag_;
db_id_ = other.db_id_;
inc_id_ = other.inc_id_;
if (OB_FAIL(ObPsSqlUtils::deep_copy_str(allocator, other.ps_sql_, ps_sql_))) {
@ -38,6 +39,7 @@ int ObPsSqlKey::deep_copy(const ObPsSqlKey &other, common::ObIAllocator &allocat
ObPsSqlKey &ObPsSqlKey::operator=(const ObPsSqlKey &other)
{
if (this != &other) {
flag_ = other.flag_;
db_id_ = other.db_id_;
inc_id_ = other.inc_id_;
ps_sql_ = other.ps_sql_;
@ -47,7 +49,8 @@ ObPsSqlKey &ObPsSqlKey::operator=(const ObPsSqlKey &other)
bool ObPsSqlKey::operator==(const ObPsSqlKey &other) const
{
return db_id_ == other.db_id_ &&
return flag_ == other.flag_ &&
db_id_ == other.db_id_ &&
inc_id_ == other.inc_id_ &&
ps_sql_.compare(other.ps_sql_) == 0;
}
@ -55,6 +58,7 @@ bool ObPsSqlKey::operator==(const ObPsSqlKey &other) const
int64_t ObPsSqlKey::hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&flag_, sizeof(uint32_t), hash_val);
hash_val = murmurhash(&db_id_, sizeof(uint64_t), hash_val);
hash_val = murmurhash(&inc_id_, sizeof(uint64_t), hash_val);
ps_sql_.hash(hash_val, hash_val);

View File

@ -30,20 +30,24 @@ struct ObPsSqlKey
{
public:
ObPsSqlKey()
: db_id_(OB_INVALID_ID),
: flag_(0),
db_id_(OB_INVALID_ID),
inc_id_(OB_INVALID_ID),
ps_sql_()
{}
ObPsSqlKey(uint64_t db_id,
const common::ObString &ps_sql)
: db_id_(db_id),
: flag_(0),
db_id_(db_id),
inc_id_(OB_INVALID_ID),
ps_sql_(ps_sql)
{}
ObPsSqlKey(uint64_t db_id,
ObPsSqlKey(uint32_t flag,
uint64_t db_id,
uint64_t inc_id,
const common::ObString &ps_sql)
: db_id_(db_id),
: flag_(flag),
db_id_(db_id),
inc_id_(inc_id),
ps_sql_(ps_sql)
{}
@ -52,15 +56,40 @@ public:
int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
ObPsSqlKey &operator=(const ObPsSqlKey &other);
bool operator==(const ObPsSqlKey &other) const;
void set_is_client_return_rowid()
{
is_client_return_hidden_rowid_ = true;
}
bool get_is_client_return_rowid()
{
return is_client_return_hidden_rowid_;
}
void set_flag(uint32_t flag)
{
flag_ = flag;
}
uint32_t get_flag() const
{
return flag_;
}
void reset()
{
flag_ = 0;
db_id_ = OB_INVALID_ID;
inc_id_ = OB_INVALID_ID;
ps_sql_.reset();
}
TO_STRING_KV(K_(db_id), K_(inc_id), K_(ps_sql));
TO_STRING_KV(K_(flag), K_(db_id), K_(inc_id), K_(ps_sql));
public:
union
{
uint32_t flag_;
struct {
uint32_t is_client_return_hidden_rowid_ : 1;
uint32_t reserved_ : 31;
};
};
uint64_t db_id_;
// MySQL allows session-level temporary tables with the same name to have different schema definitions.
// In order to distinguish this scenario, an incremental id is used to generate different prepared

View File

@ -233,17 +233,16 @@ int ObPsCache::get_stmt_info_guard(const ObPsStmtId ps_stmt_id,
//2.set stmt_id_map_返回OB_HASH_EXIST时,尝试从stmt_id_map_中获取stmt_item
// 1)获取成功,返回stmt_item
// 2) 报OB_HASH_NOT_EXIST, 则递归调get_or_add_stmt_item,尝试重新创建
int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
const ObString &ps_sql,
int ObPsCache::get_or_add_stmt_item(const ObPsSqlKey &ps_key,
const bool is_contain_tmp_tbl,
ObPsStmtItem *&ps_item_value)
{
int ret = OB_SUCCESS;
ObPsStmtId new_stmt_id = gen_new_ps_stmt_id();
ObPsStmtItem tmp_item_value(new_stmt_id);
tmp_item_value.assign_sql_key(ObPsSqlKey(db_id,
is_contain_tmp_tbl ? new_stmt_id : OB_INVALID_ID,
ps_sql));
ObPsSqlKey tmp_ps_key = ps_key;
tmp_ps_key.inc_id_ = is_contain_tmp_tbl ? new_stmt_id : OB_INVALID_ID;
tmp_item_value.assign_sql_key(tmp_ps_key);
//will deep copy
ObPsStmtItem *new_item_value = NULL;
//由于stmt_id_map_中的value是ObPsStmtItem的指针,因此这里需要copy整个内存
@ -261,26 +260,26 @@ int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
}
}
if (OB_SUCC(ret)) {
const ObPsSqlKey ps_sql_key = new_item_value->get_sql_key();
const ObPsSqlKey inner_ps_key = new_item_value->get_sql_key();
new_item_value->check_erase_inc_ref_count(); //inc ref count for ps cache, ignore ret;
ret = stmt_id_map_.set_refactored(ps_sql_key, new_item_value);
ret = stmt_id_map_.set_refactored(inner_ps_key, new_item_value);
if (OB_SUCC(ret)) {
//do nothing
LOG_INFO("add stmt item", K(ps_sql_key), K(*new_item_value));
LOG_INFO("add stmt item", K(inner_ps_key), K(*new_item_value));
ps_item_value = new_item_value;
} else if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
//may be other session has set
//inc ref count
ObPsStmtItem *tmp_item_value = NULL;
if (OB_FAIL(ref_stmt_item(ps_sql_key, tmp_item_value))) {
if (OB_FAIL(inner_ref_stmt_item(inner_ps_key, tmp_item_value))) {
LOG_WARN("get stmt item failed", K(ret));
if (OB_HASH_NOT_EXIST == ret) {//stmt item被删除,需要重新创建
if (OB_FAIL(get_or_add_stmt_item(db_id, ps_sql, is_contain_tmp_tbl, ps_item_value))) {
if (OB_FAIL(get_or_add_stmt_item(ps_key, is_contain_tmp_tbl, ps_item_value))) {
LOG_WARN("fail to get or add stmt item", K(ret));
}
} else {
LOG_WARN("unexpected error", K(ret), K(ps_sql_key));
LOG_WARN("unexpected error", K(ret), K(inner_ps_key));
}
} else {
if (OB_ISNULL(tmp_item_value)) {
@ -322,8 +321,8 @@ int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
// 1) OB_SUCCESS: 成功获取ps_stmt_item
// 2) OB_HASH_NOT_EXIST: a.stmt_id_map_中本身就不存在当前key b.开始key对应的引用计数为0,重试若干次后变为该错误码
// 3) OB_EGAIN: 尝试了MAX_RETRY_CNT,ps_stmt_item的ref_count仍旧为0
int ObPsCache::ref_stmt_item(const ObPsSqlKey &ps_sql_key,
ObPsStmtItem *&ps_stmt_item)
int ObPsCache::inner_ref_stmt_item(const ObPsSqlKey &ps_sql_key,
ObPsStmtItem *&ps_stmt_item)
{
int ret = OB_SUCCESS;
int callback_ret = OB_SUCCESS;
@ -377,23 +376,18 @@ int ObPsCache::deref_stmt_item(const ObPsSqlKey &ps_sql_key)
return ret;
}
int ObPsCache::ref_stmt_item(const uint64_t db_id,
const ObString &ps_sql,
ObPsStmtItem *&stmt_item)
int ObPsCache::ref_stmt_item(const ObPsSqlKey &ps_sql_key, ObPsStmtItem *&stmt_item)
{
int ret = OB_SUCCESS;
stmt_item = NULL;
if (ps_sql.empty()) {
stmt_item = nullptr;
if (ps_sql_key.ps_sql_.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("empty sql", K(ret), K(ps_sql));
} else {
ObPsSqlKey ps_sql_key(db_id, ps_sql);
if (OB_FAIL(ref_stmt_item(ps_sql_key, stmt_item))) {
LOG_WARN_IGNORE_PS_NOTFOUND(ret, "ps item value not exist", K(ret), K(ps_sql_key));
} else if (OB_ISNULL(stmt_item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get stmt id failed", K(ret));
}
LOG_WARN("empty sql", K(ret), K(ps_sql_key));
} else if (OB_FAIL(inner_ref_stmt_item(ps_sql_key, stmt_item))) {
LOG_WARN_IGNORE_PS_NOTFOUND(ret, "ps item value not exist", K(ret), K(ps_sql_key));
} else if (OB_ISNULL(stmt_item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get stmt id failed", K(ret));
}
return ret;
}

View File

@ -88,15 +88,14 @@ public:
// always make sure stmt_id is inner_stmt_id!!!
int64_t get_tenant_id() const { return tenant_id_; }
int get_stmt_info_guard(const ObPsStmtId ps_stmt_id, ObPsStmtInfoGuard &guard);
int ref_stmt_item(const uint64_t db_id, const common::ObString &ps_sql, ObPsStmtItem *&stmt_item);
int ref_stmt_item(const ObPsSqlKey &ps_sql_key, ObPsStmtItem *&stmt_item);
int ref_stmt_info(const ObPsStmtId stmt_id, ObPsStmtInfo *&ps_stmt_info);
int deref_stmt_info(const ObPsStmtId stmt_id);
int deref_all_ps_stmt(const ObIArray<ObPsStmtId> &ps_stmt_ids);
int ref_stmt_item(const ObPsSqlKey &ps_sql_key, ObPsStmtItem *&ps_stmt_item);
int inner_ref_stmt_item(const ObPsSqlKey &ps_sql_key, ObPsStmtItem *&ps_stmt_item);
int deref_stmt_item(const ObPsSqlKey &ps_sql_key);
int deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item = false);
int get_or_add_stmt_item(const uint64_t db_id,
const common::ObString &ps_sql,
int get_or_add_stmt_item(const ObPsSqlKey &ps_key,
const bool is_contain_tmp_tbl,
ObPsStmtItem *&ps_item_value);
int get_or_add_stmt_info(const PsCacheInfoCtx &info_ctx,
@ -128,6 +127,7 @@ public:
ObPsStmtInfo &stmt_info,
bool &is_expired);
int erase_stmt_item(ObPsStmtId stmt_id, const ObPsSqlKey &ps_key);
private:
int inner_cache_evict(bool is_evict_all);
int fill_ps_stmt_info(const ObResultSet &result,