Fix the correctness of ps prepare contain temporary table

This commit is contained in:
obdev
2023-07-11 12:42:39 +00:00
committed by ob-robot
parent 9e6a7c4f6d
commit 2b30190d8e
7 changed files with 106 additions and 95 deletions

View File

@ -125,7 +125,7 @@ int ObAllVirtualPsItemInfo::fill_cells(uint64_t tenant_id,
break;
}
case share::ALL_VIRTUAL_PS_ITEM_INFO_CDE::DB_ID: {
cells[i].set_int(stmt_info->get_db_id());
cells[i].set_int(stmt_info->get_sql_key().db_id_);
break;
}
case share::ALL_VIRTUAL_PS_ITEM_INFO_CDE::PS_SQL: {
@ -228,8 +228,7 @@ int ObAllVirtualPsItemInfo::get_next_row_from_specified_tenant(uint64_t tenant_i
} else if (OB_ISNULL(stmt_info)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "stmt_info is NULL", K(ret));
} else if (FALSE_IT(tmp_ret = ps_cache_->ref_stmt_item(stmt_info->get_db_id(),
stmt_info->get_ps_sql(),
} else if (FALSE_IT(tmp_ret = ps_cache_->ref_stmt_item(stmt_info->get_sql_key(),
stmt_item))) {
} else if (OB_HASH_NOT_EXIST == tmp_ret) {
SERVER_LOG(DEBUG, "cannot get stmt_item, may be deleted",

View File

@ -883,6 +883,7 @@ int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
ObResultSet &result)
{
int ret = OB_SUCCESS;
bool is_contain_tmp_tbl = false;
ObSQLSessionInfo &session = result.get_session();
ObPsCache *ps_cache = session.get_ps_cache();
uint64_t db_id = OB_INVALID_ID;
@ -890,6 +891,9 @@ int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
if (OB_ISNULL(ps_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ps plan cache should not be null", K(ret));
} else if (lib::is_mysql_mode() &&
OB_FAIL(check_contain_temporary_table(schema_guard, result, is_contain_tmp_tbl))) {
LOG_WARN("failed to check contain temporary table", K(ret));
} else {
ObPsStmtItem *ps_stmt_item = NULL;
ObPsStmtInfo *ref_stmt_info = NULL;
@ -897,6 +901,7 @@ int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
// add stmt item
if (OB_FAIL(ps_cache->get_or_add_stmt_item(db_id,
info_ctx.normalized_sql_,
is_contain_tmp_tbl,
ps_stmt_item))) {
LOG_WARN("get or create stmt item faield", K(ret), K(db_id), K(info_ctx.normalized_sql_));
} else if (OB_FAIL(ps_cache->get_or_add_stmt_info(info_ctx,
@ -942,6 +947,33 @@ int ObSql::do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
return ret;
}
int ObSql::check_contain_temporary_table(share::schema::ObSchemaGetterGuard &schema_guard,
ObResultSet &result,
bool &is_contain_tmp_tbl)
{
int ret = OB_SUCCESS;
is_contain_tmp_tbl = false;
const ObTableSchema *table_schema = nullptr;
for (int64_t i = 0; OB_SUCC(ret) && !is_contain_tmp_tbl && i < result.get_ref_objects().count(); i++) {
table_schema = nullptr;
ObSchemaObjVersion &obj_version = result.get_ref_objects().at(i);
if (DEPENDENCY_TABLE != obj_version.object_type_) {
// do nothing
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(),
obj_version.object_id_,
table_schema))) {
LOG_WARN("failed to get table schema", K(ret), K(obj_version), K(table_schema));
} else if (nullptr == table_schema) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get an unexpected null schema", K(ret), K(table_schema));
} else if (table_schema->is_tmp_table()) {
is_contain_tmp_tbl = true;
break;
}
}
return ret;
}
int ObSql::do_real_prepare(const ObString &sql,
ObSqlCtx &context,
ObResultSet &result,
@ -1629,11 +1661,8 @@ int ObSql::handle_ps_prepare(const ObString &stmt,
is_expired))) {
LOG_WARN("fail to check schema version", K(ret));
} else if (is_expired) {
ObPsSqlKey ps_sql_key;
stmt_info->set_is_expired();
ps_sql_key.set_db_id(stmt_info->get_db_id());
ps_sql_key.set_ps_sql(stmt_info->get_ps_sql());
if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, ps_sql_key))) {
if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, stmt_info->get_sql_key()))) {
LOG_WARN("fail to erase stmt item", K(ret), K(*stmt_info));
}
need_do_real_prepare = true;

View File

@ -269,6 +269,9 @@ private:
int do_add_ps_cache(const PsCacheInfoCtx &info_ctx,
share::schema::ObSchemaGetterGuard &schema_guard,
ObResultSet &result);
int check_contain_temporary_table(share::schema::ObSchemaGetterGuard &schema_guard,
ObResultSet &result,
bool &is_contain_tmp_tbl);
int fill_result_set(ObResultSet &result, ObSqlCtx *context, const PlanCacheMode mode, ObStmt &stmt);
int fill_select_result_set(ObResultSet &result_set, ObSqlCtx *context, const PlanCacheMode mode,
ObCollationType collation_type, const ObString &type_name,

View File

@ -24,17 +24,13 @@ namespace oceanbase
using namespace common;
namespace sql
{
int ObPsSqlKey::deep_copy(const ObPsSqlKey &other)
int ObPsSqlKey::deep_copy(const ObPsSqlKey &other, common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("allocator is invalid", K(ret));
} else {
db_id_ = other.db_id_;
if (OB_FAIL(ObPsSqlUtils::deep_copy_str(*allocator_, other.get_ps_sql(), ps_sql_))) {
LOG_WARN("deep copy str failed", K(other), K(ret));
}
db_id_ = other.db_id_;
inc_id_ = other.inc_id_;
if (OB_FAIL(ObPsSqlUtils::deep_copy_str(allocator, other.ps_sql_, ps_sql_))) {
LOG_WARN("deep copy str failed", K(other), K(ret));
}
return ret;
}
@ -43,6 +39,7 @@ ObPsSqlKey &ObPsSqlKey::operator=(const ObPsSqlKey &other)
{
if (this != &other) {
db_id_ = other.db_id_;
inc_id_ = other.inc_id_;
ps_sql_ = other.ps_sql_;
}
return *this;
@ -51,21 +48,21 @@ ObPsSqlKey &ObPsSqlKey::operator=(const ObPsSqlKey &other)
bool ObPsSqlKey::operator==(const ObPsSqlKey &other) const
{
return db_id_ == other.db_id_ &&
ps_sql_.compare(other.get_ps_sql()) == 0;
inc_id_ == other.inc_id_ &&
ps_sql_.compare(other.ps_sql_) == 0;
}
int64_t ObPsSqlKey::hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&db_id_, sizeof(uint64_t), hash_val);
hash_val = murmurhash(&inc_id_, sizeof(uint64_t), hash_val);
ps_sql_.hash(hash_val, hash_val);
return hash_val;
}
ObPsStmtItem::ObPsStmtItem()
: ref_count_(1),
db_id_(OB_INVALID_ID),
ps_sql_(),
stmt_id_(OB_INVALID_STMT_ID),
is_expired_evicted_(false),
allocator_(NULL),
@ -75,8 +72,6 @@ ObPsStmtItem::ObPsStmtItem()
ObPsStmtItem::ObPsStmtItem(const ObPsStmtId ps_stmt_id)
: ref_count_(1),
db_id_(OB_INVALID_ID),
ps_sql_(),
stmt_id_(ps_stmt_id),
is_expired_evicted_(false),
allocator_(NULL),
@ -87,8 +82,6 @@ ObPsStmtItem::ObPsStmtItem(const ObPsStmtId ps_stmt_id)
ObPsStmtItem::ObPsStmtItem(ObIAllocator *inner_allocator,
ObIAllocator *external_allocator)
: ref_count_(1),
db_id_(OB_INVALID_ID),
ps_sql_(),
stmt_id_(OB_INVALID_STMT_ID),
is_expired_evicted_(false),
allocator_(inner_allocator),
@ -104,10 +97,8 @@ int ObPsStmtItem::deep_copy(const ObPsStmtItem &other)
LOG_WARN("allocator is invalid", K(ret));
} else {
stmt_id_ = other.stmt_id_;
db_id_ = other.db_id_;
//not copy refcount
if (OB_FAIL(ObPsSqlUtils::deep_copy_str(*allocator_, other.get_ps_sql(), ps_sql_))) {
LOG_WARN("deep copy str failed", K(other), K(ret));
if (OB_FAIL(ps_key_.deep_copy(other.get_sql_key(), *allocator_))) {
LOG_WARN("deep copy ps key failed", K(other), K(ret));
}
}
return ret;
@ -117,8 +108,7 @@ ObPsStmtItem &ObPsStmtItem::operator=(const ObPsStmtItem &other)
{
if (this != &other) {
ref_count_ = other.ref_count_;
db_id_ = other.get_db_id();
ps_sql_ = other.get_ps_sql();
ps_key_ = other.get_sql_key();
stmt_id_ = other.get_ps_stmt_id();
is_expired_evicted_ = other.is_expired_evicted_;
}
@ -128,7 +118,7 @@ ObPsStmtItem &ObPsStmtItem::operator=(const ObPsStmtItem &other)
bool ObPsStmtItem::is_valid() const
{
bool bret = false;
if (!ps_sql_.empty()
if (!ps_key_.ps_sql_.empty()
&& OB_INVALID_STMT_ID != stmt_id_) {
bret = true;
}
@ -138,7 +128,7 @@ bool ObPsStmtItem::is_valid() const
int ObPsStmtItem::get_convert_size(int64_t &cv_size) const
{
cv_size = sizeof(ObPsStmtItem);
cv_size += ps_sql_.length() + 1;
cv_size += ps_key_.ps_sql_.length() + 1;
return OB_SUCCESS;
}
@ -273,8 +263,6 @@ int ObPsSqlMeta::get_convert_size(int64_t &cv_size) const
ObPsStmtInfo::ObPsStmtInfo(ObIAllocator *inner_allocator)
: stmt_type_(stmt::T_NONE),
ps_stmt_checksum_(0),
db_id_(OB_INVALID_ID),
ps_sql_(),
ps_sql_meta_(inner_allocator),
ref_count_(1),
question_mark_count_(0),
@ -303,8 +291,6 @@ ObPsStmtInfo::ObPsStmtInfo(ObIAllocator *inner_allocator,
ObIAllocator *external_allocator)
: stmt_type_(stmt::T_NONE),
ps_stmt_checksum_(0),
db_id_(OB_INVALID_ID),
ps_sql_(),
ps_sql_meta_(inner_allocator),
ref_count_(1),
question_mark_count_(0),
@ -330,7 +316,7 @@ ObPsStmtInfo::ObPsStmtInfo(ObIAllocator *inner_allocator,
bool ObPsStmtInfo::is_valid() const
{
return !ps_sql_.empty();
return !ps_key_.ps_sql_.empty();
}
int ObPsStmtInfo::assign_no_param_sql(const common::ObString &no_param_sql)
@ -460,7 +446,6 @@ int ObPsStmtInfo::deep_copy(const ObPsStmtInfo &other)
} else {
stmt_type_ = other.stmt_type_;
ps_stmt_checksum_ = other.ps_stmt_checksum_;
db_id_ = other.db_id_;
question_mark_count_ = other.question_mark_count_;
num_of_returning_into_ = other.num_of_returning_into_;
is_sensitive_sql_ = other.is_sensitive_sql_;
@ -482,8 +467,8 @@ int ObPsStmtInfo::deep_copy(const ObPsStmtInfo &other)
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ObPsSqlUtils::deep_copy_str(*allocator_, other.get_ps_sql(), ps_sql_))) {
LOG_WARN("deep copy str failed", K(other), K(ret));
} else if (OB_FAIL(ps_key_.deep_copy(other.get_sql_key(), *allocator_))) {
LOG_WARN("failed to deep copy ps key", K(ret), K(other));
} else if (OB_FAIL(ObPsSqlUtils::deep_copy_str(*allocator_, other.get_no_param_sql(),
no_param_sql_))) {
LOG_WARN("deep copy str failed", K(other), K(ret));
@ -522,7 +507,7 @@ int ObPsStmtInfo::get_convert_size(int64_t &cv_size) const
int ret = OB_SUCCESS;
cv_size = 0;
int64_t convert_size = sizeof(ObPsStmtInfo);
convert_size += ps_sql_.length() + 1;
convert_size += ps_key_.ps_sql_.length() + 1;
convert_size += no_param_sql_.length() + 1;
convert_size += raw_sql_.length() + 1;
int64_t meta_convert_size = 0;
@ -638,8 +623,7 @@ int64_t ObPsStmtInfo::to_string(char *buf, const int64_t buf_len) const
int64_t pos = 0;
int ret = OB_SUCCESS;
J_OBJ_START();
J_KV(K_(db_id),
K_(ps_sql),
J_KV(K_(ps_key),
K_(ref_count),
K_(can_direct_use_param),
K_(question_mark_count),

View File

@ -22,40 +22,48 @@ namespace sql
{
class ObCallProcedureStmt;
//ps stmt key
class ObPsSqlKey
// prepared statement stmt key
struct ObPsSqlKey
{
public:
ObPsSqlKey() : db_id_(OB_INVALID_ID), ps_sql_(), allocator_(NULL){}
ObPsSqlKey()
: db_id_(OB_INVALID_ID),
inc_id_(OB_INVALID_ID),
ps_sql_()
{}
ObPsSqlKey(uint64_t db_id,
const common::ObString &ps_sql)
: db_id_(db_id), ps_sql_(ps_sql), allocator_(NULL)
: db_id_(db_id),
inc_id_(OB_INVALID_ID),
ps_sql_(ps_sql)
{}
//for deep copy
explicit ObPsSqlKey(common::ObIAllocator *allocator)
: db_id_(), ps_sql_(), allocator_(allocator) {}
int deep_copy(const ObPsSqlKey &other);
ObPsSqlKey(uint64_t db_id,
uint64_t inc_id,
const common::ObString &ps_sql)
: db_id_(db_id),
inc_id_(inc_id),
ps_sql_(ps_sql)
{}
int deep_copy(const ObPsSqlKey &other, common::ObIAllocator &allocator);
int64_t hash() const;
int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
ObPsSqlKey &operator=(const ObPsSqlKey &other);
bool operator==(const ObPsSqlKey &other) const;
//need to reset allocator?
void reset() { db_id_ = OB_INVALID_ID; ps_sql_.reset();}
void reset()
{
db_id_ = OB_INVALID_ID;
inc_id_ = OB_INVALID_ID;
ps_sql_.reset();
}
TO_STRING_KV(K_(db_id), K_(inc_id), K_(ps_sql));
int get_convert_size(int64_t &cv_size) const;
uint64_t get_db_id() const { return db_id_; }
const common::ObString &get_ps_sql() const { return ps_sql_; }
void set_db_id(uint64_t db_id) { db_id_ = db_id; } //not deep copy
void set_ps_sql(const common::ObString &ps_sql) { ps_sql_ = ps_sql; }//not deep copy
TO_STRING_KV(K_(db_id), K_(ps_sql));
private:
uint64_t db_id_;//database id中存在tenant id,因此将db_id作为key的一个部分,可以区分租户
public:
uint64_t db_id_;
// MySQL allows session-level temporary tables with the same name to have different schema definitions.
// In order to distinguish this scenario, an incremental id is used to generate different prepared
// statements each time.
uint64_t inc_id_;
common::ObString ps_sql_;
common::ObIAllocator *allocator_;
};
//ps stmt item
@ -79,29 +87,20 @@ public:
int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
int get_convert_size(int64_t &cv_size) const;
uint64_t get_db_id() const { return db_id_; }
const common::ObString &get_ps_sql() const { return ps_sql_; }
void get_sql_key(ObPsSqlKey &ps_sql_key)
{
ps_sql_key.set_db_id(db_id_);
ps_sql_key.set_ps_sql(ps_sql_);
}
const ObPsSqlKey& get_sql_key() const { return ps_key_; }
void assign_sql_key(const ObPsSqlKey &ps_sql_key)
{
db_id_ = ps_sql_key.get_db_id();
ps_sql_ = ps_sql_key.get_ps_sql();
ps_key_ = ps_sql_key;
}
void set_ps_sql(const common::ObString &ps_sql) { ps_sql_ = ps_sql; }
bool *get_is_expired_evicted_ptr() { return &is_expired_evicted_; }
ObIAllocator *get_external_allocator() { return external_allocator_; }
TO_STRING_KV(K_(ref_count), K_(db_id), K_(ps_sql), K_(stmt_id), K_(is_expired_evicted));
TO_STRING_KV(K_(ref_count), K_(ps_key), K_(stmt_id), K_(is_expired_evicted));
private:
volatile int64_t ref_count_;
uint64_t db_id_;
common::ObString ps_sql_;
ObPsSqlKey ps_key_;
ObPsStmtId stmt_id_;
bool is_expired_evicted_;
//ObDataBuffer用于ObPsStmtItem内部内存的使用,内存实质上来自ObPsPlancache中的inner_allocator_
@ -151,8 +150,8 @@ public:
inline int64_t get_num_of_column() const { return ps_sql_meta_.get_column_size(); }
inline stmt::StmtType get_stmt_type() const { return stmt_type_; }
inline void set_stmt_type(stmt::StmtType stmt_type) { stmt_type_ = stmt_type; }
inline uint64_t get_db_id() const { return db_id_; }
inline const common::ObString &get_ps_sql() const { return ps_sql_; }
const ObPsSqlKey& get_sql_key() const { return ps_key_; }
inline const common::ObString &get_ps_sql() const { return ps_key_.ps_sql_; }
inline const common::ObString &get_no_param_sql() const { return no_param_sql_; }
inline const common::ObIArray<int64_t> &get_raw_params_idx() const
{ return raw_params_idx_; }
@ -195,8 +194,7 @@ public:
void assign_sql_key(const ObPsStmtItem &ps_stmt_item)
{
db_id_ = ps_stmt_item.get_db_id();
ps_sql_ = ps_stmt_item.get_ps_sql();
ps_key_ = ps_stmt_item.get_sql_key();
}
ObIAllocator *get_external_allocator() { return external_allocator_; }
void set_inner_allocator(common::ObIAllocator *allocator)
@ -225,8 +223,7 @@ public:
private:
stmt::StmtType stmt_type_;
uint64_t ps_stmt_checksum_;
uint64_t db_id_;
common::ObString ps_sql_;
ObPsSqlKey ps_key_;
ObPsSqlMeta ps_sql_meta_;
volatile int64_t ref_count_;
// simple prepare protocol协议不会填充ps_sql_meta, 这里记录下question mark cnt, 用于execute时对入参个数进行检查

View File

@ -147,9 +147,7 @@ int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item/*=false*/
LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id));
} else {
ObPsStmtInfo *ps_info = guard.get_stmt_info();
ObPsSqlKey ps_sql_key;
ps_sql_key.set_db_id(ps_info->get_db_id());
ps_sql_key.set_ps_sql(ps_info->get_ps_sql());
const ObPsSqlKey ps_sql_key = ps_info->get_sql_key();
int tmp_ret = OB_SUCCESS;
if (erase_item) { // dec cached ref
if (OB_FAIL(erase_stmt_item(stmt_id, ps_sql_key))) {
@ -237,12 +235,15 @@ int ObPsCache::get_stmt_info_guard(const ObPsStmtId ps_stmt_id,
// 2) 报OB_HASH_NOT_EXIST, 则递归调get_or_add_stmt_item,尝试重新创建
int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
const ObString &ps_sql,
const bool is_contain_tmp_tbl,
ObPsStmtItem *&ps_item_value)
{
int ret = OB_SUCCESS;
ObPsStmtId new_stmt_id = gen_new_ps_stmt_id();
ObPsStmtItem tmp_item_value(new_stmt_id);
tmp_item_value.assign_sql_key(ObPsSqlKey(db_id, ps_sql));
tmp_item_value.assign_sql_key(ObPsSqlKey(db_id,
is_contain_tmp_tbl ? new_stmt_id : OB_INVALID_ID,
ps_sql));
//will deep copy
ObPsStmtItem *new_item_value = NULL;
//由于stmt_id_map_中的value是ObPsStmtItem的指针,因此这里需要copy整个内存
@ -260,8 +261,7 @@ int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
}
}
if (OB_SUCC(ret)) {
ObPsSqlKey ps_sql_key;
new_item_value->get_sql_key(ps_sql_key);
const ObPsSqlKey ps_sql_key = new_item_value->get_sql_key();
new_item_value->check_erase_inc_ref_count(); //inc ref count for ps cache, ignore ret;
ret = stmt_id_map_.set_refactored(ps_sql_key, new_item_value);
if (OB_SUCC(ret)) {
@ -276,7 +276,7 @@ int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
if (OB_FAIL(ref_stmt_item(ps_sql_key, tmp_item_value))) {
LOG_WARN("get stmt item failed", K(ret));
if (OB_HASH_NOT_EXIST == ret) {//stmt item被删除,需要重新创建
if (OB_FAIL(get_or_add_stmt_item(db_id, ps_sql, ps_item_value))) {
if (OB_FAIL(get_or_add_stmt_item(db_id, ps_sql, is_contain_tmp_tbl, ps_item_value))) {
LOG_WARN("fail to get or add stmt item", K(ret));
}
} else {
@ -292,12 +292,10 @@ int ObPsCache::get_or_add_stmt_item(uint64_t db_id,
}
//no matter succ or not release
new_item_value->~ObPsStmtItem();
ps_sql_key.reset();
inner_allocator_->free(new_item_value);
} else {
LOG_WARN("unexpecte error", K(ret), K(new_stmt_id));
new_item_value->~ObPsStmtItem();
ps_sql_key.reset();
inner_allocator_->free(new_item_value);
}
}
@ -455,7 +453,7 @@ int ObPsCache::get_or_add_stmt_info(const PsCacheInfoCtx &info_ctx,
}
// may parallel execute by multi_thread
int ObPsCache::erase_stmt_item(ObPsStmtId stmt_id, ObPsSqlKey &ps_key)
int ObPsCache::erase_stmt_item(ObPsStmtId stmt_id, const ObPsSqlKey &ps_key)
{
int ret = OB_SUCCESS;
ObPsStmtItem *ps_item = NULL;

View File

@ -97,6 +97,7 @@ public:
int deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item = false);
int get_or_add_stmt_item(const uint64_t db_id,
const common::ObString &ps_sql,
const bool is_contain_tmp_tbl,
ObPsStmtItem *&ps_item_value);
int get_or_add_stmt_info(const PsCacheInfoCtx &info_ctx,
const ObResultSet &result,
@ -126,7 +127,7 @@ public:
int check_schema_version(ObSchemaGetterGuard &schema_guard,
ObPsStmtInfo &stmt_info,
bool &is_expired);
int erase_stmt_item(ObPsStmtId stmt_id, ObPsSqlKey &ps_key);
int erase_stmt_item(ObPsStmtId stmt_id, const ObPsSqlKey &ps_key);
private:
int inner_cache_evict(bool is_evict_all);
int fill_ps_stmt_info(const ObResultSet &result,