fix core when dump tx ctx table

This commit is contained in:
ZenoWang 2023-03-24 07:15:22 +00:00 committed by ob-robot
parent f9b0da4444
commit 7fd1b058cd
13 changed files with 495 additions and 120 deletions

View File

@ -29,6 +29,7 @@ ob_unittest_observer(test_observer_expand_shrink test_observer_expand_shrink.cpp
ob_unittest_observer(test_replay_from_middle test_replay_from_middle.cpp)
ob_unittest_observer(test_special_tablet_flush test_special_tablet_flush.cpp)
ob_unittest_observer(test_tx_data_table_mit test_tx_data_table_mit.cpp)
ob_unittest_observer(test_tx_ctx_table_mit test_tx_ctx_table_mit.cpp)
ob_unittest_observer(test_lock_table_persistence test_lock_table_persistence.cpp)
ob_unittest_observer(test_tx_recover test_tx_recovery.cpp)
ob_unittest_observer(test_tx_recover2 test_tx_recovery2.cpp)

View File

@ -0,0 +1,361 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <stdlib.h>
#define USING_LOG_PREFIX STORAGE
#define protected public
#define private public
#include "env/ob_simple_cluster_test_base.h"
#include "env/ob_simple_server_restart_helper.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/access/ob_rows_info.h"
#include "storage/checkpoint/ob_data_checkpoint.h"
#include "storage/compaction/ob_schedule_dag_func.h"
#include "storage/compaction/ob_tablet_merge_task.h"
#include "storage/ls/ob_freezer.h"
#include "storage/ls/ob_ls.h"
#include "storage/ls/ob_ls_meta.h"
#include "storage/ls/ob_ls_tablet_service.h"
#include "storage/ls/ob_ls_tx_service.h"
#include "storage/meta_mem/ob_tablet_handle.h"
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
#include "storage/ob_relative_table.h"
#include "storage/ob_storage_table_guard.h"
#include "storage/tx_storage/ob_ls_map.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx/ob_trans_part_ctx.h"
#undef private
#undef protected
static const char *TEST_FILE_NAME = "test_tx_ctx_table_mit";
static const char *BORN_CASE_NAME = "ObTxCtxTableTest";
static const char *RESTART_CASE_NAME = "ObTxCtxTableRestartTest";
bool SLEEP_BEFORE_DUMP_TX_CTX = false;
bool HAS_GOT_TX_CTX = false;
int64_t TX_CTX_TABLE_LAST_CHECKPOINT = 0;
namespace oceanbase
{
using namespace transaction;
using namespace storage;
using namespace palf;
using namespace share;
namespace storage
{
int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPartTransCtx *&tx_ctx,
ObTxCtxTableInfo &ctx_info)
{
int ret = OB_SUCCESS;
bool need_retry = true;
while (OB_SUCC(ret) && need_retry) {
if (OB_FAIL(ls_tx_ctx_iter_.get_next_tx_ctx(tx_ctx))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "ls_tx_ctx_iter_.get_next_tx_ctx failed", K(ret));
}
} else if (OB_FAIL(tx_ctx->get_tx_ctx_table_info(ctx_info))) {
if (OB_TRANS_CTX_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
STORAGE_LOG(WARN, "tx_ctx->get_tx_ctx_table_info failed", K(ret));
}
ls_tx_ctx_iter_.revert_tx_ctx(tx_ctx);
} else {
need_retry = false;
}
}
if (OB_FAIL(ret)) {
STORAGE_LOG(INFO, "get next tx ctx table info failed", KR(ret), KPC(tx_ctx), K(ctx_info.tx_data_guard_));
} else if (SLEEP_BEFORE_DUMP_TX_CTX) {
fprintf(stdout, "ready to dump tx ctx, undo status node ptr : %p\n", ctx_info.tx_data_guard_.tx_data()->undo_status_list_.head_);
fprintf(stdout, "sleep 20 seconds before dump\n");
HAS_GOT_TX_CTX = true;
SLEEP_BEFORE_DUMP_TX_CTX = false;
::sleep(20);
}
return ret;
}
};
namespace unittest
{
class TestRunCtx
{
public:
uint64_t tenant_id_ = 0;
};
TestRunCtx RunCtx;
class ObTxCtxTableTest : public ObSimpleClusterTestBase
{
public:
ObTxCtxTableTest() : ObSimpleClusterTestBase(TEST_FILE_NAME) {}
void dump_ctx_with_merged_undo_action();
private:
};
#define EXE_SQL(sql_str) \
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
#define EXE_SQL_FMT(...) \
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
#define WRITE_SQL_BY_CONN(conn, sql_str) \
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
#define WRITE_SQL_FMT_BY_CONN(conn, ...) \
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
#define DEF_VAL_FOR_SQL \
int ret = OB_SUCCESS; \
ObSqlString sql; \
int64_t affected_rows = 0; \
void ObTxCtxTableTest::dump_ctx_with_merged_undo_action()
{
DEF_VAL_FOR_SQL;
common::ObMySQLProxy &tt1_proxy = get_curr_simple_server().get_sql_proxy2();
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
sqlclient::ObISQLConnection *tt1_conn = nullptr;
sqlclient::ObISQLConnection *sys_conn = nullptr;
ASSERT_EQ(OB_SUCCESS, tt1_proxy.acquire(tt1_conn));
ASSERT_NE(nullptr, tt1_conn);
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
ASSERT_NE(nullptr, sys_conn);
// 初始化测试用例所需参数
WRITE_SQL_BY_CONN(sys_conn, "alter system set _private_buffer_size = '1B';");
WRITE_SQL_BY_CONN(tt1_conn, "set ob_trx_timeout = 3000000000");
WRITE_SQL_BY_CONN(tt1_conn, "set ob_trx_idle_timeout = 3000000000");
WRITE_SQL_BY_CONN(tt1_conn, "set ob_query_timeout = 3000000000");
WRITE_SQL_BY_CONN(tt1_conn, "alter system set undo_retention = 1800");
sleep(5);
// 建表
WRITE_SQL_BY_CONN(tt1_conn, "create table if not exists test_tx_ctx_t (a int, b int)");
// 开事务
WRITE_SQL_BY_CONN(tt1_conn, "begin");
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t values(1, 1);");
for (int i = 0; i < 4; i++) {
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;");
}
// 执行一次事务上下文表的转储,保证建表语句产生的多源事务能够被转储下去,retain ctx可以被释放
SLEEP_BEFORE_DUMP_TX_CTX = false;
WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all;");
sleep(5);
// 确认当前上下文中有且只有这一个事务
HEAP_VAR(ObMySQLProxy::MySQLResult, res_0)
{
common::sqlclient::ObMySQLResult *result = nullptr;
sql.assign(
"SELECT count(*) as cnt FROM oceanbase.__all_virtual_trans_stat where tenant_id = 1002 and ls_id = 1001;");
int retry_times = 20;
int64_t cnt = 0;
while (--retry_times >= 0) {
res_0.reuse();
ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_0));
result = res_0.mysql_result();
ASSERT_EQ(OB_SUCCESS, result->next());
ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt));
if (1 == cnt) {
break;
} else {
fprintf(stdout, "waitting for tx ctx table mini merge to clear retain ctx ... \n");
sleep(1);
}
}
ASSERT_EQ(1, cnt);
}
// 获取当前上下文表的转储位点,用于确认下一次转储是否成功
HEAP_VAR(ObMySQLProxy::MySQLResult, res_1)
{
common::sqlclient::ObMySQLResult *result = nullptr;
sql.assign(
"SELECT end_log_scn FROM oceanbase.__all_virtual_table_mgr where tenant_id = 1002 and ls_id = 1001 and tablet_id = 49401 and table_type = 12;");
res_1.reuse();
ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_1));
result = res_1.mysql_result();
ASSERT_EQ(OB_SUCCESS, result->next());
ASSERT_EQ(OB_SUCCESS, result->get_int("end_log_scn", TX_CTX_TABLE_LAST_CHECKPOINT));
}
WRITE_SQL_BY_CONN(tt1_conn, "savepoint x1");
for (int i = 0; i < 2; i++) {
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;");
}
// 产生一个undo action,同时插入一些新的数据
WRITE_SQL_BY_CONN(tt1_conn, "savepoint x2");
for (int i = 0; i < 2; i++) {
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;");
}
WRITE_SQL_BY_CONN(tt1_conn, "rollback to savepoint x2");
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;");
// 转储ctx table,设置SLEEP_BEFORE_DUMP_TX_CTX标志
SLEEP_BEFORE_DUMP_TX_CTX = true;
HAS_GOT_TX_CTX = false;
WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant tt1 ls 1001 tablet_id = 49401;");
// 等待一段时间保证上下文表的转储已经拿到了带有undo node的tx data
while (!HAS_GOT_TX_CTX) {
fprintf(stdout, "waitting for scheduling tx ctx table merge dag ...\n");
sleep(2);
}
// 再次执行rollback,预期能merge掉第一次回滚产生的undo action
WRITE_SQL_BY_CONN(tt1_conn, "rollback to savepoint x1");
{
sqlclient::ObISQLConnection *tt1_conn_2 = nullptr;
ASSERT_EQ(OB_SUCCESS, tt1_proxy.acquire(tt1_conn_2));
ASSERT_NE(nullptr, tt1_conn_2);
int64_t insert_start_ts = ObTimeUtil::fast_current_time();
fprintf(stdout, "doing insert while dump tx ctx table sleeping...\n");
while (ObTimeUtil::fast_current_time() - insert_start_ts < 30 * 1000 * 1000/* 30 seconds */) {
WRITE_SQL_BY_CONN(tt1_conn_2, "insert into test_tx_ctx_t values(3, 3)");
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
}
}
}
// 确认上下文表转储成功
HEAP_VAR(ObMySQLProxy::MySQLResult, res_2)
{
common::sqlclient::ObMySQLResult *result = nullptr;
sql.assign("SELECT end_log_scn FROM oceanbase.__all_virtual_table_mgr where tenant_id = 1002 and ls_id = 1001 and "
"tablet_id = 49401 and table_type = 12;");
int retry_times = 40;
int64_t end_log_scn = 0;
while (--retry_times >= 0) {
res_2.reuse();
ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_2));
result = res_2.mysql_result();
ASSERT_EQ(OB_SUCCESS, result->next());
ASSERT_EQ(OB_SUCCESS, result->get_int("end_log_scn", end_log_scn));
if (end_log_scn > TX_CTX_TABLE_LAST_CHECKPOINT) {
break;
} else {
fprintf(stdout, "waitting for tx ctx table mini merge to dump rollback tx data\n");
}
}
ASSERT_GT(end_log_scn, TX_CTX_TABLE_LAST_CHECKPOINT);
}
}
TEST_F(ObTxCtxTableTest, observer_start) { SERVER_LOG(INFO, "observer_start succ"); }
TEST_F(ObTxCtxTableTest, add_tenant)
{
// create tenant
ASSERT_EQ(OB_SUCCESS, create_tenant());
ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_));
ASSERT_NE(0, RunCtx.tenant_id_);
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
sleep(5);
}
TEST_F(ObTxCtxTableTest, dump_ctx_with_merged_undo_action)
{
SLEEP_BEFORE_DUMP_TX_CTX = false;
HAS_GOT_TX_CTX = false;
dump_ctx_with_merged_undo_action();
fprintf(stdout, "dump ctx with merge undo action done\n");
}
class ObTxCtxTableRestartTest : public ObSimpleClusterTestBase
{
public:
ObTxCtxTableRestartTest() : ObSimpleClusterTestBase(TEST_FILE_NAME) {}
};
TEST_F(ObTxCtxTableRestartTest, observer_restart)
{
// init sql proxy2 to use tenant tt1
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
SERVER_LOG(INFO, "observer restart succ");
}
TEST_F(ObTxCtxTableRestartTest, test_to_do)
{
}
} // namespace unittest
} // namespace oceanbase
int main(int argc, char **argv)
{
int c = 0;
int time_sec = 0;
int concurrency = 1;
char *log_level = (char *)"INFO";
while (EOF != (c = getopt(argc, argv, "t:l:"))) {
switch (c) {
case 't':
time_sec = atoi(optarg);
break;
case 'l':
log_level = optarg;
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
break;
case 'c':
concurrency = atoi(optarg);
break;
default:
break;
}
}
std::string gtest_file_name = std::string(TEST_FILE_NAME) + "_gtest.log";
oceanbase::unittest::init_gtest_output(gtest_file_name);
int ret = 0;
ObSimpleServerRestartHelper restart_helper(argc, argv, TEST_FILE_NAME, BORN_CASE_NAME,
RESTART_CASE_NAME);
restart_helper.set_sleep_sec(time_sec);
restart_helper.run();
return ret;
}

View File

@ -68,7 +68,6 @@ void ObCtxTxData::reset()
ctx_mgr_ = nullptr;
tx_data_guard_.reset();
read_only_ = false;
tx_commit_data_.reset();
}
void ObCtxTxData::destroy()
@ -88,7 +87,6 @@ int ObCtxTxData::insert_into_tx_table()
GET_TX_TABLE_(tx_table)
if (OB_FAIL(ret)) {
} else {
tx_commit_data_ = *(tx_data_guard_.tx_data());
if (OB_FAIL(insert_tx_data_(tx_table, tx_data_guard_.tx_data()))) {
TRANS_LOG(WARN, "insert tx data failed", K(ret), K(*this));
} else {
@ -100,7 +98,7 @@ int ObCtxTxData::insert_into_tx_table()
return ret;
}
int ObCtxTxData::recover_tx_data(const ObTxData &tmp_tx_data)
int ObCtxTxData::recover_tx_data(ObTxDataGuard &rhs)
{
int ret = OB_SUCCESS;
WLockGuard guard(lock_);
@ -109,12 +107,12 @@ int ObCtxTxData::recover_tx_data(const ObTxData &tmp_tx_data)
if (OB_FAIL(ret)) {
} else if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else if (OB_FAIL(tx_table->alloc_tx_data(tx_data_guard_))) {
TRANS_LOG(WARN, "alloc tx data failed", KR(ret), K(tmp_tx_data));
} else {
ObTxData *tx_data = tx_data_guard_.tx_data();
*tx_data = tmp_tx_data;
TRANS_LOG(WARN, "tx data is not writeable", K(ret), KPC(this));
} else if (OB_ISNULL(rhs.tx_data())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "input tx data guard is unexpected nullptr", K(ret), KPC(this));
} else if (OB_FAIL(tx_data_guard_.init(rhs.tx_data()))) {
TRANS_LOG(WARN, "init tx data guard failed", K(ret), KPC(this));
}
return ret;
@ -288,43 +286,86 @@ int32_t ObCtxTxData::get_state() const
{
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
return (NULL != tx_data ? ATOMIC_LOAD(&tx_data->state_): ATOMIC_LOAD(&tx_commit_data_.state_));
if (OB_ISNULL(tx_data)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this));
return 0;
} else {
return ATOMIC_LOAD(&tx_data->state_);
}
}
const SCN ObCtxTxData::get_commit_version() const
{
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
return (NULL != tx_data ? tx_data->commit_version_.atomic_load() : tx_commit_data_.commit_version_.atomic_load());
if (OB_ISNULL(tx_data)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this));
return SCN::invalid_scn();
} else {
return tx_data->commit_version_.atomic_load();
}
}
const SCN ObCtxTxData::get_start_log_ts() const
{
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
SCN ctx_scn = (NULL != tx_data ? tx_data->start_scn_.atomic_load() : tx_commit_data_.start_scn_.atomic_load());
// if (ctx_scn.is_max()) {
// ctx_scn.reset();
// }
return ctx_scn;
if (OB_ISNULL(tx_data)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this));
return SCN::invalid_scn();
} else {
return tx_data->start_scn_.atomic_load();
}
}
const SCN ObCtxTxData::get_end_log_ts() const
{
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
SCN ctx_scn = (NULL != tx_data ? tx_data->end_scn_.atomic_load() : tx_commit_data_.end_scn_.atomic_load());
// if (ctx_scn.is_max()) {
// ctx_scn.reset();
// }
return ctx_scn;
if (OB_ISNULL(tx_data)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this));
return SCN::invalid_scn();
} else {
return tx_data->end_scn_.atomic_load();
}
}
ObTransID ObCtxTxData::get_tx_id() const
{
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
return (NULL != tx_data ? tx_data->tx_id_ : tx_commit_data_.tx_id_);
if (OB_ISNULL(tx_data)) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this));
return ObTransID(0);
} else {
return tx_data->tx_id_;
}
}
int ObCtxTxData::get_tx_data(storage::ObTxDataGuard &tx_data_guard)
{
int ret = OB_SUCCESS;
ObTxData *tx_data = tx_data_guard_.tx_data();
if (OB_ISNULL(tx_data)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx data is unexpected nullptr", KR(ret), KPC(this));
} else {
ret = tx_data_guard.init(tx_data);
}
return ret;
}
int ObCtxTxData::get_tx_data_ptr(storage::ObTxData *&tx_data_ptr)
{
int ret = OB_SUCCESS;
ObTxData *tx_data = tx_data_guard_.tx_data();
if (OB_ISNULL(tx_data)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx data is unexpected nullptr", KR(ret), KPC(this));
} else {
tx_data_ptr = tx_data;
}
return ret;
}
int ObCtxTxData::prepare_add_undo_action(ObUndoAction &undo_action,
@ -401,25 +442,6 @@ int ObCtxTxData::add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatu
return ret;
}
int ObCtxTxData::Guard::get_tx_data(const ObTxData *&tx_data) const
{
int ret = OB_SUCCESS;
auto tmp_tx_data = host_.tx_data_guard_.tx_data();
if (NULL == tmp_tx_data) {
ret = OB_TRANS_CTX_NOT_EXIST;
} else {
tx_data = tmp_tx_data;
}
return ret;
}
int ObCtxTxData::get_tx_commit_data(const ObTxCommitData *&tx_commit_data) const
{
int ret = OB_SUCCESS;
tx_commit_data = &tx_commit_data_;
return ret;
}
int ObCtxTxData::check_tx_data_writable_()
{
int ret = OB_SUCCESS;
@ -481,6 +503,7 @@ int ObCtxTxData::deep_copy_tx_data_(ObTxTable *tx_table, storage::ObTxDataGuard
return ret;
}
} // namespace transaction
} // namespace oceanbase

View File

@ -38,7 +38,7 @@ public:
bool is_read_only() const { return read_only_; }
int insert_into_tx_table();
int recover_tx_data(const storage::ObTxData &tmp_tx_data);
int recover_tx_data(storage::ObTxDataGuard &rhs);
int replace_tx_data(storage::ObTxData *tmp_tx_data);
int deep_copy_tx_data_out(storage::ObTxDataGuard &tmp_tx_data_guard);
int alloc_tmp_tx_data(storage::ObTxDataGuard &tmp_tx_data);
@ -66,19 +66,13 @@ public:
int commit_add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode *tmp_undo_status);
int add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode *tmp_undo_status = NULL);
int get_tx_commit_data(const storage::ObTxCommitData *&tx_commit_data) const;
int get_tx_data(storage::ObTxDataGuard &tx_data_guard);
// ATTENTION : use get_tx_data_ptr only if you can make sure the life cycle of ctx_tx_data is longer than your usage
int get_tx_data_ptr(storage::ObTxData *&tx_data_ptr);
TO_STRING_KV(KP(ctx_mgr_), K(tx_data_guard_), K(read_only_));
TO_STRING_KV(KP(ctx_mgr_), K(tx_data_guard_), K(tx_commit_data_), K(read_only_));
public:
class Guard { // TODO(yunxing.cyx): remove it
friend class ObCtxTxData;
Guard(ObCtxTxData &host) : host_(host) { }
ObCtxTxData &host_;
public:
~Guard() { }
int get_tx_data(const storage::ObTxData *&tx_data) const;
};
Guard get_tx_data() { return Guard(*this); }
public:
//only for unittest
void test_init(storage::ObTxData &tx_data, ObLSTxCtxMgr *ctx_mgr)
@ -113,7 +107,6 @@ private:
private:
ObLSTxCtxMgr *ctx_mgr_;
storage::ObTxDataGuard tx_data_guard_;
storage::ObTxCommitData tx_commit_data_;
bool read_only_;
// lock for tx_data_ pointer
RWLock lock_;

View File

@ -1362,7 +1362,7 @@ int ObPartTransCtx::recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info)
// TRANS_LOG(WARN, "unexpected null ptr", K(*this));
} else if (OB_FAIL(mt_ctx_.recover_from_table_lock_durable_info(ctx_info.table_lock_info_))) {
TRANS_LOG(ERROR, "recover_from_table_lock_durable_info failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.recover_tx_data(ctx_info.state_info_))) {
} else if (OB_FAIL(ctx_tx_data_.recover_tx_data(ctx_info.tx_data_guard_))) {
TRANS_LOG(WARN, "recover tx data failed", K(ret), K(ctx_tx_data_));
} else {
trans_id_ = ctx_info.tx_id_;
@ -5313,9 +5313,8 @@ int ObPartTransCtx::check_with_tx_data(ObITxDataCheckFunctor &fn)
{
// NB: You need notice the lock is not acquired during check
int ret = OB_SUCCESS;
const ObTxData *tx_data_ptr = NULL;
auto guard = ctx_tx_data_.get_tx_data();
if (OB_FAIL(guard.get_tx_data(tx_data_ptr))) {
ObTxData *tx_data_ptr = NULL;
if (OB_FAIL(ctx_tx_data_.get_tx_data_ptr(tx_data_ptr))) {
} else {
// const ObTxData &tx_data = *tx_data_ptr;
// NB: we must read the state then the version without lock. If you are interested in the
@ -5434,41 +5433,24 @@ int ObPartTransCtx::refresh_rec_log_ts_()
int ObPartTransCtx::get_tx_ctx_table_info_(ObTxCtxTableInfo &info)
{
int ret = OB_SUCCESS;
{
const ObTxData *tx_data = NULL;
const ObTxCommitData *tx_commit_data = NULL;
auto guard = ctx_tx_data_.get_tx_data();
if (OB_FAIL(guard.get_tx_data(tx_data))) {
TRANS_LOG(WARN, "get tx data failed", K(ret));
// rewrite ret
ret = OB_SUCCESS;
if (OB_FAIL(ctx_tx_data_.get_tx_commit_data(tx_commit_data))) {
TRANS_LOG(WARN, "get tx commit data failed", K(ret));
} else {
info.state_info_ = *tx_commit_data;
}
if (OB_FAIL(ctx_tx_data_.get_tx_data(info.tx_data_guard_))) {
TRANS_LOG(WARN, "get tx data failed", K(ret));
} else if (OB_FAIL(mt_ctx_.calc_checksum_before_scn(
exec_info_.max_applied_log_ts_, exec_info_.checksum_, exec_info_.checksum_scn_))) {
TRANS_LOG(ERROR, "calc checksum before log ts failed", K(ret), KPC(this));
} else {
info.tx_id_ = trans_id_;
info.ls_id_ = ls_id_;
info.exec_info_ = exec_info_;
info.cluster_id_ = cluster_id_;
if (OB_FAIL(mt_ctx_.get_table_lock_store_info(info.table_lock_info_))) {
TRANS_LOG(WARN, "get_table_lock_store_info failed", K(ret), K(info));
} else {
info.state_info_ = *tx_data;
TRANS_LOG(INFO, "store ctx_info: ", K(ret), K(info), KPC(this));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(mt_ctx_.calc_checksum_before_scn(exec_info_.max_applied_log_ts_,
exec_info_.checksum_,
exec_info_.checksum_scn_))) {
TRANS_LOG(ERROR, "calc checksum before log ts failed", K(ret), KPC(this));
} else {
info.tx_id_ = trans_id_;
info.ls_id_ = ls_id_;
info.exec_info_ = exec_info_;
info.cluster_id_ = cluster_id_;
if (OB_FAIL(mt_ctx_.get_table_lock_store_info(info.table_lock_info_))) {
TRANS_LOG(WARN, "get_table_lock_store_info failed", K(ret), K(info));
} else {
TRANS_LOG(INFO, "store ctx_info: ", K(ret), K(info), KPC(this));
}
}
}
return ret;
}
@ -6702,9 +6684,9 @@ int ObPartTransCtx::dump_2_text(FILE *fd)
fprintf(fd, "********** ObPartTransCtx ***********\n\n");
fprintf(fd, "%s\n", buf);
auto guard = ctx_tx_data_.get_tx_data();
if (OB_FAIL(guard.get_tx_data(tx_data_ptr))) {
} else if (OB_ISNULL(tx_data_ptr)) {
ObTxDataGuard tx_data_guard;
ctx_tx_data_.get_tx_data(tx_data_guard);
if (OB_ISNULL(tx_data_ptr = tx_data_guard.tx_data())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected nullptr", KR(ret));
} else {

View File

@ -23,6 +23,7 @@ namespace storage
int ObUndoStatusList::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
{
SpinRLockGuard guard(lock_);
int ret = OB_SUCCESS;
const int64_t len = get_serialize_size_();
if (OB_UNLIKELY(OB_ISNULL(buf) || buf_len <= 0 || pos > buf_len)) {
@ -73,6 +74,7 @@ int ObUndoStatusList::deserialize(const char *buf,
int ret = OB_SUCCESS;
int64_t version = 0;
int64_t undo_status_list_len = 0;
SpinWLockGuard guard(lock_);
if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &version))) {
STORAGE_LOG(WARN, "decode version fail", K(version), K(data_len), K(pos), K(ret));
@ -145,6 +147,7 @@ int ObUndoStatusList::deserialize_(const char *buf,
int64_t ObUndoStatusList::get_serialize_size() const
{
SpinRLockGuard guard(lock_);
int64_t data_len = get_serialize_size_();
int64_t len = 0;
len += serialization::encoded_length_vi64(UNIS_VERSION);
@ -485,7 +488,6 @@ int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &ne
if (OB_NOT_NULL(undo_node)) {
tx_data_table->free_undo_status_node(undo_node);
}
return ret;
}
@ -506,6 +508,7 @@ int ObTxData::merge_undo_actions_(ObTxDataTable *tx_data_table,
}
if (0 == node->size_) {
// fprintf(stdout, "free undo node, node ptr = %p \n", node);
// all undo actions in this node are merged, free it
// STORAGE_LOG(DEBUG, "current node is empty, now free it");
ObUndoStatusNode *node_to_free = node;

View File

@ -128,8 +128,8 @@ int ObTxCtxTableRecoverHelper::recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls
}
int ObTxCtxTableRecoverHelper::recover(const blocksstable::ObDatumRow &row,
ObSliceAlloc &slice_allocator,
transaction::ObLSTxCtxMgr* ls_tx_ctx_mgr)
ObTxDataTable &tx_data_table,
transaction::ObLSTxCtxMgr *ls_tx_ctx_mgr)
{
int ret = OB_SUCCESS;
@ -206,7 +206,7 @@ int ObTxCtxTableRecoverHelper::recover(const blocksstable::ObDatumRow &row,
int64_t pos = 0;
bool tx_ctx_existed = true;
if (OB_FAIL(ctx_info_.deserialize(deserialize_buf, deserialize_buf_length, pos, slice_allocator))) {
if (OB_FAIL(ctx_info_.deserialize(deserialize_buf, deserialize_buf_length, pos, tx_data_table))) {
STORAGE_LOG(WARN, "failed to deserialize status_info", K(ret), K_(ctx_info));
} else if (OB_FAIL(recover_one_tx_ctx_(ls_tx_ctx_mgr, ctx_info_))) {
STORAGE_LOG(WARN, "failed to recover_one_tx_ctx_", K(ret), K(ctx_info_));
@ -313,9 +313,9 @@ int ObTxCtxTable::offline()
return ret;
}
int ObTxCtxTable::recover(const blocksstable::ObDatumRow &row, ObSliceAlloc &slice_allocator)
int ObTxCtxTable::recover(const blocksstable::ObDatumRow &row, ObTxDataTable &tx_data_table)
{
return recover_helper_.recover(row, slice_allocator, ls_tx_ctx_mgr_);
return recover_helper_.recover(row, tx_data_table, ls_tx_ctx_mgr_);
}
int ObTxCtxTable::check_with_tx_data(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn)

View File

@ -35,8 +35,8 @@ public:
void reset();
void destroy();
int recover(const blocksstable::ObDatumRow &row,
ObSliceAlloc &slice_allocator,
transaction::ObLSTxCtxMgr* ls_tx_ctx_mgr);
ObTxDataTable &tx_data_table,
transaction::ObLSTxCtxMgr *ls_tx_ctx_mgr);
TO_STRING_KV(K_(in_multi_row_state), K_(prev_meta), K_(prev_end_pos));
@ -109,7 +109,7 @@ public:
int release_ref_();
// We use the method to recover the tx_ctx_table for reboot.
int recover(const blocksstable::ObDatumRow &row, ObSliceAlloc &slice_allocator);
int recover(const blocksstable::ObDatumRow &row, ObTxDataTable &tx_data_table);
int check_with_tx_data(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn);

View File

@ -688,7 +688,7 @@ int ObTxTable::restore_tx_ctx_table_(ObITable &trans_sstable)
if (OB_ITER_END != ret) {
LOG_WARN("failed to get next row", K(ret));
}
} else if (OB_FAIL(tx_ctx_table_.recover(*row, *tx_data_table_.get_slice_allocator()))) {
} else if (OB_FAIL(tx_ctx_table_.recover(*row, tx_data_table_))) {
LOG_WARN("failed to recover tx ctx table", K(ret));
}
}

View File

@ -11,6 +11,7 @@
*/
#include "storage/tx_table/ob_tx_table_define.h"
#include "storage/tx_table/ob_tx_data_table.h"
namespace oceanbase
{
@ -71,7 +72,10 @@ int ObTxCtxTableInfo::serialize(char *buf,
int ret = OB_SUCCESS;
const int64_t data_len = get_serialize_size_();
ObTxCtxTableCommonHeader header(MAGIC_VERSION, data_len);
if (OB_FAIL(header.serialize(buf, buf_len, pos))) {
if (OB_ISNULL(tx_data_guard_.tx_data())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "invalid tx data guard", KR(ret), KPC(this));
} else if (OB_FAIL(header.serialize(buf, buf_len, pos))) {
TRANS_LOG(WARN, "encode header fail", K(buf_len), K(pos), K(ret));
} else if (OB_FAIL(serialize_(buf, buf_len, pos))) {
TRANS_LOG(WARN, "serialize fail", K(ret));
@ -91,7 +95,7 @@ int ObTxCtxTableInfo::serialize_(char *buf,
TRANS_LOG(WARN, "serialize ls_id fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, pos, cluster_id_))) {
TRANS_LOG(WARN, "encode cluster id failed", K(cluster_id_), K(buf_len), K(pos), K(ret));
} else if (OB_FAIL(state_info_.serialize(buf, buf_len, pos))) {
} else if (OB_FAIL(tx_data_guard_.tx_data()->serialize(buf, buf_len, pos))) {
TRANS_LOG(WARN, "serialize state_info fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(exec_info_.serialize(buf, buf_len, pos))) {
TRANS_LOG(WARN, "serialize exec_info fail.", KR(ret), K(pos), K(buf_len));
@ -105,14 +109,16 @@ int ObTxCtxTableInfo::serialize_(char *buf,
int ObTxCtxTableInfo::deserialize(const char *buf,
const int64_t buf_len,
int64_t &pos,
ObSliceAlloc &slice_allocator)
ObTxDataTable &tx_data_table)
{
int ret = OB_SUCCESS;
ObTxCtxTableCommonHeader header(MAGIC_VERSION, 0);
if (OB_FAIL(header.deserialize(buf, buf_len, pos))) {
if (OB_FAIL(tx_data_table.alloc_tx_data(tx_data_guard_))) {
STORAGE_LOG(WARN, "alloc tx data failed", KR(ret));
} else if (OB_FAIL(header.deserialize(buf, buf_len, pos))) {
TRANS_LOG(WARN, "deserialize header fail", K(buf_len), K(pos), K(ret));
} else if (OB_FAIL(deserialize_(buf, buf_len, pos, slice_allocator))) {
} else if (OB_FAIL(deserialize_(buf, buf_len, pos, tx_data_table))) {
TRANS_LOG(INFO, "deserialize_ fail", "buf_len", buf_len, K(pos), K(ret));
}
return ret;
@ -121,7 +127,7 @@ int ObTxCtxTableInfo::deserialize(const char *buf,
int ObTxCtxTableInfo::deserialize_(const char *buf,
const int64_t buf_len,
int64_t &pos,
ObSliceAlloc &slice_allocator)
ObTxDataTable &tx_data_table)
{
int ret = OB_SUCCESS;
if (OB_FAIL(tx_id_.deserialize(buf, buf_len, pos))) {
@ -130,7 +136,7 @@ int ObTxCtxTableInfo::deserialize_(const char *buf,
TRANS_LOG(WARN, "deserialize ls_id fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(serialization::decode_vi64(buf, buf_len, pos, &cluster_id_))) {
TRANS_LOG(WARN, "encode cluster_id fail", K(cluster_id_), K(buf_len), K(pos), K(ret));
} else if (OB_FAIL(state_info_.deserialize(buf, buf_len, pos, slice_allocator))) {
} else if (OB_FAIL(tx_data_guard_.tx_data()->deserialize(buf, buf_len, pos, *tx_data_table.get_slice_allocator()))) {
TRANS_LOG(WARN, "deserialize state_info fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(exec_info_.deserialize(buf, buf_len, pos))) {
TRANS_LOG(WARN, "deserialize exec_info fail.", KR(ret), K(pos), K(buf_len));
@ -159,7 +165,7 @@ int64_t ObTxCtxTableInfo::get_serialize_size_(void) const
len += tx_id_.get_serialize_size();
len += ls_id_.get_serialize_size();
len += serialization::encoded_length_vi64(cluster_id_);
len += state_info_.get_serialize_size();
len += (OB_NOT_NULL(tx_data_guard_.tx_data()) ? tx_data_guard_.tx_data()->get_serialize_size() : 0);
len += exec_info_.get_serialize_size();
len += table_lock_info_.get_serialize_size();
return len;

View File

@ -75,12 +75,12 @@ private:
const static int64_t MAGIC_VERSION = MAGIC_NUM + UNIS_VERSION;
public:
int serialize(char *buf, const int64_t buf_len, int64_t &pos) const;
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos, ObSliceAlloc &slice_allocator);
int deserialize(const char *buf, const int64_t buf_len, int64_t &pos, ObTxDataTable &tx_data_table);
int64_t get_serialize_size() const;
private:
int serialize_(char *buf, const int64_t buf_len, int64_t &pos) const;
int deserialize_(const char *buf, const int64_t buf_len, int64_t &pos, ObSliceAlloc &slice_allocator);
int deserialize_(const char *buf, const int64_t buf_len, int64_t &pos, ObTxDataTable &tx_data_table);
int64_t get_serialize_size_() const;
public:
@ -92,16 +92,16 @@ public:
tx_id_.reset();
ls_id_.reset();
cluster_id_ = OB_INVALID_CLUSTER_ID;
state_info_.reset();
tx_data_guard_.reset();
exec_info_.reset();
table_lock_info_.reset();
}
void destroy() { reset(); }
TO_STRING_KV(K_(tx_id), K_(ls_id), K_(cluster_id), K_(state_info), K_(exec_info));
TO_STRING_KV(K_(tx_id), K_(ls_id), K_(cluster_id), K_(tx_data_guard), K_(exec_info));
transaction::ObTransID tx_id_;
share::ObLSID ls_id_;
int64_t cluster_id_;
ObTxData state_info_;
ObTxDataGuard tx_data_guard_;
transaction::ObTxExecInfo exec_info_;
transaction::tablelock::ObTableLockInfo table_lock_info_;
};

View File

@ -314,7 +314,12 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
part1.set_downstream_state(ObTxState::PREPARE);
part1.exec_info_.prepare_version_.convert_for_tx(90);
part2.set_downstream_state(ObTxState::COMMIT);
part2.ctx_tx_data_.tx_commit_data_.commit_version_.convert_for_tx(90);
ObTxData part2_tx_data;
ObSliceAlloc slice_allocator;
part2_tx_data.ref_cnt_ = 1000;
part2_tx_data.slice_allocator_ = &slice_allocator;
part2.ctx_tx_data_.tx_data_guard_.init(&part2_tx_data);
part2.ctx_tx_data_.tx_data_guard_.tx_data()->commit_version_.convert_for_tx(90);
part3.set_downstream_state(ObTxState::UNKNOWN);
can_read = false;
part1.state_info_array_.reset();
@ -343,7 +348,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
part1.set_downstream_state(ObTxState::PREPARE);
coord.exec_info_.prepare_version_.convert_for_tx(90);
part2.set_downstream_state(ObTxState::COMMIT);
part2.ctx_tx_data_.tx_commit_data_.commit_version_.convert_for_tx(300);
part2.ctx_tx_data_.tx_data_guard_.tx_data()->commit_version_.convert_for_tx(300);
part3.set_downstream_state(ObTxState::UNKNOWN);
can_read = true;
part1.state_info_array_.reset();

View File

@ -258,9 +258,10 @@ TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr)
// ObTransSSTableDurableCtxInfo ctx_info;
ObTxCtxTableInfo ctx_info;
ObSliceAlloc slice_allocator;
ObTxDataTable tx_data_table;
ObMemAttr attr;
attr.tenant_id_ = MTL_ID();
slice_allocator.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr);
tx_data_table.slice_allocator_.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr);
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01));
@ -299,7 +300,7 @@ TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr)
row_copy.storage_datums_[TX_CTX_TABLE_META_COLUMN] = row_copy.storage_datums_[meta_col];
row_copy.storage_datums_[TX_CTX_TABLE_VAL_COLUMN] = row_copy.storage_datums_[value_col];
TRANS_LOG(INFO, "row_info projected", K(row_copy));
ASSERT_EQ(OB_SUCCESS, recover_helper.recover(row_copy, slice_allocator, ls_tx_ctx_mgr_recover));
ASSERT_EQ(OB_SUCCESS, recover_helper.recover(row_copy, tx_data_table, ls_tx_ctx_mgr_recover));
} while (tx_ctx_memtable_iter->has_unmerged_buf_);
ObTxCtxTableInfo* ctx_info = recover_helper.get_tx_ctx_table_info();