fix fuse row cache in sql audit
This commit is contained in:
parent
4d5d42850c
commit
02c4c0e219
@ -144,6 +144,7 @@ struct ObExecRecord
|
||||
UPDATE_EVENT(blockscan_block_cnt);
|
||||
UPDATE_EVENT(blockscan_row_cnt);
|
||||
UPDATE_EVENT(pushdown_storage_filter_row_cnt);
|
||||
UPDATE_EVENT(fuse_row_cache_hit);
|
||||
}
|
||||
|
||||
uint64_t get_cur_memstore_read_row_count(common::ObDiagnoseSessionInfo *di = NULL) {
|
||||
|
@ -72,7 +72,7 @@ int ObFuseRowCacheFetcher::get_fuse_row_cache(const ObDatumRowkey &rowkey, ObFus
|
||||
STORAGE_LOG(WARN, "fail to get row from multi version fuse row cache", K(ret), K(cache_key));
|
||||
}
|
||||
} else {
|
||||
// TODO(yht146439) add to sql_audit
|
||||
EVENT_INC(ObStatEventIds::FUSE_ROW_CACHE_HIT);
|
||||
EVENT_INC(ObStatEventIds::MULTI_VERSION_FUSE_ROW_CACHE_HIT);
|
||||
}
|
||||
STORAGE_LOG(DEBUG, "get from multi version fuse row cache", K(ret), K(cache_key));
|
||||
|
@ -375,6 +375,7 @@ int ObMultiVersionFuseRowCache::get_row(const ObMultiVersionFuseRowCacheKey &key
|
||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get key from row cache", K(ret));
|
||||
}
|
||||
EVENT_INC(ObStatEventIds::FUSE_ROW_CACHE_MISS);
|
||||
EVENT_INC(ObStatEventIds::MULTI_VERSION_FUSE_ROW_CACHE_MISS);
|
||||
} else {
|
||||
if (OB_ISNULL(value)) {
|
||||
|
@ -311,6 +311,7 @@ int ObMviewCompactionHelper::create_inner_session(
|
||||
session->get_ddl_info().set_major_refreshing_mview(true);
|
||||
session->get_ddl_info().set_refreshing_mview(true);
|
||||
session->set_database_id(database_id);
|
||||
session->set_query_start_time(ObTimeUtil::current_time());
|
||||
LOG_INFO("[MVIEW COMPACTION]: Succ to create inner session", K(ret), K(tenant_id), K(database_id), KP(session));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -396,29 +397,32 @@ int ObMviewCompactionHelper::validate_row_count(const ObMergeParameter &merge_pa
|
||||
LOG_WARN("Failed to create inner session", K(ret), KPC(merge_param.mview_merge_param_));
|
||||
} else if (OB_FAIL(ObMviewCompactionHelper::create_inner_connection(session, conn))) {
|
||||
LOG_WARN("Failed to create inner connection", K(ret));
|
||||
} else if (OB_FAIL(conn->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), read_result))) {
|
||||
LOG_WARN("Failed to execute", K(ret), K(sql));
|
||||
} else if (OB_ISNULL(sql_result = static_cast<observer::ObInnerSQLResult *>(read_result.get_result()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null sql result", K(ret), K(sql));
|
||||
} else if (OB_FAIL(sql_result->next())) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Failed to get next row", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
const ObNewRow *new_row = sql_result->get_row();
|
||||
if (OB_UNLIKELY(nullptr == new_row || 1 != new_row->get_count())) {
|
||||
ObSessionStatEstGuard sess_stat_guard(MTL_ID(), session->get_sessid());
|
||||
if (OB_FAIL(conn->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), read_result))) {
|
||||
LOG_WARN("Failed to execute", K(ret), K(sql));
|
||||
} else if (OB_ISNULL(sql_result = static_cast<observer::ObInnerSQLResult *>(read_result.get_result()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected result row", K(ret), KPC(new_row));
|
||||
} else if (merge_param.get_schema()->is_oracle_mode()) {
|
||||
const number::ObNumber nmb(new_row->get_cell(0).get_number());
|
||||
if (OB_FAIL(nmb.extract_valid_int64_with_trunc(join_row_count))) {
|
||||
STORAGE_LOG(WARN, "Failed to cast number to int64", K(ret), K(new_row->get_cell(0)));
|
||||
LOG_WARN("Unexpected null sql result", K(ret), K(sql));
|
||||
} else if (OB_FAIL(sql_result->next())) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Failed to get next row", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
join_row_count = new_row->get_cell(0).get_int();
|
||||
const ObNewRow *new_row = sql_result->get_row();
|
||||
if (OB_UNLIKELY(nullptr == new_row || 1 != new_row->get_count())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected result row", K(ret), KPC(new_row));
|
||||
} else if (merge_param.get_schema()->is_oracle_mode()) {
|
||||
const number::ObNumber nmb(new_row->get_cell(0).get_number());
|
||||
if (OB_FAIL(nmb.extract_valid_int64_with_trunc(join_row_count))) {
|
||||
STORAGE_LOG(WARN, "Failed to cast number to int64", K(ret), K(new_row->get_cell(0)));
|
||||
}
|
||||
} else {
|
||||
join_row_count = new_row->get_cell(0).get_int();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2160,12 +2160,18 @@ ObPartitionMVRowMergeIter::ObPartitionMVRowMergeIter(common::ObIAllocator &alloc
|
||||
free_session_ctx_(),
|
||||
session_(nullptr),
|
||||
conn_(nullptr),
|
||||
sql_result_(nullptr)
|
||||
sql_result_(nullptr),
|
||||
sess_stat_guard_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
ObPartitionMVRowMergeIter::~ObPartitionMVRowMergeIter()
|
||||
{
|
||||
if (nullptr != sess_stat_guard_) {
|
||||
sess_stat_guard_->~ObSessionStatEstGuard();
|
||||
allocator_.free(sess_stat_guard_);
|
||||
sess_stat_guard_ = nullptr;
|
||||
}
|
||||
read_result_.~ReadResult(); // need decons before session
|
||||
ObMviewCompactionHelper::release_inner_connection(conn_);
|
||||
ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_);
|
||||
@ -2241,6 +2247,9 @@ int ObPartitionMVRowMergeIter::inner_init(const ObMergeParameter &merge_param)
|
||||
LOG_WARN("Failed to create inner session", K(ret), KPC(merge_param.mview_merge_param_));
|
||||
} else if (OB_FAIL(ObMviewCompactionHelper::create_inner_connection(session_, conn_))) {
|
||||
LOG_WARN("Failed to create inner connection", K(ret), K_(sql_idx));
|
||||
} else if (OB_ISNULL(sess_stat_guard_ = OB_NEWx(ObSessionStatEstGuard, &allocator_, MTL_ID(), session_->get_sessid()))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("Failed to alloc session stat guard", K(ret), K_(sql_idx), K(sql));
|
||||
} else if (OB_FAIL(conn_->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), read_result_))) {
|
||||
LOG_WARN("Failed to execute", K(ret), K_(sql_idx), K(sql));
|
||||
} else if (OB_ISNULL(sql_result_ = static_cast<observer::ObInnerSQLResult *>(read_result_.get_result()))) {
|
||||
|
@ -417,7 +417,7 @@ public:
|
||||
const ObITableReadInfo *read_info) override;
|
||||
virtual int next() override;
|
||||
TO_STRING_KV(K_(is_delete), K_(is_replace), K_(sql_idx), K_(sql_read_col_cnt), K_(store_col_cnt),
|
||||
K_(free_session_ctx), KP_(session), KP_(conn), KP_(sql_result));
|
||||
K_(free_session_ctx), KP_(session), KP_(conn), KP_(sql_result), KP_(sess_stat_guard));
|
||||
protected:
|
||||
virtual int inner_init(const ObMergeParameter &merge_param) override;
|
||||
virtual bool inner_check(const ObMergeParameter &merge_param) override;
|
||||
@ -433,6 +433,7 @@ private:
|
||||
sql::ObSQLSessionInfo *session_;
|
||||
sqlclient::ObISQLConnection *conn_;
|
||||
observer::ObInnerSQLResult *sql_result_;
|
||||
ObSessionStatEstGuard *sess_stat_guard_;
|
||||
};
|
||||
|
||||
static const int64_t DEFAULT_ITER_COUNT = 16;
|
||||
|
Loading…
x
Reference in New Issue
Block a user