[FEAT MERGE] Improve the inner implementation of tx table
Co-authored-by: kongfy <njukongfy@gmail.com>
This commit is contained in:
@ -52,6 +52,26 @@ using namespace storage;
|
||||
using namespace palf;
|
||||
using namespace share;
|
||||
|
||||
namespace storage
|
||||
{
|
||||
|
||||
int ObTxDataMemtable::estimate_phy_size(const ObStoreRowkey *start_key,
|
||||
const ObStoreRowkey *end_key,
|
||||
int64_t &total_bytes,
|
||||
int64_t &total_rows)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (MTL_ID() == 1002 && freezer_->get_ls_id().id() == 1001) {
|
||||
total_bytes = 134217728LL * 2LL;
|
||||
} else {
|
||||
total_bytes = 1;
|
||||
}
|
||||
total_rows = 1;
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace unittest
|
||||
{
|
||||
|
||||
@ -73,6 +93,9 @@ public:
|
||||
void freeze_tx_data(ObTxDataTable *tx_data_table);
|
||||
void check_start_tx_scn(ObTxDataTable *tx_data_table);
|
||||
void basic_test();
|
||||
void parallel_dump_rollback();
|
||||
void check_minor_merge();
|
||||
void check_recycle_scn();
|
||||
|
||||
private:
|
||||
void check_tx_data_minor_succeed();
|
||||
@ -99,7 +122,13 @@ private:
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
||||
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
||||
|
||||
#define DEF_VAL_FOR_SQL \
|
||||
int ret = OB_SUCCESS; \
|
||||
ObSqlString sql; \
|
||||
int64_t affected_rows = 0;
|
||||
|
||||
static int64_t VAL = 0;
|
||||
|
||||
void ObTxDataTableTest::insert_tx_data()
|
||||
{
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
@ -116,14 +145,13 @@ void ObTxDataTableTest::insert_tx_data()
|
||||
|
||||
void ObTxDataTableTest::insert_rollback_tx_data()
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
OB_LOG(INFO, "insert rollback tx data start");
|
||||
int64_t i = 0;
|
||||
int64_t affected_rows = 0;
|
||||
int rollback_cnt = 0;
|
||||
first_rollback_ts_ = 0;
|
||||
ObSqlString sql;
|
||||
sqlclient::ObISQLConnection *connection = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
|
||||
ASSERT_NE(nullptr, connection);
|
||||
@ -162,7 +190,7 @@ void ObTxDataTableTest::freeze_tx_data(ObTxDataTable *tx_data_table)
|
||||
::sleep(1);
|
||||
int64_t affected_rows = 0;
|
||||
ObSqlString sql;
|
||||
EXE_SQL("alter system minor freeze tenant tt1;");
|
||||
EXE_SQL("alter system minor freeze tenant all;");
|
||||
}
|
||||
ATOMIC_STORE(&stop, true);
|
||||
STORAGE_LOG(INFO, "freeze done");
|
||||
@ -277,6 +305,131 @@ void ObTxDataTableTest::basic_test()
|
||||
check_tx_data_minor_succeed();
|
||||
}
|
||||
|
||||
void ObTxDataTableTest::parallel_dump_rollback()
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
sqlclient::ObISQLConnection *sys_conn = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
|
||||
ASSERT_NE(nullptr, sys_conn);
|
||||
|
||||
common::ObMySQLProxy &tt1_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
sqlclient::ObISQLConnection *tt1_conn = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, tt1_proxy.acquire(tt1_conn));
|
||||
ASSERT_NE(nullptr, tt1_conn);
|
||||
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "create table if not exists test_parallel_dump_tx_data (c1 int, c2 int);");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "set ob_trx_timeout = 3000000000");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "set ob_trx_idle_timeout = 3000000000");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "set ob_query_timeout = 3000000000");
|
||||
|
||||
fprintf(stdout, "start rollback to savepoint\n");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "begin");
|
||||
for (int64_t i = 0; i < 1000; i++) {
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "savepoint x");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_parallel_dump_tx_data values(1,1);");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "rollback to savepoint x");
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "insert into test_parallel_dump_tx_data values(1,1);");
|
||||
|
||||
if (0 == i % 100) {
|
||||
sys_conn->execute_write(OB_SYS_TENANT_ID, "alter system minor freeze tenant tt1 ls 1001 tablet_id = 49402;", affected_rows);
|
||||
}
|
||||
}
|
||||
WRITE_SQL_BY_CONN(tt1_conn, "commit");
|
||||
fprintf(stdout, "finish rollback to savepoint\n");
|
||||
sleep(5);
|
||||
|
||||
sys_conn->execute_write(OB_SYS_TENANT_ID, "alter system minor freeze tenant tt1 ls 1001 tablet_id = 49402;", affected_rows);
|
||||
sleep(5);
|
||||
}
|
||||
|
||||
void ObTxDataTableTest::check_minor_merge()
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
sqlclient::ObISQLConnection *sys_conn = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
|
||||
ASSERT_NE(nullptr, sys_conn);
|
||||
|
||||
HEAP_VAR(ObMySQLProxy::MySQLResult, res)
|
||||
{
|
||||
int64_t retry_times = 10;
|
||||
|
||||
// 确认MINI MERGE 并行转储正确
|
||||
int64_t cnt = 0;
|
||||
while (--retry_times >= 0) {
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
sys_conn->execute_read(
|
||||
OB_SYS_TENANT_ID,
|
||||
"select count(*) as cnt from oceanbase.__all_virtual_tablet_compaction_history where tenant_id "
|
||||
"= 1002 and ls_id = 1001 and tablet_id = 49402 and parallel_degree > 1 and type = 'MINI_MERGE';",
|
||||
res));
|
||||
common::sqlclient::ObMySQLResult *result = res.mysql_result();
|
||||
ASSERT_EQ(OB_SUCCESS, result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt));
|
||||
if (cnt > 0) {
|
||||
break;
|
||||
} else {
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
ASSERT_GT(cnt, 0);
|
||||
|
||||
// 确认没有未能转储的memtable
|
||||
retry_times = 10;
|
||||
while (--retry_times >= 0) {
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
sys_conn->execute_read(
|
||||
OB_SYS_TENANT_ID,
|
||||
"select count(*) as cnt from oceanbase.__all_virtual_tx_data_table where state = 'FROZEN' or state = 'FREEZING';",
|
||||
res));
|
||||
common::sqlclient::ObMySQLResult *result = res.mysql_result();
|
||||
cnt = -1;
|
||||
ASSERT_EQ(OB_SUCCESS, result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt));
|
||||
if (0 == cnt) {
|
||||
break;
|
||||
} else {
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(0, cnt);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void ObTxDataTableTest::check_recycle_scn()
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
sqlclient::ObISQLConnection *sys_conn = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
|
||||
ASSERT_NE(nullptr, sys_conn);
|
||||
|
||||
HEAP_VAR(ObMySQLProxy::MySQLResult, res)
|
||||
{
|
||||
ASSERT_EQ(
|
||||
OB_SUCCESS,
|
||||
sys_conn->execute_read(
|
||||
OB_SYS_TENANT_ID,
|
||||
"select tenant_id, min_tx_log_scn from oceanbase.__all_virtual_tx_data_table where ls_id = 1 and state = 'MINOR';",
|
||||
res));
|
||||
common::sqlclient::ObMySQLResult *result = res.mysql_result();
|
||||
|
||||
int tenant_cnt = 0;
|
||||
while (OB_SUCCESS == result->next()) {
|
||||
int64_t tenant_id = 0;
|
||||
int64_t min_tx_log_scn = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("tenant_id", tenant_id));
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("min_tx_log_scn", min_tx_log_scn));
|
||||
ASSERT_TRUE(1 == tenant_id || 1001 == tenant_id || 1002 == tenant_id);
|
||||
ASSERT_TRUE(min_tx_log_scn > 1);
|
||||
tenant_cnt++;
|
||||
}
|
||||
ASSERT_EQ(tenant_cnt, 3);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ObTxDataTableTest, observer_start) { SERVER_LOG(INFO, "observer_start succ"); }
|
||||
|
||||
TEST_F(ObTxDataTableTest, add_tenant)
|
||||
@ -288,6 +441,19 @@ TEST_F(ObTxDataTableTest, add_tenant)
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
}
|
||||
|
||||
TEST_F(ObTxDataTableTest, init_config)
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
sqlclient::ObISQLConnection *sys_conn = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
|
||||
ASSERT_NE(nullptr, sys_conn);
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system set _private_buffer_size = '1B'");
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system set minor_compact_trigger = 0");
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system set _minor_compaction_amplification_factor = 1");
|
||||
sleep(2);
|
||||
}
|
||||
|
||||
TEST_F(ObTxDataTableTest, create_table)
|
||||
{
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
@ -304,6 +470,21 @@ TEST_F(ObTxDataTableTest, basic_test)
|
||||
basic_test();
|
||||
}
|
||||
|
||||
TEST_F(ObTxDataTableTest, parallel_dump_rollback)
|
||||
{
|
||||
parallel_dump_rollback();
|
||||
}
|
||||
|
||||
TEST_F(ObTxDataTableTest, check_minor_merge)
|
||||
{
|
||||
check_minor_merge();
|
||||
}
|
||||
|
||||
TEST_F(ObTxDataTableTest, check_recycle_scn)
|
||||
{
|
||||
check_recycle_scn();
|
||||
}
|
||||
|
||||
class ObTxDataTableRestartTest : public ObSimpleClusterTestBase
|
||||
{
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user