378 lines
13 KiB
C++
378 lines
13 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include <gtest/gtest.h>
|
|
|
|
#define protected public
|
|
#define private public
|
|
|
|
#include "env/ob_simple_cluster_test_base.h"
|
|
#include "env/ob_simple_server_restart_helper.h"
|
|
|
|
#include "storage/ddl/ob_tablet_ddl_kv.h"
|
|
#include "storage/tx_storage/ob_ls_service.h"
|
|
#include "storage/ob_direct_load_table_guard.h"
|
|
|
|
#undef private
|
|
#undef protected
|
|
|
|
static const char *TEST_FILE_NAME = "test_tablet_memtable_mit";
|
|
static const char *BORN_CASE_NAME = "TestTabletMemtable";
|
|
static const char *RESTART_CASE_NAME = "TestTabletMemtableRestart";
|
|
|
|
ObSimpleServerRestartHelper *helper_ptr = nullptr;
|
|
|
|
|
|
namespace oceanbase {
|
|
|
|
using namespace observer;
|
|
using namespace memtable;
|
|
using namespace storage;
|
|
using namespace share;
|
|
|
|
class TestRunCtx
|
|
{
|
|
public:
|
|
uint64_t tenant_id_ = 0;
|
|
};
|
|
TestRunCtx RunCtx;
|
|
ObLSID LS_ID;
|
|
ObTabletID TABLET_ID;
|
|
bool FLUSH_DISABLED;
|
|
|
|
namespace storage {
|
|
|
|
int ObDDLKV::flush(ObLSID ls_id)
|
|
{
|
|
if (FLUSH_DISABLED) {
|
|
return 0;
|
|
}
|
|
|
|
int ret = OB_SUCCESS;
|
|
if (!ready_for_flush()) {
|
|
return OB_ERR_UNEXPECTED;
|
|
} else {
|
|
ObTabletMemtableMgr *mgr = get_memtable_mgr();
|
|
ObTableHandleV2 handle;
|
|
EXPECT_EQ(OB_SUCCESS, mgr->get_first_frozen_memtable(handle));
|
|
if (handle.get_table() == this) {
|
|
mgr->release_head_memtable_(this);
|
|
STORAGE_LOG(INFO, "flush and release one ddl kv ddl_kv", KP(this));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
}
|
|
|
|
namespace unittest {
|
|
|
|
|
|
class TestTabletMemtable : public ObSimpleClusterTestBase {
|
|
public:
|
|
TestTabletMemtable() : ObSimpleClusterTestBase(TEST_FILE_NAME) {}
|
|
~TestTabletMemtable() {}
|
|
|
|
void basic_test();
|
|
void create_test_table(const char* table_name);
|
|
|
|
private:
|
|
common::ObArenaAllocator allocator_; // for medium info
|
|
};
|
|
|
|
#define EXE_SQL(sql_str) \
|
|
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
|
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
|
|
|
#define EXE_SQL_FMT(...) \
|
|
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
|
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
|
|
|
#define WRITE_SQL_BY_CONN(conn, sql_str) \
|
|
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
|
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
|
|
|
#define WRITE_SQL_FMT_BY_CONN(conn, ...) \
|
|
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
|
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
|
|
|
#define READ_SQL_FMT_BY_CONN(conn, res, ...) \
|
|
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
|
ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res));
|
|
|
|
#define DEF_VAL_FOR_SQL \
|
|
int ret = OB_SUCCESS; \
|
|
ObSqlString sql; \
|
|
int64_t affected_rows = 0; \
|
|
sqlclient::ObISQLConnection *connection = nullptr;
|
|
|
|
void TestTabletMemtable::create_test_table(const char* table_name)
|
|
{
|
|
DEF_VAL_FOR_SQL
|
|
ObString create_sql;
|
|
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy2().acquire(connection));
|
|
WRITE_SQL_FMT_BY_CONN(connection, "create table if not exists %s (c1 int, c2 int)", table_name);
|
|
|
|
HEAP_VAR(ObMySQLProxy::MySQLResult, res)
|
|
{
|
|
int64_t int_tablet_id = 0;
|
|
int64_t int_ls_id = 0;
|
|
int64_t retry_times = 10;
|
|
|
|
READ_SQL_FMT_BY_CONN(connection,
|
|
res,
|
|
"SELECT tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME = \'%s\'",
|
|
table_name);
|
|
common::sqlclient::ObMySQLResult *result = res.mysql_result();
|
|
ASSERT_EQ(OB_SUCCESS, result->next());
|
|
ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", int_tablet_id));
|
|
ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", int_ls_id));
|
|
TABLET_ID = int_tablet_id;
|
|
LS_ID = int_ls_id;
|
|
fprintf(stdout, "get table info finish : tablet_id = %ld, ls_id = %ld\n", TABLET_ID.id(), LS_ID.id());
|
|
}
|
|
}
|
|
|
|
void TestTabletMemtable::basic_test() {
|
|
share::ObTenantSwitchGuard tenant_guard;
|
|
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(RunCtx.tenant_id_));
|
|
|
|
create_test_table("test_tablet_memtable");
|
|
|
|
ObLSService *ls_svr = MTL(ObLSService *);
|
|
ObLSHandle ls_handle;
|
|
ASSERT_EQ(1001, LS_ID.id());
|
|
ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(LS_ID, ls_handle, ObLSGetMod::STORAGE_MOD));
|
|
|
|
ObLS *ls = nullptr;
|
|
ObLSIterator *ls_iter = nullptr;
|
|
ASSERT_NE(nullptr, ls = ls_handle.get_ls());
|
|
|
|
ObTabletHandle tablet_handle;
|
|
ObTablet *tablet = nullptr;
|
|
ASSERT_EQ(OB_SUCCESS, ls->get_tablet(TABLET_ID, tablet_handle));
|
|
ASSERT_NE(nullptr, tablet = tablet_handle.get_obj());
|
|
|
|
// *********** CREATE DATA METMABLE ************
|
|
ASSERT_EQ(OB_SUCCESS, tablet->create_memtable(1, SCN::min_scn(), false /*for_direct_load*/, false /*for_replay*/));
|
|
ObProtectedMemtableMgrHandle *protected_handle = nullptr;
|
|
ASSERT_EQ(OB_SUCCESS, tablet->get_protected_memtable_mgr_handle(protected_handle));
|
|
ASSERT_NE(nullptr, protected_handle);
|
|
ASSERT_EQ(true, protected_handle->has_memtable());
|
|
|
|
// *********** CHECK METMABLE TYPE ************
|
|
ObTableHandleV2 memtable_handle;
|
|
ObITabletMemtable *memtable = nullptr;
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_active_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
|
ASSERT_EQ(ObITable::TableType::DATA_MEMTABLE, memtable->get_table_type());
|
|
|
|
// *********** DO TABLET FREEZE ************
|
|
ObFreezer *freezer = nullptr;
|
|
ASSERT_NE(nullptr, freezer = ls->get_freezer());
|
|
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID,
|
|
true,
|
|
0,
|
|
false, /* need_rewrite_meta */
|
|
ObFreezeSourceFlag::TEST_MODE));
|
|
ASSERT_EQ(OB_ENTRY_NOT_EXIST, protected_handle->get_active_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_boundary_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
|
STORAGE_LOG(INFO, "finish freeze data memtable", KPC(memtable));
|
|
|
|
// *********** CREATE DIRECT LOAD MEMTABLE ************
|
|
ASSERT_EQ(OB_SUCCESS, tablet->create_memtable(1, SCN::min_scn(), true /*for_direct_load*/, false /*for_replay*/));
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_active_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
|
ASSERT_EQ(ObITable::TableType::DIRECT_LOAD_MEMTABLE, memtable->get_table_type());
|
|
STORAGE_LOG(INFO, "finish create direct load memtable", KPC(memtable));
|
|
|
|
// *********** GET DIRECT LOAD MEMTABLE FOR WRITE ************
|
|
memtable->inc_write_ref();
|
|
|
|
// *********** CONCURRENT TABLET FREEZE ************
|
|
int64_t freeze_start_time = ObClockGenerator::getClock();
|
|
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID,
|
|
false /*is_sync*/,
|
|
0,
|
|
false, /*need_rewrite_meta*/
|
|
ObFreezeSourceFlag::TEST_MODE));
|
|
|
|
sleep(2);
|
|
ASSERT_EQ(TabletMemtableFreezeState::ACTIVE, memtable->get_freeze_state());
|
|
STORAGE_LOG(INFO, "waiting write finish", KPC(memtable));
|
|
|
|
// *********** DIRECT LOAD MEMTABLE WRITE FINISH ************
|
|
memtable->dec_write_ref();
|
|
STORAGE_LOG(INFO, "write_ref_cnt should be zero", KPC(memtable));
|
|
sleep(1);
|
|
|
|
// *********** CHECK FREEZE RESULT ************
|
|
ASSERT_EQ(OB_ENTRY_NOT_EXIST, protected_handle->get_active_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_boundary_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
|
STORAGE_LOG(INFO, "get boundary memtable", KPC(memtable));
|
|
|
|
ASSERT_EQ(ObITable::TableType::DIRECT_LOAD_MEMTABLE, memtable->get_table_type());
|
|
ASSERT_NE(TabletMemtableFreezeState::ACTIVE, memtable->get_freeze_state());
|
|
ASSERT_EQ(true, memtable->is_in_prepare_list_of_data_checkpoint());
|
|
|
|
// *********** CREATE ANOTHER DIRECT LOAD MEMTABLE ************
|
|
ASSERT_EQ(OB_SUCCESS, tablet->create_memtable(1, SCN::min_scn(), true /*for_direct_load*/, false /*for_replay*/));
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_active_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
|
STORAGE_LOG(INFO, "create a new direct load memtable", KPC(memtable));
|
|
|
|
// *********** CONSTURCT A DIRECT LOAD TABLE GUARD ************
|
|
SCN fake_ddl_redo_scn = SCN::plus(SCN::min_scn(), 10);
|
|
ObDirectLoadTableGuard direct_load_guard(*tablet, fake_ddl_redo_scn, false);
|
|
|
|
// *********** CHECK DIRECT LOAD TABLE GUARD UASBLE ************
|
|
ObDDLKV *memtable_for_direct_load = nullptr;
|
|
ASSERT_EQ(OB_SUCCESS, direct_load_guard.prepare_memtable(memtable_for_direct_load));
|
|
ASSERT_NE(nullptr, memtable_for_direct_load);
|
|
ASSERT_EQ(OB_SUCCESS, memtable_for_direct_load->set_rec_scn(fake_ddl_redo_scn));
|
|
ASSERT_EQ(memtable, memtable_for_direct_load);
|
|
ASSERT_EQ(1, memtable_for_direct_load->get_write_ref());
|
|
direct_load_guard.reset();
|
|
ASSERT_EQ(0, memtable_for_direct_load->get_write_ref());
|
|
|
|
// *********** DO LOGSTREAM FREEZE ************
|
|
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID,
|
|
true, /* is_sync */
|
|
0,
|
|
ObFreezeSourceFlag::TEST_MODE));
|
|
STORAGE_LOG(INFO, "finish logstream freeze");
|
|
|
|
// *********** CHECK LOGSTREAM FREEZE RESULT ************
|
|
ASSERT_EQ(OB_ENTRY_NOT_EXIST, protected_handle->get_active_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_boundary_memtable(memtable_handle));
|
|
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
|
|
ASSERT_EQ(ObITable::TableType::DIRECT_LOAD_MEMTABLE, memtable->get_table_type());
|
|
|
|
ASSERT_NE(TabletMemtableFreezeState::ACTIVE, memtable->get_freeze_state());
|
|
ASSERT_EQ(true, memtable->is_in_prepare_list_of_data_checkpoint());
|
|
STORAGE_LOG(INFO, "finish check logstream freeze result", KPC(memtable));
|
|
|
|
// *********** WAIT ALL MEMTABLES FLUSH ************
|
|
FLUSH_DISABLED = false;
|
|
int64_t retry_times = 0;
|
|
while (retry_times <= 10) {
|
|
sleep(1);
|
|
if (protected_handle->has_memtable()) {
|
|
ObSEArray<ObTableHandleV2, 2> handles;
|
|
ASSERT_EQ(OB_SUCCESS, protected_handle->get_all_memtables(handles));
|
|
STORAGE_LOG(INFO, "wait all memtable flushed", K(retry_times));
|
|
for (int i = 0; i < handles.count(); i++) {
|
|
ObITabletMemtable *memtable = static_cast<ObITabletMemtable*>(handles.at(i).get_table());
|
|
STORAGE_LOG(INFO, "PRINT Table", K(memtable->key_), K(memtable->get_freeze_state()));
|
|
}
|
|
retry_times++;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
ASSERT_FALSE(protected_handle->has_memtable());
|
|
fprintf(stdout, "finish basic test\n");
|
|
}
|
|
|
|
TEST_F(TestTabletMemtable, observer_start) { SERVER_LOG(INFO, "observer_start succ"); }
|
|
|
|
TEST_F(TestTabletMemtable, add_tenant)
|
|
{
|
|
// create tenant
|
|
ASSERT_EQ(OB_SUCCESS, create_tenant());
|
|
ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_));
|
|
ASSERT_NE(0, RunCtx.tenant_id_);
|
|
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
|
}
|
|
|
|
TEST_F(TestTabletMemtable, init_config)
|
|
{
|
|
DEF_VAL_FOR_SQL
|
|
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
|
|
sqlclient::ObISQLConnection *sys_conn = nullptr;
|
|
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
|
|
ASSERT_NE(nullptr, sys_conn);
|
|
sleep(2);
|
|
}
|
|
|
|
TEST_F(TestTabletMemtable, create_table)
|
|
{
|
|
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
|
|
|
OB_LOG(INFO, "create_table start");
|
|
ObSqlString sql;
|
|
int64_t affected_rows = 0;
|
|
EXE_SQL("create table if not exists test_tablet_memtable (c1 int, c2 int, primary key(c1))");
|
|
OB_LOG(INFO, "create_table succ");
|
|
}
|
|
|
|
TEST_F(TestTabletMemtable, basic_test)
|
|
{
|
|
FLUSH_DISABLED = true;
|
|
basic_test();
|
|
}
|
|
|
|
class TestTabletMemtableRestart : public ObSimpleClusterTestBase
|
|
{
|
|
public:
|
|
TestTabletMemtableRestart() : ObSimpleClusterTestBase(TEST_FILE_NAME) {}
|
|
private:
|
|
|
|
};
|
|
|
|
TEST_F(TestTabletMemtableRestart, observer_restart)
|
|
{
|
|
SERVER_LOG(INFO, "observer restart succ");
|
|
}
|
|
|
|
} // namespace unittest
|
|
} // namespace oceanbase
|
|
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
int c = 0;
|
|
int time_sec = 0;
|
|
int concurrency = 1;
|
|
char *log_level = (char *)"DEBUG";
|
|
while (EOF != (c = getopt(argc, argv, "t:l:"))) {
|
|
switch (c) {
|
|
case 't':
|
|
time_sec = atoi(optarg);
|
|
break;
|
|
case 'l':
|
|
log_level = optarg;
|
|
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
|
|
break;
|
|
case 'c':
|
|
concurrency = atoi(optarg);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
std::string gtest_file_name = std::string(TEST_FILE_NAME) + "_gtest.log";
|
|
oceanbase::unittest::init_gtest_output(gtest_file_name);
|
|
|
|
int ret = 0;
|
|
ObSimpleServerRestartHelper restart_helper(argc, argv, TEST_FILE_NAME, BORN_CASE_NAME,
|
|
RESTART_CASE_NAME);
|
|
helper_ptr = &restart_helper;
|
|
restart_helper.set_sleep_sec(time_sec);
|
|
restart_helper.run();
|
|
|
|
return ret;
|
|
}
|