Files
oceanbase/mittest/multi_replica/test_ob_direct_load_inc_log.cpp
coolfishchen 9de65fb1d7 [FEAT MERGE] incremental direct load phase I
Co-authored-by: Monk-Liu <1152761042@qq.com>
Co-authored-by: suz-yang <suz.yang@foxmail.com>
Co-authored-by: ZenoWang <wzybuaasoft@163.com>
2024-04-22 09:23:47 +00:00

192 lines
6.1 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "env/ob_fast_bootstrap.h"
#include "env/ob_multi_replica_util.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx/ob_dup_table_lease.h"
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
#define CUR_TEST_CASE_NAME ObDirectLoadIncLogTest
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
MULTI_REPLICA_TEST_MAIN_FUNCTION(test_direct_load_inc_);
#define DEFAULT_LOAD_ROW_CNT 30
namespace oceanbase
{
namespace unittest
{
class ObTestExtraLogCb : public logservice::AppendCb
{
public:
int on_success()
{
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] invoke test on_success", K(ret), KP(this));
return ret;
}
int on_failure()
{
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] invoke test on_failure", K(ret), KP(this));
return ret;
}
};
void submit_ddl_redo_log(transaction::ObPartTransCtx *tx_ctx)
{
ObDDLRedoLog ddl_redo;
for (int i = 0; i < 5; i++) {
ObTestExtraLogCb *test_log_cb =
static_cast<ObTestExtraLogCb *>(ob_malloc(sizeof(ObTestExtraLogCb), "TestExtralCb"));
new (test_log_cb) ObTestExtraLogCb();
int64_t replay_hint = GETTID();
share::SCN scn;
scn.set_max();
ASSERT_EQ(OB_SUCCESS,
tx_ctx->submit_direct_load_inc_redo_log(ddl_redo, test_log_cb, replay_hint, scn));
usleep(1000);
}
}
void submit_ddl_start_log(transaction::ObPartTransCtx *tx_ctx)
{
storage::ObDDLIncStartLog ddl_start;
ddl_start.log_basic_.tablet_id_ = 101;//GETTID();
ObTestExtraLogCb *test_log_cb =
static_cast<ObTestExtraLogCb *>(ob_malloc(sizeof(ObTestExtraLogCb), "TestExtralCb"));
new (test_log_cb) ObTestExtraLogCb();
int64_t replay_hint = 0;
share::SCN scn;
scn.set_max();
ASSERT_EQ(OB_SUCCESS, tx_ctx->submit_direct_load_inc_start_log(ddl_start, test_log_cb, scn));
}
void submit_ddl_end_log(transaction::ObPartTransCtx *tx_ctx)
{
storage::ObDDLIncCommitLog ddl_commit;
ddl_commit.log_basic_.tablet_id_ = 101;//GETTID();
ObTestExtraLogCb *test_log_cb =
static_cast<ObTestExtraLogCb *>(ob_malloc(sizeof(ObTestExtraLogCb), "TestExtralCb"));
new (test_log_cb) ObTestExtraLogCb();
int64_t replay_hint = 0;
share::SCN scn;
scn.set_max();
ASSERT_EQ(OB_SUCCESS, tx_ctx->submit_direct_load_inc_commit_log(ddl_commit, test_log_cb, scn));
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), create_table)
{
int ret = OB_SUCCESS;
CREATE_TEST_TENANT(test_tenant_id);
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] create test tenant success", K(test_tenant_id));
common::ObMySQLProxy &test_tenant_sql_proxy = get_curr_simple_server().get_sql_proxy2();
ACQUIRE_CONN_FROM_SQL_PROXY(test_conn, test_tenant_sql_proxy);
WRITE_SQL_BY_CONN(test_conn,
"CREATE TABLE test_t1( "
"id_x int, "
"id_y int, "
"id_z int, "
"PRIMARY KEY(id_x)"
") duplicate_scope='cluster' PARTITION BY hash(id_x) partitions 10;");
std::string primary_zone_sql = "ALTER TENANT " + std::string(DEFAULT_TEST_TENANT_NAME)
+ " set primary_zone='zone1; zone3; zone2';";
WRITE_SQL_BY_CONN(test_conn, primary_zone_sql.c_str());
READ_SQL_BY_CONN(test_conn, table_info_result,
"select table_id, duplicate_scope from "
"oceanbase.__all_table where table_name = 'test_t1' ");
ASSERT_EQ(OB_SUCCESS, table_info_result->next());
int64_t table_id;
int64_t dup_scope;
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int("table_id", table_id));
ASSERT_EQ(OB_SUCCESS, table_info_result->get_int("duplicate_scope", dup_scope));
ASSERT_EQ(true, table_id > 0);
ASSERT_EQ(true, dup_scope != 0);
std::string tablet_count_sql =
"select count(*), ls_id from oceanbase.__all_tablet_to_ls where table_id = "
+ std::to_string(table_id) + " group by ls_id order by count(*)";
READ_SQL_BY_CONN(test_conn, tablet_count_result, tablet_count_sql.c_str());
int64_t tablet_count = 0;
int64_t ls_id_num = 0;
ASSERT_EQ(OB_SUCCESS, tablet_count_result->next());
ASSERT_EQ(OB_SUCCESS, tablet_count_result->get_int("count(*)", tablet_count));
ASSERT_EQ(OB_SUCCESS, tablet_count_result->get_int("ls_id", ls_id_num));
ASSERT_EQ(10, tablet_count);
ASSERT_EQ(true, share::ObLSID(ls_id_num).is_valid());
GET_LS(test_tenant_id, ls_id_num, ls_handle);
WRITE_SQL_BY_CONN(test_conn, "set autocommit = false;");
WRITE_SQL_BY_CONN(test_conn, "begin;");
for (int i = 1; i <= DEFAULT_LOAD_ROW_CNT; i++) {
std::string insert_sql_str = "INSERT INTO test_t1 VALUES(" + std::to_string(i) + ", 0 , 0)";
WRITE_SQL_BY_CONN(test_conn, insert_sql_str.c_str());
}
int64_t tx_id_num;
// GET_RUNNGING_TRX_ID(test_conn, tx_id_num);
std::string template_sql_str = "INSERT INTO test_t1 VALUES(" + std::to_string(1) + ", 0 , 0)";
GET_TX_ID_FROM_SQL_AUDIT(test_conn, template_sql_str.c_str(), tx_id_num);
transaction::ObPartTransCtx *tx_ctx = nullptr;
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(ObTransID(tx_id_num), false, tx_ctx));
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] get target tx ctx", K(ret), K(tx_id_num), KPC(tx_ctx));
submit_ddl_start_log(tx_ctx);
std::thread t1(submit_ddl_redo_log, tx_ctx);
std::thread t2(submit_ddl_redo_log, tx_ctx);
std::thread t3(submit_ddl_redo_log, tx_ctx);
std::thread t4(submit_ddl_redo_log, tx_ctx);
t1.join();
t2.join();
t3.join();
t4.join();
// submit_ddl_end_log(tx_ctx);
WRITE_SQL_BY_CONN(test_conn, "commit;");
// Don't free log cb in thread
}
} // namespace unittest
} // namespace oceanbase