[FIX] redesign ObFreezer to support multiple tablet freeze

This commit is contained in:
ZenoWang 2024-06-17 11:50:32 +00:00 committed by ob-robot
parent 0a4bfeca85
commit ca3bf29fe1
28 changed files with 1436 additions and 1022 deletions

View File

@ -323,8 +323,11 @@ int SimpleServerHelper::freeze(uint64_t tenant_id, ObLSID ls_id, ObTabletID tabl
int ret = OB_SUCCESS;
MTL_SWITCH(tenant_id) {
ObLSHandle ls_handle;
const bool need_rewrite_meta = false;
const bool is_sync = true;
// Absolute deadline handed to the synchronous tablet freeze: current clock value plus
// 60 * 1000 * 1000 ticks (presumably 60 s in microseconds -- confirm against ObClockGenerator).
// It has to be a 64-bit integer; declared as `bool` the timestamp is converted to `true` (1)
// and the intended deadline is lost.
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 60LL * 1000LL * 1000LL;
if (OB_FAIL(MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
} else if (OB_FAIL(ls_handle.get_ls()->tablet_freeze(tablet_id, true))) {
} else if (OB_FAIL(ls_handle.get_ls()->tablet_freeze(tablet_id, need_rewrite_meta, is_sync, abs_timeout_ts))) {
}
}
return ret;

View File

@ -34,8 +34,10 @@ storage_dml_unittest(test_ls_tablet_info_writer_and_reader test_ls_tablet_info_w
storage_dml_unittest(test_transfer_barrier test_transfer_barrier.cpp)
add_subdirectory(checkpoint)
add_subdirectory(blocksstable)
add_subdirectory(tenant_snapshot)
add_subdirectory(tablet_memtable)
target_link_libraries(test_memtable PUBLIC mock_tx_ctx mock_tx_log_adapter)

View File

@ -0,0 +1 @@
storage_unittest(test_direct_load_memtable)

View File

@ -0,0 +1,171 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define protected public
#define private public
#define UNITTEST
#include <iostream>
#include <thread>
#include "mtlenv/mock_tenant_module_env.h"
#include "storage/init_basic_struct.h"
#include "storage/test_tablet_helper.h"
#include "storage/ls/ob_ls.h"
#undef private
#undef protected
// Tenant id used by the test; starts at 0 and is overwritten with MTL_ID() inside the test body.
int64_t TEST_TENANT_ID = 0;
// Id of the log stream created by create_ls().
const ObLSID TEST_LS_ID(1001);
// Table id passed to build_test_schema() when creating the test tablet.
const int64_t TEST_TABLE_ID = 500001;
// Id of the single tablet created by create_tablets().
const int64_t TEST_TABLET_ID = 200001;
namespace oceanbase
{
namespace storage
{
/**
 * Fixture for the scenario "take a log stream offline while it still owns an active
 * direct-load memtable".
 *
 * MockTenantModuleEnv is initialised once per test case in SetUpTestCase() and destroyed in
 * TearDownTestCase(); SetUp() only verifies that the environment is initialised.
 */
class TestDirectLoadPlusOffline : public ::testing::Test
{
public:
  // ls_ is explicitly nulled so it never carries an indeterminate value before create_ls()
  // assigns it.
  TestDirectLoadPlusOffline() : ls_(nullptr) {}
  virtual ~TestDirectLoadPlusOffline() {}
  virtual void SetUp() override { ASSERT_TRUE(MockTenantModuleEnv::get_instance().is_inited()); }
  virtual void TearDown() override {}
  static void SetUpTestCase()
  {
    EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
    // Flag the slog handler as started (direct write to its private member, made reachable by
    // the `#define private public` at the top of the file).
    ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
  }
  static void TearDownTestCase() { MockTenantModuleEnv::get_instance().destroy(); }
  // Body of the single test: create LS + tablet, create a direct-load memtable, offline the LS.
  void offline_ls_with_active_direct_load_memtable();
public:
  // Creates the log stream `ls_id` and returns its raw pointer through `ls`.
  void create_ls(const ObLSID ls_id, ObLS *&ls);
  // Creates the tablet TEST_TABLET_ID inside log stream `ls_id`.
  void create_tablets(const ObLSID ls_id);
public:
  common::ObArenaAllocator allocator_;  // allocator passed to TestTabletHelper::create_tablet()
  ObLS *ls_;                            // log stream under test, set by create_ls()
};
/**
 * Creates log stream `ls_id` in the current tenant, installs a single-member member list made of
 * this server, and then polls (at most 15 times, one second apart) until the log handler reports
 * the LEADER role.
 *
 * @param ls_id  id of the log stream to create
 * @param ls     out: raw pointer to the created log stream
 *
 * Failures are reported through gtest assertions; a fatal one returns from this helper only.
 * NOTE(review): `ls` is handed out after the local ObLSHandle goes out of scope -- confirm that
 * the log stream is kept alive by the LS service for the duration of the test.
 */
void TestDirectLoadPlusOffline::create_ls(const ObLSID ls_id, ObLS *&ls)
{
  uint64_t tenant_id = MTL_ID();
  ObLSService *ls_svr = MTL(ObLSService*);
  ObCreateLSArg arg;
  ObLSHandle handle;
  ObMemberList member_list;
  int64_t paxos_replica_num = 1;
  (void)member_list.add_server(MockTenantModuleEnv::get_instance().self_addr_);

  // create ls
  ASSERT_EQ(OB_SUCCESS, storage::gen_create_ls_arg(tenant_id, ls_id, arg));
  ASSERT_EQ(OB_SUCCESS, ls_svr->create_ls(arg));
  EXPECT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD));
  ls = handle.get_ls();
  ASSERT_NE(nullptr, ls);

  GlobalLearnerList learner_list;
  ASSERT_EQ(OB_SUCCESS, ls->set_initial_member_list(member_list,
                                                    paxos_replica_num,
                                                    learner_list));

  // Wait for leadership; the loop simply gives up after 15 attempts without failing the test.
  for (int i = 0; i < 15; i++) {
    ObRole role;
    int64_t proposal_id = 0;
    ASSERT_EQ(OB_SUCCESS, ls->get_log_handler()->get_role(role, proposal_id));
    if (role == ObRole::LEADER) {
      break;
    }
    ::sleep(1);
  }
}
/**
 * Creates tablet TEST_TABLET_ID (schema built for TEST_TABLE_ID) in log stream `ls_id` and
 * verifies that it can be fetched back with the expected tablet id.
 *
 * @param ls_id  id of an already created log stream
 */
void TestDirectLoadPlusOffline::create_tablets(const ObLSID ls_id)
{
  // 1. create a tablet
  ObLSHandle handle;
  ObLSService *ls_svr = MTL(ObLSService *);
  EXPECT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD));
  ObLS *ls = handle.get_ls();
  ASSERT_NE(nullptr, ls);

  share::schema::ObTableSchema table_schema;
  ObTabletID tablet_id = ObTabletID(TEST_TABLET_ID);
  ASSERT_EQ(OB_SUCCESS, build_test_schema(table_schema, TEST_TABLE_ID));
  ASSERT_EQ(OB_SUCCESS, TestTabletHelper::create_tablet(handle, tablet_id, table_schema, allocator_));

  // 2. test tablet
  ObTabletHandle tablet_handle;
  ObTablet *tablet = nullptr;
  STORAGE_LOG(INFO, "TestLSService::tablet_test 2.");
  ASSERT_EQ(OB_SUCCESS, ls->get_tablet(tablet_id, tablet_handle));
  tablet = tablet_handle.get_obj();
  ASSERT_NE(nullptr, tablet);
  ASSERT_EQ(tablet_id, tablet->get_tablet_meta().tablet_id_);
}
void TestDirectLoadPlusOffline::offline_ls_with_active_direct_load_memtable()
{
int ret = OB_SUCCESS;
create_ls(TEST_LS_ID, ls_);
create_tablets(TEST_LS_ID);
ASSERT_EQ(OB_SUCCESS,
ls_->get_tablet_svr()->create_memtable(ObTabletID(TEST_TABLET_ID),
0 /* schema version */,
true /* for_direct_load */,
false /*for_replay*/,
SCN::min_scn() /*clog_checkpoint*/));
ASSERT_EQ(OB_SUCCESS, ls_->offline());
}
// Records the current tenant id in TEST_TENANT_ID and runs the fixture's test body.
TEST_F(TestDirectLoadPlusOffline, offline_ls_with_active_direct_load_memtable)
{
TEST_TENANT_ID = MTL_ID();
offline_ls_with_active_direct_load_memtable();
}
} // namespace storage
} // namespace oceanbase
int main(int argc, char **argv)
{
int ret = 1;
system("rm -f test_direct_load_plus_offline.log*");
system("rm -fr run_*");
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_direct_load_plus_offline.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}

View File

@ -48,7 +48,7 @@ using namespace palf;
namespace storage
{
class TestTxDataTable;
class TestTabletFreezeForDirectLoad;
class MockTxDataTable;
class MockTxTable;
static const uint64_t TEST_TENANT_ID = 1;
@ -111,7 +111,7 @@ public:
class MockTxDataTable : public ObTxDataTable
{
friend TestTxDataTable;
friend TestTabletFreezeForDirectLoad;
public:
MockTxDataTable() : ObTxDataTable() {}
@ -154,11 +154,11 @@ public:
MockTxTable(MockTxDataTable *tx_data_table) : ObTxTable(*tx_data_table) {}
};
class TestTxDataTable : public ::testing::Test
class TestTabletFreezeForDirectLoad : public ::testing::Test
{
public:
TestTxDataTable() : tx_table_(&tx_data_table_) {}
virtual ~TestTxDataTable() {}
TestTabletFreezeForDirectLoad() : tx_table_(&tx_data_table_) {}
virtual ~TestTabletFreezeForDirectLoad() {}
static void SetUpTestCase()
{
@ -221,7 +221,7 @@ public:
ObArenaAllocator allocator_;
};
void TestTxDataTable::make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr)
void TestTabletFreezeForDirectLoad::make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr)
{
ObArray<ObTableHandleV2> memtables;
ObTxDataMemtable *memtable = nullptr;
@ -234,7 +234,7 @@ void TestTxDataTable::make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr
}
}
void TestTxDataTable::insert_tx_data_()
void TestTabletFreezeForDirectLoad::insert_tx_data_()
{
insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns());
ObTxData *tx_data = nullptr;
@ -273,7 +273,7 @@ void TestTxDataTable::insert_tx_data_()
}
}
void TestTxDataTable::insert_rollback_tx_data_()
void TestTabletFreezeForDirectLoad::insert_rollback_tx_data_()
{
auto tx_id = transaction::ObTransID(INT64_MAX-2);
share::SCN max_end_scn = share::SCN::min_scn();
@ -306,7 +306,7 @@ void TestTxDataTable::insert_rollback_tx_data_()
}
}
void TestTxDataTable::insert_abort_tx_data_()
void TestTabletFreezeForDirectLoad::insert_abort_tx_data_()
{
insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns());
ObTxData *tx_data = nullptr;
@ -328,7 +328,7 @@ void TestTxDataTable::insert_abort_tx_data_()
ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data));
}
void TestTxDataTable::generate_past_commit_version_(ObCommitVersionsArray &past_commit_versions)
void TestTabletFreezeForDirectLoad::generate_past_commit_version_(ObCommitVersionsArray &past_commit_versions)
{
share::SCN start_scn = share::SCN::minus(insert_start_scn, 300LL * ONE_SEC_NS);
share::SCN commit_version = share::SCN::plus(start_scn, 2LL * ONE_SEC_NS);
@ -339,7 +339,7 @@ void TestTxDataTable::generate_past_commit_version_(ObCommitVersionsArray &past_
}
}
void TestTxDataTable::set_freezer_()
void TestTabletFreezeForDirectLoad::set_freezer_()
{
ObLSID ls_id(1);
ObLSWRSHandler ls_loop_worker;
@ -351,7 +351,7 @@ void TestTxDataTable::set_freezer_()
tx_data_table_.freezer_.init(&ls_);
}
void TestTxDataTable::init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr)
void TestTabletFreezeForDirectLoad::init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr)
{
ASSERT_NE(nullptr, memtable_mgr);
memtable_mgr->set_freezer(&tx_data_table_.freezer_);
@ -359,7 +359,7 @@ void TestTxDataTable::init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr)
ASSERT_EQ(1, memtable_mgr->get_memtable_count_());
}
void TestTxDataTable::check_freeze_(ObTxDataMemtableMgr *memtable_mgr,
void TestTabletFreezeForDirectLoad::check_freeze_(ObTxDataMemtableMgr *memtable_mgr,
ObTxDataMemtable *&freezing_memtable,
ObTxDataMemtable *&active_memtable)
{
@ -383,7 +383,7 @@ void TestTxDataTable::check_freeze_(ObTxDataMemtableMgr *memtable_mgr,
freezing_memtable->set_state(ObTxDataMemtable::State::FROZEN);
}
void TestTxDataTable::do_basic_test()
void TestTabletFreezeForDirectLoad::do_basic_test()
{
// init tx data table
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
@ -457,7 +457,7 @@ void TestTxDataTable::do_basic_test()
memtable_mgr->offline();
}
void TestTxDataTable::do_undo_status_test()
void TestTabletFreezeForDirectLoad::do_undo_status_test()
{
// init tx data table
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
@ -516,7 +516,7 @@ void TestTxDataTable::do_undo_status_test()
}
}
void TestTxDataTable::test_serialize_with_action_cnt_(int cnt)
void TestTabletFreezeForDirectLoad::test_serialize_with_action_cnt_(int cnt)
{
ObTxData *tx_data = nullptr;
ObTxDataGuard tx_data_guard;
@ -562,7 +562,7 @@ void TestTxDataTable::test_serialize_with_action_cnt_(int cnt)
}
void TestTxDataTable::do_tx_data_serialize_test()
void TestTabletFreezeForDirectLoad::do_tx_data_serialize_test()
{
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
ObTxDataMemtableMgr *memtable_mgr = tx_data_table_.get_memtable_mgr_();
@ -578,7 +578,7 @@ void TestTxDataTable::do_tx_data_serialize_test()
memtable_mgr->offline();
}
void TestTxDataTable::test_commit_versions_serialize_()
void TestTabletFreezeForDirectLoad::test_commit_versions_serialize_()
{
ObCommitVersionsArray cur_array;
ObCommitVersionsArray past_array;
@ -637,7 +637,7 @@ void TestTxDataTable::test_commit_versions_serialize_()
ASSERT_EQ(s_pos, d_pos);
}
void TestTxDataTable::do_repeat_insert_test() {
void TestTabletFreezeForDirectLoad::do_repeat_insert_test() {
ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_));
set_freezer_();
@ -682,7 +682,7 @@ void TestTxDataTable::do_repeat_insert_test() {
}
void TestTxDataTable::fake_ls_(ObLS &ls)
void TestTabletFreezeForDirectLoad::fake_ls_(ObLS &ls)
{
ls.ls_meta_.tenant_id_ = 1;
ls.ls_meta_.ls_id_.id_ = 1001;
@ -692,7 +692,7 @@ void TestTxDataTable::fake_ls_(ObLS &ls)
ls.ls_meta_.rebuild_seq_ = 0;
}
void TestTxDataTable::do_print_leak_slice_test()
void TestTabletFreezeForDirectLoad::do_print_leak_slice_test()
{
const int32_t CONCURRENCY = 4;
ObMemAttr attr;
@ -727,17 +727,17 @@ void TestTxDataTable::do_print_leak_slice_test()
slice_allocator.destroy();
}
TEST_F(TestTxDataTable, basic_test)
TEST_F(TestTabletFreezeForDirectLoad, basic_test)
{
tx_data_num = const_data_num;
do_basic_test();
}
TEST_F(TestTxDataTable, repeat_insert_test) { do_repeat_insert_test(); }
TEST_F(TestTabletFreezeForDirectLoad, repeat_insert_test) { do_repeat_insert_test(); }
TEST_F(TestTxDataTable, undo_status_test) { do_undo_status_test(); }
TEST_F(TestTabletFreezeForDirectLoad, undo_status_test) { do_undo_status_test(); }
TEST_F(TestTxDataTable, serialize_test) { do_tx_data_serialize_test(); }
TEST_F(TestTabletFreezeForDirectLoad, serialize_test) { do_tx_data_serialize_test(); }
// TEST_F(TestTxDataTable, print_leak_slice) { do_print_leak_slice_test(); }

View File

@ -30,6 +30,11 @@ function(errsim_ha_unittest_observer case)
target_link_libraries(${case} PRIVATE gtest gmock observer_test oceanbase)
endfunction()
# Declares a freeze-related observer test target: forwards every argument to ob_unittest() and
# links the resulting target `case` against gtest, gmock, observer_test and oceanbase.
function(ob_freeze_observer case)
ob_unittest(${ARGV})
target_link_libraries(${case} PRIVATE gtest gmock observer_test oceanbase)
endfunction()
function(ob_offline_observer case case_file)
add_executable(${case}
EXCLUDE_FROM_ALL
@ -66,7 +71,6 @@ ob_unittest_observer(test_balance_operator test_tenant_balance_operator.cpp)
ob_unittest_observer(test_transfer_partition_task test_transfer_partition_task.cpp)
ob_unittest_observer(test_mds_table_checkpoint test_mds_table_checkpoint.cpp)
ob_unittest_observer(test_ob_black_list_service test_ob_black_list_service.cpp)
ob_unittest_observer(test_ob_minor_freeze test_ob_minor_freeze.cpp)
ob_unittest_observer(test_ob_table_lock_service test_ob_table_lock_service.cpp)
ob_unittest_observer(test_ob_obj_lock_garbage_collector test_ob_obj_lock_garbage_collector.cpp)
ob_unittest_observer(test_observer_expand_shrink test_observer_expand_shrink.cpp)
@ -116,6 +120,13 @@ ob_unittest_observer(test_transfer_commit_action test_transfer_with_commit_actio
ob_unittest_observer(test_transfer_rollback_to test_transfer_between_rollback_to.cpp)
ob_unittest_observer(test_memtable_new_safe_to_destroy test_memtable_new_safe_to_destroy.cpp)
ob_unittest_observer(test_tablet_to_ls_cache test_tablet_to_ls_cache.cpp)
####### freeze case #######
#ob_freeze_observer(test_frequently_freeze freeze/test_frequently_freeze.cpp)
ob_freeze_observer(test_ob_minor_freeze freeze/test_ob_minor_freeze.cpp)
####### freeze case #######
# TODO(muwei.ym): open later
ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp)
ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp)
@ -126,4 +137,4 @@ ob_ha_unittest_observer(test_transfer_doing_stage_restart_with_mds_flush storage
ob_ha_unittest_observer(test_transfer_complete_restart_with_mds_flush storage_ha/test_transfer_complete_restart_with_mds_flush.cpp)
ob_ha_unittest_observer(test_transfer_with_empty_shell storage_ha/test_transfer_with_empty_shell.cpp)
ob_ha_unittest_observer(test_mds_transaction test_mds_transaction.cpp)
errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp)
errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp)

View File

@ -41,8 +41,8 @@ public:
observer::ObSimpleServer& get_curr_simple_server() { return *cluster_; }
int create_tenant(const char *tenant_name = "tt1",
const char *memory_size = "2G",
const char *log_disk_size = "2G",
const char *memory_size = "4G",
const char *log_disk_size = "4G",
const bool oracle_mode = false,
int64_t tenant_cpu = 2);
int delete_tenant(const char *tenant_name = "tt1");

View File

@ -0,0 +1,335 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX STORAGE
#define protected public
#define private public
#include "env/ob_simple_cluster_test_base.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx_storage/ob_ls_service.h"
#undef private
#undef protected
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
using namespace oceanbase::unittest;
// Process-wide state shared between the test cases of this file.
class TestRunCtx
{
public:
// Id of the tenant created by the add_tenant case; 0 until then.
uint64_t tenant_id_ = 0;
// Run-time setting in seconds; no code visible in this file reads it.
int64_t time_sec_ = 0;
};
// Single global instance used by all test cases.
TestRunCtx RunCtx;
namespace oceanbase
{
namespace storage
{
// Simple-cluster fixture whose helpers are run concurrently by the frequently_freeze case.
class TestFrequentlyFreeze : public ObSimpleClusterTestBase
{
public:
TestFrequentlyFreeze() : ObSimpleClusterTestBase("test_frequently_freeze_") {}
// Inserts rows into table gl_t_<idx> through a dedicated SQL connection.
void fill_data(const int64_t idx);
// Issues asynchronous tablet freezes for every TABLET_FREEZE_THREAD_COUNT-th tablet, starting at offset idx.
void async_tablet_freeze(const int64_t idx);
// Same tablet walk as async_tablet_freeze(), but with synchronous freezes.
void sync_tablet_freeze(const int64_t idx);
// Repeatedly freezes log stream 1001, synchronously or asynchronously.
void ls_freeze(const bool is_sync);
// Waits until the freezer of log stream 1001 reports no pending async-freeze tablets.
void check_async_freeze_tablets_empty();
// Looks up the log stream `ls_id` and returns its raw pointer through `ls`.
void get_ls(const share::ObLSID &ls_id, ObLS *&ls);
};
// SQL helper macros. All of them expand to several statements and rely on locals named `sql`
// and `affected_rows` (declared by DEF_VAL_FOR_SQL) being in scope at the point of use; the
// EXE_* variants additionally need a local `sql_proxy`. Because they expand to ASSERT_* they can
// only be used inside functions returning void.

// Runs the literal statement `sql_str` through `sql_proxy`.
#define EXE_SQL(sql_str) \
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
// Same as EXE_SQL, with printf-style formatting of the statement.
#define EXE_SQL_FMT(...) \
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
// Runs the literal statement `sql_str` on connection `conn`, passing OB_SYS_TENANT_ID to execute_write().
#define WRITE_SQL_BY_CONN(conn, sql_str) \
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
// Same as WRITE_SQL_BY_CONN, with printf-style formatting of the statement.
#define WRITE_SQL_FMT_BY_CONN(conn, ...) \
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
// Declares the locals (`ret`, `sql`, `affected_rows`) the macros above depend on.
#define DEF_VAL_FOR_SQL \
int ret = OB_SUCCESS; \
ObSqlString sql; \
int64_t affected_rows = 0;
// Number of hash partitions of each test table.
const int64_t SINGLE_TABLE_PARTITION_COUNT = 4000;
// Number of worker threads per tablet-freeze flavour; also the stride with which each thread walks the tablet ids.
const int64_t TABLET_FREEZE_THREAD_COUNT = 10;
// Inclusive range of tablet ids targeted by the freeze threads; assigned in the create_table case.
// NOTE(review): the range assumes the created partitions get consecutive tablet ids -- confirm.
int64_t FIRST_TABLET_ID = 0;
int64_t LAST_TABLET_ID = 0;
/**
 * Looks up log stream `ls_id` in the current tenant's ObLSService.
 *
 * @param ls_id  id of the log stream to fetch
 * @param ls     out: raw pointer to the log stream; reset to nullptr first, so it stays null
 *               when the lookup assertion fails
 *
 * NOTE(review): the pointer is returned after the local ObLSHandle is destroyed -- confirm the
 * log stream outlives its users in this test.
 */
void TestFrequentlyFreeze::get_ls(const share::ObLSID &ls_id, ObLS *&ls)
{
  ls = nullptr;
  ObLSHandle ls_handle;
  ObLSService *ls_svr = MTL(ObLSService *);
  ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
  ASSERT_NE(nullptr, ls = ls_handle.get_ls());
}
// Loads data into table gl_t_<idx>: acquires a connection from the second SQL proxy, raises the
// session timeouts, creates sequence seq_<idx>, then runs five parallel INSERT ... SELECT rounds
// that each draw from generator(20000). Progress is printed to stdout.
// Any failing statement aborts the function through the ASSERT_* inside the SQL macros.
void TestFrequentlyFreeze::fill_data(const int64_t idx)
{
DEF_VAL_FOR_SQL
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
sqlclient::ObISQLConnection *connection = nullptr;
// NOTE(review): the acquired connection is never handed back in this function -- confirm whether
// it should be released once the inserts are done.
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
ASSERT_NE(nullptr, connection);
WRITE_SQL_BY_CONN(connection, "set ob_trx_timeout = 3000000000");
WRITE_SQL_BY_CONN(connection, "set ob_trx_idle_timeout = 3000000000");
WRITE_SQL_BY_CONN(connection, "set ob_query_timeout = 3000000000");
WRITE_SQL_FMT_BY_CONN(connection, "create sequence seq_%ld cache 1000000 noorder;", idx);
STORAGE_LOG(INFO, "start fill data", K(idx));
for (int i = 0; i < 5; i++) {
fprintf(stdout, "start fill data. thread_idx = %ld, round = %d\n", idx, i);
WRITE_SQL_FMT_BY_CONN(
connection,
"Insert /*+ parallel(4) enable_parallel_dml */ into gl_t_%ld select seq_%ld.nextval, 'ob' from "
"table(generator(20000));",
idx,
idx);
fprintf(stdout, "finish fill data. thread_idx = %ld, round = %d\n", idx, i);
}
}
void TestFrequentlyFreeze::async_tablet_freeze(const int64_t idx)
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
int64_t int_tablet_id_to_freeze = FIRST_TABLET_ID + idx;
fprintf(stdout, "async tablet freeze start. thread = %ld\n", idx);
STORAGE_LOG(INFO, "start async tablet freeze", K(idx));
while (int_tablet_id_to_freeze <= LAST_TABLET_ID) {
const bool is_sync = false;
ObTabletID tablet_to_freeze(int_tablet_id_to_freeze);
STORAGE_LOG(INFO, "start tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze, is_sync));
usleep(100 * 1000);
STORAGE_LOG(INFO, "finish tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
int_tablet_id_to_freeze += TABLET_FREEZE_THREAD_COUNT;
}
fprintf(stdout, "async tablet freeze finish. thread = %ld\n", idx);
STORAGE_LOG(INFO, "one async tablet freeze task finish", K(idx));
}
void TestFrequentlyFreeze::sync_tablet_freeze(const int64_t idx)
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
ObTabletID tablet_to_freeze(FIRST_TABLET_ID + idx);
fprintf(stdout, "sync tablet freeze start. thread = %ld\n", idx);
STORAGE_LOG(INFO, "start sync tablet freeze", K(idx));
while (tablet_to_freeze.id() <= LAST_TABLET_ID) {
const bool is_sync = true;
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 600LL * 1000LL * 1000LL;
STORAGE_LOG(INFO, "start tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(tablet_to_freeze, is_sync, abs_timeout_ts));
::sleep(2);
tablet_to_freeze = ObTabletID(tablet_to_freeze.id() + TABLET_FREEZE_THREAD_COUNT);
STORAGE_LOG(INFO, "finish tablet freeze", K(tablet_to_freeze), K(is_sync), K(idx));
}
fprintf(stdout, "sync tablet freeze finish. thread = %ld\n", idx);
STORAGE_LOG(INFO, "one async tablet freeze task finish", K(idx));
}
void TestFrequentlyFreeze::ls_freeze(const bool is_sync)
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
fprintf(stdout, "ls freeze start. is_sync = %d\n", is_sync);
STORAGE_LOG(INFO, "start ls freeze", K(is_sync));
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + 600LL * 1000LL * 1000LL;
for (int i = 0; i < 4; i++) {
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(-1, is_sync, abs_timeout_ts));
sleep(200);
}
fprintf(stdout, "ls freeze finish. is_sync = %d\n", is_sync);
}
void TestFrequentlyFreeze::check_async_freeze_tablets_empty()
{
ObLS *ls = nullptr;
(void)get_ls(share::ObLSID(1001), ls);
bool async_freeze_finished = false;
int64_t max_retry_times = 100;
while (max_retry_times-- > 0) {
if (ls->get_freezer()->get_async_freeze_tablets().empty()) {
async_freeze_finished = true;
break;
}
sleep(1);
}
ASSERT_EQ(true, async_freeze_finished);
fprintf(stdout, "check async freeze tablets finish. \n");
}
// First case of the suite: only logs a message; it makes no assertions of its own.
TEST_F(TestFrequentlyFreeze, observer_start)
{
SERVER_LOG(INFO, "observer_start succ");
}
// Creates the test tenant with the fixture's default arguments, stores its id in
// RunCtx.tenant_id_ (must be non-zero) and initialises the second SQL proxy used by later cases.
TEST_F(TestFrequentlyFreeze, add_tenant)
{
ASSERT_EQ(OB_SUCCESS, create_tenant());
ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_));
ASSERT_NE(0, RunCtx.tenant_id_);
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
}
// Creates the three test tables gl_t_0 .. gl_t_2, each hash-partitioned on sid into
// SINGLE_TABLE_PARTITION_COUNT partitions, and records the tablet id range the freeze
// workers will walk.
TEST_F(TestFrequentlyFreeze, create_table)
{
  DEF_VAL_FOR_SQL
  common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
  sqlclient::ObISQLConnection *connection = nullptr;
  ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
  ASSERT_NE(nullptr, connection);

  // Raise the session timeouts before running the DDL.
  WRITE_SQL_BY_CONN(connection, "set ob_trx_timeout = 3000000000");
  WRITE_SQL_BY_CONN(connection, "set ob_trx_idle_timeout = 3000000000");
  WRITE_SQL_BY_CONN(connection, "set ob_query_timeout = 3000000000");

  // Lower bound is fixed before any table exists; the upper bound is only set once all
  // three tables were created successfully.
  FIRST_TABLET_ID = 200000 + 1;
  for (int table_no = 0; table_no < 3; ++table_no) {
    WRITE_SQL_FMT_BY_CONN(
        connection,
        "create table gl_t_%d (sid int primary key, sname varchar(10)) partition by hash(sid) partitions %ld",
        table_no,
        SINGLE_TABLE_PARTITION_COUNT);
  }
  LAST_TABLET_ID = 200000 + SINGLE_TABLE_PARTITION_COUNT * 3;
}
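/**
 * This case checks that very frequent freezes work on a log stream that holds a very large
 * number of tablets, each of which receives only a small amount of writes.
 *
 * 1. Three tables are created, each hash-partitioned into 4000 partitions (create_table case).
 * 2. Three threads fill the tables; each thread runs 5 insert rounds.
 * 3. Twenty threads issue tablet-level freezes: 10 asynchronous ones (each repeats its pass over
 *    the tablets 10 times, sleeping 10 s between passes) and 10 synchronous ones; every thread
 *    handles each 10th tablet of the range.
 * 4. One thread issues synchronous log-stream freezes: 4 rounds, 200 s apart.
 * 5. One thread issues asynchronous log-stream freezes: 4 rounds, 200 s apart.
 * 6. Finally the test checks that ObFreezer holds no leftover tablets waiting for an async freeze.
 */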
// Runs data loading, tablet freezes (async and sync) and log-stream freezes (sync and async)
// concurrently, waits for every worker, then verifies that no async-freeze tablet is left over.
// Each worker switches to the test tenant with MTL_SWITCH, which needs a local `ret`.
TEST_F(TestFrequentlyFreeze, frequently_freeze)
{
  int ret = OB_SUCCESS;
  std::vector<std::thread> worker_threads;

  // Writers: one thread per table.
  for (int table_idx = 0; table_idx < 3; table_idx++) {
    worker_threads.emplace_back([table_idx, this]() {
      int ret = OB_SUCCESS;
      MTL_SWITCH(RunCtx.tenant_id_) { this->fill_data(table_idx); };
    });
  }

  // Let the writers run for a while before the first freeze is issued.
  ::sleep(30);

  // Asynchronous tablet freezes: every worker repeats its pass ten times, 10 s apart.
  for (int worker = 0; worker < TABLET_FREEZE_THREAD_COUNT; worker++) {
    worker_threads.emplace_back([worker, this]() {
      int ret = OB_SUCCESS;
      MTL_SWITCH(RunCtx.tenant_id_)
      {
        for (int pass = 0; pass < 10; pass++) {
          this->async_tablet_freeze(worker);
          sleep(10);
        }
      };
    });
  }

  // Synchronous tablet freezes: a single pass per worker.
  for (int worker = 0; worker < TABLET_FREEZE_THREAD_COUNT; worker++) {
    worker_threads.emplace_back([worker, this]() {
      int ret = OB_SUCCESS;
      MTL_SWITCH(RunCtx.tenant_id_)
      {
        this->sync_tablet_freeze(worker);
      };
    });
  }

  // Synchronous log-stream freeze worker, followed 100 s later by the asynchronous one.
  worker_threads.emplace_back([this]() {
    int ret = OB_SUCCESS;
    MTL_SWITCH(RunCtx.tenant_id_) { this->ls_freeze(true); };
  });
  sleep(100);
  worker_threads.emplace_back([this]() {
    int ret = OB_SUCCESS;
    MTL_SWITCH(RunCtx.tenant_id_) { this->ls_freeze(false); };
  });

  for (std::thread &worker : worker_threads) {
    worker.join();
  }

  MTL_SWITCH(RunCtx.tenant_id_) { check_async_freeze_tablets_empty(); }
}
} // namespace storage
} // namespace oceanbase
int main(int argc, char **argv)
{
int64_t c = 0;
int64_t time_sec = 0;
char *log_level = (char*)"WDIAG";
while(EOF != (c = getopt(argc,argv,"t:l:"))) {
switch(c) {
case 't':
time_sec = atoi(optarg);
break;
case 'l':
log_level = optarg;
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
break;
default:
break;
}
}
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level(log_level);
LOG_INFO("main>>>");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -337,7 +337,7 @@ void ObMinorFreezeTest::logstream_freeze()
for (int j = 0; j < OB_DEFAULT_TABLE_COUNT; ++j) {
int ret = OB_EAGAIN;
while (OB_EAGAIN == ret) {
ret = ls_handles_.at(j).get_ls()->logstream_freeze((i % 2 == 0) ? true : false);
ret = ls_handles_.at(j).get_ls()->logstream_freeze(checkpoint::INVALID_TRACE_ID, true/*is_sync*/, 0);
if (OB_EAGAIN == ret) {
ob_usleep(rand() % SLEEP_TIME);
@ -362,7 +362,7 @@ void ObMinorFreezeTest::tablet_freeze()
for (int j = 0; j < OB_DEFAULT_TABLE_COUNT; ++j) {
int ret = OB_EAGAIN;
while (OB_EAGAIN == ret) {
ret = ls_handles_.at(j).get_ls()->tablet_freeze(tablet_ids_.at(j), (i % 2 == 0) ? true : false);
ret = ls_handles_.at(j).get_ls()->tablet_freeze(tablet_ids_.at(j), false, (i % 2 == 0) ? true : false, 0);
if (OB_EAGAIN == ret) {
ob_usleep(rand() % SLEEP_TIME);
}
@ -383,7 +383,8 @@ void ObMinorFreezeTest::batch_tablet_freeze()
const int64_t start = ObTimeUtility::current_time();
while (ObTimeUtility::current_time() - start <= freeze_duration_) {
ASSERT_EQ(OB_SUCCESS, ls_handles_.at(0).get_ls()->batch_tablet_freeze(0, tablet_ids_, (i % 2 == 0) ? true : false));
ASSERT_EQ(OB_SUCCESS,
ls_handles_.at(0).get_ls()->tablet_freeze(-1, tablet_ids_, false, (i % 2 == 0) ? true : false, 0));
i = i + 1;
}
}

View File

@ -221,7 +221,7 @@ TEST_F(ObLockTableBeforeRestartTest, test_commit_log)
// wait until ls checkpoint updated.
LOG_INFO("ObLockTableBeforeRestartTest::test_commit_log 1.4");
// freeze data, make sure other type flushed.
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(false));
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID, false /*is_sync*/));
while (ls->get_clog_checkpoint_scn() < rec_scn) {
usleep(100 * 1000); // sleep 100 ms
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {

View File

@ -47,11 +47,16 @@ public:
TestRunCtx RunCtx;
ObLSID LS_ID;
ObTabletID TABLET_ID;
bool FLUSH_DISABLED;
namespace storage {
int ObDDLKV::flush(ObLSID ls_id)
{
if (FLUSH_DISABLED) {
return 0;
}
int ret = OB_SUCCESS;
if (!ready_for_flush()) {
return OB_ERR_UNEXPECTED;
@ -174,7 +179,7 @@ void TestTabletMemtable::basic_test() {
// *********** DO TABLET FREEZE ************
ObFreezer *freezer = nullptr;
ASSERT_NE(nullptr, freezer = ls->get_freezer());
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID, true /* is_sync */));
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID, false /* need_rewrite_meta */, true /* is_sync */, 0));
ASSERT_EQ(OB_ENTRY_NOT_EXIST, protected_handle->get_active_memtable(memtable_handle));
ASSERT_EQ(OB_SUCCESS, protected_handle->get_boundary_memtable(memtable_handle));
ASSERT_EQ(OB_SUCCESS, memtable_handle.get_tablet_memtable(memtable));
@ -191,9 +196,8 @@ void TestTabletMemtable::basic_test() {
memtable->inc_write_ref();
// *********** CONCURRENT TABLET FREEZE ************
ObFuture<int> result;
int64_t freeze_start_time = ObClockGenerator::getClock();
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze_with_rewrite_meta(TABLET_ID, &result));
ASSERT_EQ(OB_SUCCESS, ls->tablet_freeze(TABLET_ID, false /*need_rewrite_meta*/, false /*is_sync*/, INT64_MAX));
sleep(2);
ASSERT_EQ(TabletMemtableFreezeState::ACTIVE, memtable->get_freeze_state());
@ -201,8 +205,8 @@ void TestTabletMemtable::basic_test() {
// *********** DIRECT LOAD MEMTABLE WRITE FINISH ************
memtable->dec_write_ref();
ASSERT_EQ(OB_SUCCESS, ls->wait_freeze_finished(result));
STORAGE_LOG(INFO, "write_ref_cnt should be zero", KPC(memtable));
sleep(1);
// *********** CHECK FREEZE RESULT ************
ASSERT_EQ(OB_ENTRY_NOT_EXIST, protected_handle->get_active_memtable(memtable_handle));
@ -227,7 +231,7 @@ void TestTabletMemtable::basic_test() {
// *********** CHECK DIRECT LOAD TABLE GUARD UASBLE ************
ObDDLKV *memtable_for_direct_load = nullptr;
ASSERT_EQ(OB_SUCCESS, direct_load_guard.prepare_memtable(memtable_for_direct_load));
ASSERT_EQ(OB_ERR_UNEXPECTED, direct_load_guard.prepare_memtable(memtable_for_direct_load));
ASSERT_NE(nullptr, memtable_for_direct_load);
ASSERT_EQ(OB_SUCCESS, memtable_for_direct_load->set_rec_scn(fake_ddl_redo_scn));
ASSERT_EQ(memtable, memtable_for_direct_load);
ASSERT_EQ(1, memtable_for_direct_load->get_write_ref());
@ -235,7 +239,7 @@ void TestTabletMemtable::basic_test() {
ASSERT_EQ(0, memtable_for_direct_load->get_write_ref());
// *********** DO LOGSTREAM FREEZE ************
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(true /* is_sync */));
ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze(checkpoint::INVALID_TRACE_ID, true /* is_sync */));
STORAGE_LOG(INFO, "finish logstream freeze");
// *********** CHECK LOGSTREAM FREEZE RESULT ************
@ -249,6 +253,7 @@ void TestTabletMemtable::basic_test() {
STORAGE_LOG(INFO, "finish check logstream freeze result", KPC(memtable));
// *********** WAIT ALL MEMTABLES FLUSH ************
FLUSH_DISABLED = false;
int64_t retry_times = 0;
while (retry_times <= 10) {
sleep(1);
@ -304,6 +309,7 @@ TEST_F(TestTabletMemtable, create_table)
TEST_F(TestTabletMemtable, basic_test)
{
FLUSH_DISABLED = true;
basic_test();
}

View File

@ -910,13 +910,16 @@ int ObDataCheckpoint::freeze_base_on_needs_(const int64_t trace_id,
need_flush_num * 100 / wait_flush_num > TABLET_FREEZE_PERCENT;
}
}
if (logstream_freeze) {
if (OB_FAIL(ls_->logstream_freeze(trace_id, false /* !is_sync */))) {
const bool is_sync = false;
// The timeout is a timestamp and must be typed as a 64-bit integer, not bool; 0 is passed
// because an async freeze does not need a deadline.
const int64_t abs_timeout_ts = 0; // async freeze do not need
if (OB_FAIL(ret)) {
} else if (logstream_freeze) {
if (OB_FAIL(ls_->logstream_freeze(trace_id, is_sync, abs_timeout_ts))) {
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
}
} else if (OB_FAIL(ls_->batch_tablet_freeze(trace_id, need_flush_tablets, false /* !is_sync */))) {
STORAGE_LOG(WARN, "batch tablet freeze failed",
K(ret), K(ls_->get_ls_id()), K(need_flush_tablets));
} else if (OB_FAIL(ls_->tablet_freeze(trace_id, need_flush_tablets, is_sync))) {
STORAGE_LOG(WARN, "batch tablet freeze failed", K(ret), K(ls_->get_ls_id()), K(need_flush_tablets));
}
}
}

View File

@ -1351,20 +1351,22 @@ int ObBatchFreezeTabletsTask::process()
const ObTabletSchedulePair &cur_pair = param.tablet_pairs_.at(i);
ObTabletHandle tablet_handle;
ObTablet *tablet = nullptr;
const bool is_sync = true;
const bool need_rewrite_meta = true;
if (OB_UNLIKELY(!cur_pair.is_valid())) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN_RET(tmp_ret, "get invalid tablet pair", K(cur_pair));
} else if (cur_pair.schedule_merge_scn_ > weak_read_ts) {
// no need to force freeze
} else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(cur_pair.tablet_id_, true/*need_rewrite*/, true/*is_sync*/))) {
} else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(cur_pair.tablet_id_, need_rewrite_meta, is_sync))) {
LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(cur_pair));
++fail_freeze_cnt;
} else if (!MTL(ObTenantTabletScheduler *)->could_major_merge_start()) {
// merge is suspended
} else if (OB_TMP_FAIL(ls->get_tablet_svr()->get_tablet(cur_pair.tablet_id_,
tablet_handle,
0/*timeout_us*/,
0 /*timeout_us*/,
storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) {
LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(cur_pair));
} else if (FALSE_IT(tablet = tablet_handle.get_obj())) {
@ -1372,10 +1374,8 @@ int ObBatchFreezeTabletsTask::process()
// do nothing
} else if (!tablet->is_data_complete()) {
// no need to schedule merge
} else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(param.ls_id_,
*tablet,
MEDIUM_MERGE,
cur_pair.schedule_merge_scn_))) {
} else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(
param.ls_id_, *tablet, MEDIUM_MERGE, cur_pair.schedule_merge_scn_))) {
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
ret = tmp_ret;
LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(cur_pair));

View File

@ -1329,13 +1329,17 @@ int ObTenantTabletScheduler::schedule_ls_minor_merge(
}
} // end of while
if (FALSE_IT(start_time_us = common::ObTimeUtility::current_time())) {
} else if (OB_TMP_FAIL(ls.batch_tablet_freeze(checkpoint::INVALID_TRACE_ID, need_fast_freeze_tablets, true/*is_sync*/))) {
const bool is_sync = true;
start_time_us = ObClockGenerator::getClock();
if (OB_TMP_FAIL(ls.tablet_freeze(checkpoint::INVALID_TRACE_ID, need_fast_freeze_tablets, is_sync))) {
LOG_WARN("failt to batch freeze tablet", KR(tmp_ret), K(ls_id), K(need_fast_freeze_tablets));
} else {
LOG_INFO("fast freeze by batch_tablet_freeze finish", KR(tmp_ret),
"freeze cnt", need_fast_freeze_tablets.count(),
"cost time(ns)", common::ObTimeUtility::current_time() - start_time_us);
LOG_INFO("fast freeze by batch_tablet_freeze finish",
KR(tmp_ret),
"freeze cnt",
need_fast_freeze_tablets.count(),
"cost time(ns)",
common::ObTimeUtility::current_time() - start_time_us);
}
} // else
return ret;

View File

@ -206,9 +206,10 @@ int ObDDLIncCommitClogCb::on_success()
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls should not be null", K(ret), K(log_basic_.get_tablet_id()));
} else {
(void)ls->async_tablet_freeze_for_direct_load(log_basic_.get_tablet_id());
const bool is_sync = false;
(void)ls->tablet_freeze(log_basic_.get_tablet_id(), is_sync);
if (log_basic_.get_lob_meta_tablet_id().is_valid()) {
(void)ls->async_tablet_freeze_for_direct_load(log_basic_.get_lob_meta_tablet_id());
(void)ls->tablet_freeze(log_basic_.get_lob_meta_tablet_id(), is_sync);
}
}

View File

@ -400,6 +400,8 @@ int ObDDLIncRedoLogWriter::local_write_inc_start_log(
ObDDLIncStartClogCb *cb = nullptr;
ObPartTransCtx *trans_ctx = nullptr;
ObDDLIncLogHandle handle;
const bool is_sync = true;
const int64_t abs_timeout_ts = ObClockGenerator::getClock() + DEFAULT_RETRY_TIMEOUT_US;
ObDDLRedoLockGuard guard(tablet_id_.hash());
if (OB_UNLIKELY(!log.is_valid())) {
@ -410,10 +412,9 @@ int ObDDLIncRedoLogWriter::local_write_inc_start_log(
} else if (OB_ISNULL(ls) || !tablet_id_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(ls), K(tablet_id_));
} else if (OB_FAIL(ls->sync_tablet_freeze_for_direct_load(tablet_id_, DEFAULT_RETRY_TIMEOUT_US))) {
} else if (OB_FAIL(ls->tablet_freeze(tablet_id_, is_sync, abs_timeout_ts))) {
LOG_WARN("sync tablet freeze failed", K(ret), K(tablet_id_));
} else if (lob_meta_tablet_id.is_valid() &&
OB_FAIL(ls->sync_tablet_freeze_for_direct_load(lob_meta_tablet_id, DEFAULT_RETRY_TIMEOUT_US))) {
} else if (lob_meta_tablet_id.is_valid() && OB_FAIL(ls->tablet_freeze(lob_meta_tablet_id, is_sync, abs_timeout_ts))) {
LOG_WARN("sync tablet freeze failed", K(ret), K(lob_meta_tablet_id));
} else if (OB_ISNULL(cb = OB_NEW(ObDDLIncStartClogCb, ObMemAttr(MTL_ID(), "DDL_IRLW")))) {
ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -638,6 +638,37 @@ int ObDDLIncStartReplayExecutor::init(
return ret;
}
struct SyncTabletFreezeHelper {
ObLS *ls_;
const ObTabletID &tablet_id_;
const ObTabletID &lob_meta_tablet_id_;
SyncTabletFreezeHelper(ObLS *ls, const ObTabletID &tablet_id, const ObTabletID &lob_meta_tablet_id)
: ls_(ls), tablet_id_(tablet_id), lob_meta_tablet_id_(lob_meta_tablet_id) {}
int operator()()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ls_) || !tablet_id_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(ls_), K(tablet_id_));
} else {
const bool is_sync = true;
// try freeze for ten seconds
int64_t abs_timeout_ts = ObClockGenerator::getClock() + 10LL * 1000LL * 1000LL;
int tmp_ret = ls_->tablet_freeze(tablet_id_, is_sync, abs_timeout_ts);
int tmp_ret_lob = OB_SUCCESS;
if (lob_meta_tablet_id_.is_valid()) {
abs_timeout_ts = ObClockGenerator::getClock() + 10LL * 1000LL * 1000LL;
tmp_ret_lob = ls_->tablet_freeze(lob_meta_tablet_id_, is_sync, abs_timeout_ts);
}
if (OB_SUCCESS != (tmp_ret | tmp_ret_lob)) {
ret = OB_EAGAIN;
LOG_WARN("sync freeze failed", K(ret), K(tmp_ret), K(tmp_ret_lob), K(tablet_id_), K(lob_meta_tablet_id_));
}
}
return ret;
}
};
int ObDDLIncStartReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
{
int ret = OB_SUCCESS;
@ -655,22 +686,9 @@ int ObDDLIncStartReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
} else if (!need_replay) {
// do nothing
} else {
ObTabletID tablet_id = log_->get_log_basic().get_tablet_id();
ObTabletID lob_meta_tablet_id = log_->get_log_basic().get_lob_meta_tablet_id();
if (OB_ISNULL(ls_) || !tablet_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(ls_), K(tablet_id));
} else {
int tmp_ret = ls_->sync_tablet_freeze_for_direct_load(tablet_id);
int tmp_ret_lob = OB_SUCCESS;
if (lob_meta_tablet_id.is_valid()) {
tmp_ret_lob = ls_->sync_tablet_freeze_for_direct_load(lob_meta_tablet_id);
}
if (OB_SUCCESS != (tmp_ret | tmp_ret_lob)) {
ret = OB_EAGAIN;
LOG_WARN("sync freeze failed", KR(ret), KR(tmp_ret), KR(tmp_ret_lob), K(tablet_id), K(lob_meta_tablet_id));
}
}
SyncTabletFreezeHelper sync_tablet_freeze(
ls_, log_->get_log_basic().get_tablet_id(), log_->get_log_basic().get_lob_meta_tablet_id());
return sync_tablet_freeze();
}
return ret;
@ -723,22 +741,9 @@ int ObDDLIncCommitReplayExecutor::do_replay_(ObTabletHandle &tablet_handle)
} else if (!need_replay) {
// do nothing
} else {
ObTabletID tablet_id = log_->get_log_basic().get_tablet_id();
ObTabletID lob_meta_tablet_id = log_->get_log_basic().get_lob_meta_tablet_id();
if (OB_ISNULL(ls_) || !tablet_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(ls_), K(tablet_id));
} else {
int tmp_ret = ls_->sync_tablet_freeze_for_direct_load(tablet_id);
int tmp_ret_lob = OB_SUCCESS;
if (lob_meta_tablet_id.is_valid()) {
tmp_ret_lob = ls_->sync_tablet_freeze_for_direct_load(lob_meta_tablet_id);
}
if (OB_SUCCESS != (tmp_ret | tmp_ret_lob)) {
ret = OB_EAGAIN;
LOG_WARN("sync freeze failed", KR(ret), KR(tmp_ret), KR(tmp_ret_lob), K(tablet_id), K(lob_meta_tablet_id));
}
}
SyncTabletFreezeHelper sync_tablet_freeze(
ls_, log_->get_log_basic().get_tablet_id(), log_->get_log_basic().get_lob_meta_tablet_id());
return sync_tablet_freeze();
}
return ret;
}

View File

@ -606,7 +606,8 @@ int ObTabletBackfillTXTask::get_backfill_tx_memtables_(
}
} else if (!table->is_frozen_memtable()) {
is_memtable_ready = false;
if (OB_FAIL(ls->tablet_freeze(tablet_info_.tablet_id_))) {
const bool is_sync = false;
if (OB_FAIL(ls->tablet_freeze(tablet_info_.tablet_id_, is_sync))) {
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
} else {

View File

@ -1153,6 +1153,7 @@ int ObTransferHandler::wait_tablet_write_end_(
// table lock operation end
ObTransID failed_tx_id;
bool has_active_memtable = false;
const bool is_sync = true;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ls->get_lock_table()->enable_check_tablet_status(true))) {
LOG_WARN("failed to enable check tablet status", KR(ret), K(task_info));
@ -1160,7 +1161,7 @@ int ObTransferHandler::wait_tablet_write_end_(
LOG_WARN("failed to wait tx_write end", KR(ret), K(task_info));
} else if (OB_FAIL(ls->get_tx_svr()->traverse_trans_to_submit_redo_log(failed_tx_id))) {
LOG_WARN("failed to submit tx log", KR(ret), K(task_info));
} else if (OB_FAIL(ls->batch_tablet_freeze(checkpoint::INVALID_TRACE_ID, tablet_list, true))) {
} else if (OB_FAIL(ls->tablet_freeze(checkpoint::INVALID_TRACE_ID, tablet_list, is_sync))) {
LOG_WARN("batch tablet freeze failed", KR(ret), KPC(ls), K(task_info));
} else if (OB_FAIL(ls->check_tablet_no_active_memtable(tablet_list, has_active_memtable))) {
LOG_WARN("check tablet has active memtable failed", KR(ret), KPC(ls), K(task_info));

File diff suppressed because it is too large Load Diff

View File

@ -46,7 +46,7 @@ class ObLSTabletService;
class ObTablet;
class ObLSWRSHandler;
class ObTableHandleV2;
struct FreezeTaskFunctor;
struct AsyncFreezeFunctor;
namespace checkpoint
{
class ObDataCheckpoint;
@ -160,12 +160,32 @@ private:
ObSpinLock lock_;
};
/**
 * Key type of ObFreezer::async_freeze_tablets_: a tablet id paired with an epoch value.
 *
 * Two entries are equal only when both the tablet id and the epoch match; the hash value is
 * derived from the tablet id alone.
 */
struct AsyncFreezeTabletInfo {
  ObTabletID tablet_id_;
  int64_t epoch_;

  // Default state: invalid tablet id and epoch -1.
  AsyncFreezeTabletInfo()
      : tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)),
        epoch_(-1)
  {}

  int64_t hash() const
  {
    return tablet_id_.id();
  }

  // Out-parameter flavour of hash(); always returns OB_SUCCESS.
  int hash(uint64_t &hash_val) const
  {
    const int64_t id_hash = hash();
    hash_val = id_hash;
    return OB_SUCCESS;
  }

  bool operator==(const AsyncFreezeTabletInfo &rhs) const
  {
    bool is_equal = false;
    if (tablet_id_ == rhs.tablet_id_) {
      is_equal = (epoch_ == rhs.epoch_);
    }
    return is_equal;
  }

  TO_STRING_KV(K(tablet_id_), K(epoch_));
};
class ObFreezer
{
public:
friend FreezeTaskFunctor;
static const int64_t MAX_WAIT_READY_FOR_FLUSH_TIME = 10 * 1000 * 1000; // 10_s
typedef common::ObSEArray<ObTableHandleV2, OB_DEFAULT_TABLET_ID_COUNT> ObTableHandleArray;
friend AsyncFreezeFunctor;
static const int64_t SYNC_FREEZE_DEFAULT_RETRY_TIME = 10LL * 1000LL * 1000LL; // 10 seconds
public:
ObFreezer();
@ -178,13 +198,28 @@ public:
void online() { enable_ = true; }
public:
/* freeze */
void commit_async_tablet_freeze_task_once(const ObTabletID &tablet_id, const uint64_t epoch);
int logstream_freeze(const int64_t trace_id, ObFuture<int> *result = nullptr);
int tablet_freeze(const ObTabletID &tablet_id, ObFuture<int> *result = nullptr);
int tablet_freeze_task_for_direct_load(const ObTabletID &tablet_id, ObFuture<int> *result = nullptr);
int tablet_freeze_with_rewrite_meta(const ObTabletID &tablet_id, ObFuture<int> *result = nullptr);
int batch_tablet_freeze(const int64_t trace_id, const ObIArray<ObTabletID> &tablet_ids, ObFuture<int> *result = nullptr);
/********************** freeze **********************/
int logstream_freeze(const int64_t trace_id);
int wait_ls_freeze_finish();
int wait_tablet_freeze_finish(ObIArray<ObTableHandleV2> &frozen_memtable_handles,
ObIArray<ObTabletID> &freeze_failed_tablets);
int ls_inner_tablet_freeze(const ObTabletID &tablet_id);
int tablet_freeze(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool need_rewrite_meta,
ObIArray<ObTableHandleV2> &frozen_memtable_handles,
ObIArray<ObTabletID> &freeze_failed_tablets);
int get_all_async_freeze_tablets(const int64_t ls_epoch, ObIArray<ObTabletID> &tablet_ids);
bool is_async_freeze_tablets_empty() const { return async_freeze_tablets_.empty(); }
bool is_async_tablet_freeze_task_running() { return ATOMIC_LOAD(&is_async_tablet_freeze_task_running_); }
bool set_async_tablet_freeze_task_start_succ() { return ATOMIC_BCAS(&is_async_tablet_freeze_task_running_, false, true); }
void set_async_freeze_task_stop() { ATOMIC_STORE(&is_async_tablet_freeze_task_running_, false); }
void record_async_freeze_tablet(const AsyncFreezeTabletInfo &async_freeze_tablet_info);
void erase_async_freeze_tablet(const AsyncFreezeTabletInfo &async_freeze_tablet_info);
void commit_an_async_freeze_task(const int64_t trace_id, const bool is_ls_freeze);
void async_tablet_freeze_consumer(const int64_t trace_id);
common::hash::ObHashSet<AsyncFreezeTabletInfo> &get_async_freeze_tablets() { return async_freeze_tablets_; }
/********************** freeze **********************/
/* freeze_flag */
bool is_freeze(uint32_t is_freeze=UINT32_MAX) const;
@ -225,8 +260,6 @@ public:
ObFreezerStat& get_stat() { return stat_; }
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); }
// only used after start freeze_task successfully
int wait_freeze_finished(ObFuture<int> &result);
int pend_ls_replay();
int restore_ls_replay();
@ -242,7 +275,7 @@ private:
class ObTabletFreezeGuard
{
public:
ObTabletFreezeGuard(ObFreezer &parent, const bool try_guard = false);
ObTabletFreezeGuard(ObFreezer &parent, const bool try_guard);
~ObTabletFreezeGuard();
int try_set_tablet_freeze_begin();
private:
@ -259,26 +292,22 @@ private:
/* freeze_flag */
int set_freeze_flag();
int set_freeze_flag_without_inc_freeze_clock();
int loop_set_freeze_flag();
int loop_set_freeze_flag(const int64_t max_loop_time);
void unset_freeze_();
void undo_freeze_();
void try_freeze_tx_data_();
/* inner subfunctions for freeze process */
void inner_logstream_freeze(ObFuture<int> *result);
int inner_logstream_freeze();
int submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try);
void submit_log_if_needed_for_tablet_freeze_(ObTableHandleV2 &handle);
void submit_log_if_needed_(ObIArray<ObTableHandleV2> &frozen_memtable_handles);
void try_submit_log_for_freeze_(const bool is_tablet_freeze);
int ls_freeze_task_();
int tablet_freeze_task_(ObTableHandleV2 handle);
int tablet_freeze_task_for_direct_load_(const ObTabletID &tablet_id, const uint64_t epoch);
int do_data_memtable_tablet_freeze_(ObITabletMemtable *tablet_memtable);
int do_direct_load_memtable_tablet_freeze_(ObITabletMemtable *tablet_memtable);
int do_tablet_freeze_(const bool need_rewrite_meta,
const int64_t start_time,
const ObTabletID &tablet_id,
ObFuture<int> *result,
ObTableHandleV2 &frozen_memtable_handle);
int wait_data_memtable_freeze_finish_(ObITabletMemtable *tablet_memtable);
int wait_direct_load_memtable_freeze_finish_(ObITabletMemtable *tablet_memtable);
int set_tablet_freeze_flag_(const int64_t trace_id,
const ObTabletID tablet_id,
const bool need_rewrite_meta,
const SCN freeze_snapshot_version,
ObIArray<ObTableHandleV2> &frozen_memtable_handles);
int handle_no_active_memtable_(const ObTabletID &tablet_id,
const ObTablet *tablet,
const share::SCN freeze_snapshot_version);
@ -288,7 +317,12 @@ private:
const ObTablet *tablet,
const share::SCN freeze_snapshot_version,
int &ret);
void submit_freeze_task_(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
void init_tablet_freeze_param_(const ObIArray<ObTabletID> &tablet_ids,
const bool need_rewrite_meta,
int64_t &max_loop_time,
ObTabletID &record_tablet_id,
bool &try_guard);
int submit_wait_freeze_finish_task_(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
int wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable);
int try_set_tablet_freeze_begin_();
void set_tablet_freeze_begin_();
@ -296,15 +330,13 @@ private:
void set_ls_freeze_begin_();
void set_ls_freeze_end_();
int check_ls_state(); // must be used under the protection of ls_lock
int freeze_normal_tablet_(const ObTabletID &tablet_id,
ObFuture<int> *result = nullptr,
const bool for_direct_load = false);
int freeze_ls_inner_tablet_(const ObTabletID &tablet_id);
int batch_tablet_freeze_(const int64_t trace_id, const ObIArray<ObTabletID> &tablet_ids,
ObFuture<int> *result, bool &need_freeze);
void submit_batch_tablet_freeze_task(const ObTableHandleArray &tables_array, ObFuture<int> *result);
int batch_tablet_freeze_task(ObTableHandleArray tables_array);
int finish_freeze_with_ls_lock(ObITabletMemtable *tablet_memtable);
int tablet_freeze_(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool need_rewrite_meta,
const share::SCN freeze_snapshot_version,
ObIArray<ObTableHandleV2> &frozen_memtable_handles,
ObIArray<ObTabletID> &freeze_failed_tablets);
int inner_wait_memtable_freeze_finish_(ObTableHandleV2 &memtable_handle);
int try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable,
bool &ready_for_flush,
bool &is_force_released,
@ -333,12 +365,11 @@ private:
int64_t low_priority_freeze_cnt_; // freeze tablet cnt
int64_t pend_replay_cnt_;
common::ObByteLock byte_lock_; // only used to control pend_replay_cnt_
bool need_resubmit_log_;
bool enable_; // whether we can do freeze now
bool is_inited_;
bool is_async_tablet_freeze_task_running_;
common::hash::ObHashSet<AsyncFreezeTabletInfo> async_freeze_tablets_;
};
} // namespace storage

View File

@ -1821,211 +1821,213 @@ int ObLS::replay_get_tablet(
return ret;
}
int ObLS::logstream_freeze(const int64_t trace_id, const bool is_sync, const int64_t abs_timeout_ts)
int ObLS::logstream_freeze(const int64_t trace_id, const bool is_sync, const int64_t input_abs_timeout_ts)
{
int ret = OB_SUCCESS;
ObFuture<int> result;
if (is_sync) {
const int64_t abs_timeout_ts = (0 == input_abs_timeout_ts)
? ObClockGenerator::getClock() + ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME
: input_abs_timeout_ts;
ret = logstream_freeze_task(trace_id, abs_timeout_ts);
} else {
const bool is_ls_freeze = true;
(void)ls_freezer_.commit_an_async_freeze_task(trace_id, is_ls_freeze);
}
return ret;
}
int ObLS::logstream_freeze_task(const int64_t trace_id, const int64_t abs_timeout_ts)
{
int ret = OB_SUCCESS;
const int64_t start_time = ObClockGenerator::getClock();
{
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts);
if (!lock_myself.locked()) {
ret = OB_TIMEOUT;
LOG_WARN("lock ls failed, please retry later", K(ret), K(ls_meta_));
STORAGE_LOG(WARN, "lock ls failed, please retry later", K(ret), K(ls_meta_));
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_offline())) {
ret = OB_LS_OFFLINE;
LOG_WARN("offline ls not allowed freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.logstream_freeze(trace_id, &result))) {
LOG_WARN("logstream freeze failed", K(ret), K_(ls_meta));
} else {
// do nothing
STORAGE_LOG(WARN, "offline ls not allowed freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.logstream_freeze(trace_id))) {
STORAGE_LOG(WARN, "logstream freeze failed", K(ret), K_(ls_meta));
}
}
if (OB_SUCC(ret) && is_sync) {
ret = ls_freezer_.wait_freeze_finished(result);
}
return ret;
}
int ObLS::tablet_freeze(const ObTabletID &tablet_id,
const bool is_sync,
const int64_t abs_timeout_ts)
{
int ret = OB_SUCCESS;
ObFuture<int> result;
{
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts);
if (!lock_myself.locked()) {
ret = OB_TIMEOUT;
LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_));
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_offline())) {
ret = OB_LS_OFFLINE;
LOG_WARN("offline ls not allowed freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.tablet_freeze(tablet_id, &result))) {
LOG_WARN("tablet freeze failed", K(ret), K(tablet_id));
} else {
// do nothing
}
}
if (OB_SUCC(ret) && is_sync) {
ret = ls_freezer_.wait_freeze_finished(result);
}
return ret;
}
int ObLS::tablet_freeze_with_rewrite_meta(const ObTabletID &tablet_id,
ObFuture<int> *result,
const int64_t abs_timeout_ts)
{
int ret = OB_SUCCESS;
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts);
if (!lock_myself.locked()) {
ret = OB_TIMEOUT;
LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_));
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_offline())) {
ret = OB_LS_OFFLINE;
LOG_WARN("offline ls not allowed freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.tablet_freeze_with_rewrite_meta(tablet_id, result))) {
LOG_WARN("tablet force freeze failed", K(ret), K(tablet_id));
} else {
// do nothing
}
return ret;
}
/**
* @brief Used for both async and sync freeze
*
* @param tablet_id tablet to be freezed
* @param epoch to check if logstream has offlined
*/
int ObLS::tablet_freeze_task_for_direct_load(const ObTabletID &tablet_id, const uint64_t epoch, ObFuture<int> *result)
{
int ret = OB_SUCCESS;
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock);
if (!lock_myself.locked()) {
ret = OB_TIMEOUT;
STORAGE_LOG(WARN, "lock failed, please retry later", K(ret), K(ls_meta_));
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_offline())) {
ret = OB_LS_OFFLINE;
LOG_WARN("ls has offlined", K(ret), K_(ls_meta));
} else if (ATOMIC_LOAD(&switch_epoch_) != epoch) {
// happened in async freeze situation. This ls has offlined and onlined again
ret = OB_SUCCESS;
FLOG_INFO("quit freeze because logstream epoch has changed", K(ret), K(tablet_id), K(epoch), K(ls_meta_));
} else if (OB_FAIL(ls_freezer_.tablet_freeze_task_for_direct_load(tablet_id, result))) {
LOG_WARN("tablet force freeze failed", K(ret), K(tablet_id));
} else {
// freeze success
}
if (OB_FAIL(ret)) {
int origin_ret = ret;
// reset ret to EAGAIN to retry freeze
ret = OB_EAGAIN;
if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) {
STORAGE_LOG(INFO, "reset ret code to stop retry", KR(ret), KR(origin_ret));
} else if (OB_FAIL(ls_freezer_.wait_ls_freeze_finish())) {
STORAGE_LOG(WARN, "wait ls freeze finish failed", KR(ret));
}
const int64_t ls_freeze_task_spend_time = ObClockGenerator::getClock() - start_time;
STORAGE_LOG(INFO,
"[Freezer] logstream freeze task finish",
K(ret),
K(ls_freeze_task_spend_time),
K(trace_id),
KTIME(abs_timeout_ts));
return ret;
}
/**
* @brief for single tablet freeze
*
*/
int ObLS::tablet_freeze(const ObTabletID &tablet_id,
const bool is_sync,
const int64_t input_abs_timeout_ts,
const bool need_rewrite_meta)
{
int ret = OB_SUCCESS;
if (tablet_id.is_ls_inner_tablet()) {
ret = ls_freezer_.ls_inner_tablet_freeze(tablet_id);
} else {
ObSEArray<ObTabletID, 1> tablet_ids;
if (OB_FAIL(tablet_ids.push_back(tablet_id))) {
STORAGE_LOG(WARN, "push back tablet id failed", KR(ret), K(tablet_id));
} else {
ret = tablet_freeze(checkpoint::INVALID_TRACE_ID, tablet_ids, is_sync, input_abs_timeout_ts, need_rewrite_meta);
}
}
return ret;
}
/**
* @brief sync freeze only retry for a while.
*
* @param tablet_id
* @param max_retry_time
* @return int
*/
int ObLS::sync_tablet_freeze_for_direct_load(const ObTabletID &tablet_id, const int64_t max_retry_time)
int ObLS::tablet_freeze(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool is_sync,
const int64_t input_abs_timeout_ts,
const bool need_rewrite_meta)
{
int ret = OB_SUCCESS;
const uint64_t epoch = ATOMIC_LOAD(&switch_epoch_);
const int64_t start_time = ObClockGenerator::getClock();
const int64_t RETRY_INTERVAL = 100 * 1000; // 100 ms
STORAGE_LOG(
DEBUG, "start tablet freeze", K(tablet_ids), K(is_sync), KTIME(input_abs_timeout_ts), K(need_rewrite_meta));
int64_t freeze_epoch = ATOMIC_LOAD(&switch_epoch_);
if (need_rewrite_meta && (!is_sync)) {
ret = OB_NOT_SUPPORTED;
STORAGE_LOG(ERROR,
"tablet freeze for rewrite meta must be sync freeze ",
KR(ret),
K(need_rewrite_meta),
K(is_sync),
K(tablet_ids));
} else if (is_sync) {
const int64_t start_time = ObClockGenerator::getClock();
const int64_t abs_timeout_ts =
(0 == input_abs_timeout_ts) ? start_time + ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME : input_abs_timeout_ts;
bool is_retry_code = false;
bool is_not_timeout = false;
do {
ret = tablet_freeze_task(trace_id, tablet_ids, need_rewrite_meta, is_sync, abs_timeout_ts, freeze_epoch);
const int64_t current_time = ObClockGenerator::getClock();
if (OB_FAIL(ret) &&
current_time - start_time > 10LL * 1000LL * 1000LL &&
REACH_TIME_INTERVAL(5LL * 1000LL * 1000LL)) {
STORAGE_LOG(WARN, "sync tablet freeze for long time", KR(ret), KTIME(start_time), KTIME(abs_timeout_ts));
}
do {
ret = OB_SUCCESS;
ObFuture<int> result;
if (OB_FAIL(tablet_freeze_task_for_direct_load(tablet_id, epoch, &result))) {
usleep(RETRY_INTERVAL);
} else if (OB_FAIL(ls_freezer_.wait_freeze_finished(result))) {
STORAGE_LOG(WARN, "freeze task failed", KR(ret));
is_retry_code = OB_EAGAIN == ret || OB_MINOR_FREEZE_NOT_ALLOW == ret || OB_ALLOCATE_MEMORY_FAILED == ret;
is_not_timeout = current_time < abs_timeout_ts;
} while (is_retry_code && is_not_timeout);
} else {
(void)record_async_freeze_tablets_(tablet_ids, freeze_epoch);
if (ls_freezer_.is_async_tablet_freeze_task_running()) {
// do not need another async batch freeze task
} else {
const bool is_ls_freeze = false;
(void)ls_freezer_.commit_an_async_freeze_task(trace_id, is_ls_freeze);
}
} while (OB_FAIL(ret) && ObClockGenerator::getClock() - start_time < max_retry_time);
}
return ret;
}
/**
* @brief Record switch_epoch when commit the root freeze task. Async tablet freeze task will check if epoch is the
* same.
*/
void ObLS::async_tablet_freeze_for_direct_load(const ObTabletID &tablet_id)
{
const uint64_t epoch = ATOMIC_LOAD(&switch_epoch_);
FLOG_INFO("commit root freeze task", K(tablet_id), K(epoch));
(void)ls_freezer_.commit_async_tablet_freeze_task_once(tablet_id, epoch);
}
int ObLS::batch_tablet_freeze(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool is_sync,
const int64_t abs_timeout_ts)
int ObLS::tablet_freeze_task(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool need_rewrite_meta,
const bool is_sync,
const int64_t abs_timeout_ts,
const int64_t freeze_epoch)
{
int ret = OB_SUCCESS;
ObFuture<int> result;
bool print_warn_log = false;
const int64_t start_time = ObClockGenerator::getClock();
ObSEArray<ObTableHandleV2, 32> frozen_memtable_handles;
ObSEArray<ObTabletID, 32> freeze_failed_tablets;
{
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts);
if (!lock_myself.locked()) {
ret = OB_TIMEOUT;
LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_));
STORAGE_LOG(WARN, "lock failed, please retry later", K(ret), K(ls_meta_));
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ls is not inited", K(ret));
STORAGE_LOG(WARN, "ls is not inited", K(ret));
} else if (OB_UNLIKELY(is_offline())) {
ret = OB_LS_OFFLINE;
LOG_WARN("offline ls not allowed freeze", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.batch_tablet_freeze(trace_id, tablet_ids, &result))) {
LOG_WARN("batch tablet freeze failed", K(ret));
} else {
// do nothing
STORAGE_LOG(WARN, "ls has offlined", K(ret), K_(ls_meta));
} else if (OB_FAIL(ls_freezer_.tablet_freeze(
trace_id, tablet_ids, need_rewrite_meta, frozen_memtable_handles, freeze_failed_tablets))) {
if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL)) {
STORAGE_LOG(WARN, "tablet freeze failed", KR(ret), K(ls_meta_.ls_id_), K(tablet_ids), K(freeze_failed_tablets));
}
}
}
if (OB_SUCC(ret) && is_sync) {
ret = ls_freezer_.wait_freeze_finished(result);
// ATTENTION : if frozen memtable handles not empty, must wait freeze finish
if (!frozen_memtable_handles.empty()) {
(void)ls_freezer_.wait_tablet_freeze_finish(frozen_memtable_handles, freeze_failed_tablets);
}
// handle freeze failed tablets
if (!freeze_failed_tablets.empty()) {
if (OB_SUCC(ret)) {
// some tablet freeze failed need retry
ret = OB_EAGAIN;
}
if (!is_sync) {
(void)record_async_freeze_tablets_(freeze_failed_tablets, freeze_epoch);
}
}
if (OB_SUCC(ret)) {
const int64_t tablet_freeze_task_spend_time = ObClockGenerator::getClock() - start_time;
STORAGE_LOG(INFO,
"[Freezer] tablet freeze task success",
K(ret),
K(need_rewrite_meta),
K(is_sync),
K(tablet_freeze_task_spend_time),
K(trace_id),
KTIME(abs_timeout_ts));
}
return ret;
}
void ObLS::record_async_freeze_tablets_(const ObIArray<ObTabletID> &tablet_ids, const int64_t epoch)
{
for (int64_t i = 0; i < tablet_ids.count(); i++) {
AsyncFreezeTabletInfo tablet_info;
tablet_info.tablet_id_ = tablet_ids.at(i);
tablet_info.epoch_ = epoch;
(void)ls_freezer_.record_async_freeze_tablet(tablet_info);
}
}
void ObLS::record_async_freeze_tablet_(const ObTabletID &tablet_id, const int64_t epoch)
{
AsyncFreezeTabletInfo tablet_info;
tablet_info.tablet_id_ = tablet_id;
tablet_info.epoch_ = epoch;
(void)ls_freezer_.record_async_freeze_tablet(tablet_info);
}
int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout_ts, const bool is_tennat_freeze)
{
int ret = OB_SUCCESS;

View File

@ -251,7 +251,7 @@ public:
{ return running_state_.is_stopped(); }
int64_t get_state_seq() const
{ return ATOMIC_LOAD(&state_seq_); }
int64_t get_switch_epoch() const { return ATOMIC_LOAD(&switch_epoch_); }
ObLSTxService *get_tx_svr() { return &ls_tx_svr_; }
ObLockTable *get_lock_table() { return &lock_table_; }
ObTxTable *get_tx_table() { return &tx_table_; }
@ -856,42 +856,46 @@ public:
// ObReplayHandler interface:
DELEGATE_WITH_RET(replay_handler_, replay, int);
// ObFreezer interface:
// freeze the data of ls:
// @param [in] trace_id, for checkpoint diagnose
// @param [in] is_sync, only used for wait_freeze_finished()
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
int logstream_freeze(const int64_t trace_id,
const bool is_sync = false,
const int64_t abs_timeout_ts = INT64_MAX);
// tablet freeze
// @param [in] is_sync, only used for wait_freeze_finished()
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
/**
* @brief freeze this logstream
*
* @param[in] trace_id
* @param[in] is_sync if true, call logstream_freeze_task directly; otherwise commit an async task to execute
* logstream_freeze_task
* @param[in] abs_timeout_ts absolute timeout timestamp, only used when is_sync == true. 0 (the default) means
* current time + ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME
*/
int logstream_freeze(const int64_t trace_id, const bool is_sync, const int64_t abs_timeout_ts = 0);
int logstream_freeze_task(const int64_t trace_id,
const int64_t abs_timeout_ts);
int tablet_freeze(const ObTabletID &tablet_id,
const bool is_sync = false,
const int64_t abs_timeout_ts = INT64_MAX);
// tablet_freeze_with_rewrite_meta
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
int tablet_freeze_with_rewrite_meta(const ObTabletID &tablet_id,
ObFuture<int> *result = nullptr,
const int64_t abs_timeout_ts = INT64_MAX);
int tablet_freeze_task_for_direct_load(const ObTabletID &tablet_id,
const uint64_t epoch,
ObFuture<int> *result = nullptr);
int sync_tablet_freeze_for_direct_load(const ObTabletID &tablet_id,
const int64_t max_retry_time = 5LL * 1000LL * 1000LL /*5 seconds*/);
void async_tablet_freeze_for_direct_load(const ObTabletID &tablet_id);
DELEGATE_WITH_RET(ls_freezer_, wait_freeze_finished, int);
// batch tablet freeze
// @param [in] tablet_ids
// @param [in] is_sync
// @param [in] abs_timeout_ts, wait until timeout if lock conflict
int batch_tablet_freeze(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool is_sync = false,
const int64_t abs_timeout_ts = INT64_MAX);
const bool is_sync,
const int64_t input_abs_timeout_ts = 0,
const bool need_rewrite_meta = false);
/**
* @brief freeze one or multiple tablets. If is_sync is true, retry until timeout; otherwise commit an async task
* that keeps retrying.
*
* @param[in] trace_id
* @param[in] tablet_ids
* @param[in] is_sync if true, call tablet_freeze_task directly; otherwise record the tablets and commit an async
* freeze task
* @param[in] abs_timeout_ts absolute timeout timestamp, only used when is_sync == true. 0 (the default) means
* current time + ObFreezer::SYNC_FREEZE_DEFAULT_RETRY_TIME
* @param[in] need_rewrite_meta only supported together with is_sync == true; otherwise OB_NOT_SUPPORTED is returned
*/
int tablet_freeze(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool is_sync,
const int64_t abs_timeout_ts = 0,
const bool need_rewrite_meta = false);
int tablet_freeze_task(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
const bool need_rewrite_meta,
const bool is_sync,
const int64_t abs_timeout_ts,
const int64_t freeze_epoch);
// ObTxTable interface
DELEGATE_WITH_RET(tx_table_, get_tx_table_guard, int);
DELEGATE_WITH_RET(tx_table_, get_upper_trans_version_before_given_scn, int);
@ -946,6 +950,10 @@ public:
const share::ObLSRestoreStatus &restore_status,
bool &allow_read);
private:
void record_async_freeze_tablets_(const ObIArray<ObTabletID> &tablet_ids, const int64_t epoch);
void record_async_freeze_tablet_(const ObTabletID &tablet_id, const int64_t epoch);
private:
// StorageBaseUtil
// table manager: create, remove and guard get.

View File

@ -37,7 +37,7 @@ ObDirectLoadTableGuard::ObDirectLoadTableGuard(ObTablet &tablet, const share::SC
void ObDirectLoadTableGuard::reset()
{
const int64_t MAX_HOLD_GUARD_TIME = 1LL * 1000LL * 1000LL; // one second
const int64_t MAX_HOLD_GUARD_TIME = 10LL * 1000LL * 1000LL; // ten seconds
const int64_t reset_time = ObClockGenerator::getClock();
bool need_print_debug_log = false;
if (reset_time - construct_timestamp_ > MAX_HOLD_GUARD_TIME) {
@ -241,7 +241,8 @@ void ObDirectLoadTableGuard::async_freeze_()
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "ls should not be null", K(ret), KPC(this));
} else {
(void)ls->async_tablet_freeze_for_direct_load(tablet_id_);
const bool is_sync = false;
(void)ls->tablet_freeze(tablet_id_, is_sync);
}
}

View File

@ -659,7 +659,6 @@ int ObTabletMemtableMgr::set_is_tablet_freeze_for_active_memtable(
TRANS_LOG(INFO, "not set is_tablet_freeze because the memtable cannot be freezed", KPC(active_tablet_memtable));
}
return ret;
}

View File

@ -95,14 +95,11 @@ int ObTenantFreezer::init()
KP(GCTX.rs_rpc_proxy_), KP(GCTX.rs_mgr_), K(GCONF.self_addr_));
} else if (OB_FAIL(freeze_trigger_pool_.init_and_start(FREEZE_TRIGGER_THREAD_NUM))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze trigger pool", KR(ret));
} else if (OB_FAIL(freeze_thread_pool_.init_and_start(
FREEZE_THREAD_NUM, 13 /* queue_size = 2^13(8192) */))) {
} else if (OB_FAIL(freeze_thread_pool_.init_and_start(FREEZE_THREAD_NUM))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze thread pool", KR(ret));
} else if (OB_FAIL(freeze_trigger_timer_.init_and_start(
freeze_trigger_pool_, TIME_WHEEL_PRECISION, "FrzTrigger"))) {
} else if (OB_FAIL(freeze_trigger_timer_.init_and_start(freeze_trigger_pool_, TIME_WHEEL_PRECISION, "FrzTrigger"))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze trigger timer", K(ret));
} else if (OB_FAIL(rpc_proxy_.init(GCTX.net_frame_->get_req_transport(),
GCONF.self_addr_))) {
} else if (OB_FAIL(rpc_proxy_.init(GCTX.net_frame_->get_req_transport(), GCONF.self_addr_))) {
LOG_WARN("[TenantFreezer] fail to init rpc proxy", KR(ret));
} else {
is_freezing_tx_data_ = false;
@ -205,12 +202,7 @@ bool ObTenantFreezer::exist_ls_freezing()
return exist_ls_freezing_;
}
// force freeze means we must do another freeze rather than use the freeze
// result of others
int ObTenantFreezer::ls_freeze_(ObLS *ls,
const bool is_sync,
const bool need_rewrite_tablet_meta,
const int64_t abs_timeout_ts)
int ObTenantFreezer::ls_freeze_(ObLS *ls, const bool is_sync, const int64_t abs_timeout_ts)
{
int ret = OB_SUCCESS;
const int64_t SLEEP_TS = 1000 * 1000; // 1s
@ -223,14 +215,9 @@ int ObTenantFreezer::ls_freeze_(ObLS *ls,
do {
need_retry = false;
retry_times++;
if (OB_SUCC(ls->logstream_freeze(-1, is_sync, abs_timeout_ts))) {
if (OB_SUCC(ls->logstream_freeze(checkpoint::INVALID_TRACE_ID, is_sync, abs_timeout_ts))) {
} else {
current_ts = ObTimeUtil::current_time();
is_timeout = (current_ts >= abs_timeout_ts);
// retry condition 1
need_retry = (!is_timeout);
// retry condition 2, 3
need_retry = need_retry && ((OB_EAGAIN == ret) || (need_rewrite_tablet_meta && OB_ENTRY_EXIST == ret));
need_retry = (ObClockGenerator::getClock() < abs_timeout_ts) && (OB_EAGAIN == ret);
}
if (need_retry) {
ob_usleep(SLEEP_TS);
@ -296,9 +283,7 @@ int ObTenantFreezer::tablet_freeze_(ObLS *ls,
do {
need_retry = false;
retry_times++;
if (OB_SUCC(need_rewrite_tablet_meta
? ls->tablet_freeze_with_rewrite_meta(tablet_id, nullptr/* result */, abs_timeout_ts)
: ls->tablet_freeze(tablet_id, is_sync, abs_timeout_ts))) {
if (OB_SUCC(ls->tablet_freeze(tablet_id, is_sync, abs_timeout_ts, need_rewrite_tablet_meta))) {
} else {
current_ts = ObTimeUtil::current_time();
is_timeout = (current_ts >= abs_timeout_ts);
@ -337,7 +322,9 @@ int ObTenantFreezer::tenant_freeze_()
for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) {
// wait until this ls freeze has finished, so that a stuck freeze on this ls does not
// cause freezes to be triggered too frequently.
if (OB_FAIL(ls_freeze_(ls))) {
const bool is_sync = true;
const int64_t abs_timeout_ts = 0;
if (OB_FAIL(ls_freeze_(ls, is_sync, abs_timeout_ts))) {
if (OB_SUCCESS == first_fail_ret) {
first_fail_ret = ret;
}
@ -410,7 +397,7 @@ int ObTenantFreezer::ls_freeze(const share::ObLSID &ls_id)
ObLS *ls = nullptr;
const bool is_sync = false;
const bool need_rewrite_tablet_meta = false;
int64_t abs_timeout_ts = INT64_MAX;
int64_t abs_timeout_ts = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -423,7 +410,7 @@ int ObTenantFreezer::ls_freeze(const share::ObLSID &ls_id)
} else if (OB_ISNULL(ls = handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("[TenantFreezer] ls is null", KR(ret), K(ls_id));
} else if (OB_FAIL(ls_freeze_(ls, is_sync, need_rewrite_tablet_meta, abs_timeout_ts))) {
} else if (OB_FAIL(ls_freeze_(ls, is_sync, abs_timeout_ts))) {
LOG_WARN("[TenantFreezer] logstream freeze failed", KR(ret), K(ls_id));
}
@ -451,7 +438,6 @@ int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id,
ObLSHandle handle;
ObLS *ls = nullptr;
int64_t abs_timeout_ts = INT64_MAX;
FLOG_INFO("[TenantFreezer] tablet_freeze start", KR(ret), K(tablet_id));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -466,6 +452,14 @@ int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id,
}
}
FLOG_INFO("[TenantFreezer] tablet_freeze start",
KR(ret),
K(ls_id),
K(is_sync),
K(need_rewrite_tablet_meta),
K(tablet_id),
KTIME(abs_timeout_ts));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ls_srv->get_ls(ls_id, handle, ObLSGetMod::TXSTORAGE_MOD))) {
LOG_WARN("[TenantFreezer] fail to get ls", K(ret), K(ls_id));
@ -1831,5 +1825,7 @@ void ObTenantFreezerStatHistory::reset()
length_ = 0;
}
}
}

View File

@ -210,6 +210,8 @@ public:
int64_t &pos);
// if a major freeze fails and needs to be retried, the major freeze is recorded in retry_major_info_.
const ObRetryMajorInfo &get_retry_major_info() const { return retry_major_info_; }
void record_freeze_failed_tablet(const ObTabletID &tablet_id);
void erase_freeze_failed_tablet(const ObTabletID &tablet_id);
void set_retry_major_info(const ObRetryMajorInfo &retry_major_info)
{
retry_major_info_ = retry_major_info;
@ -238,10 +240,7 @@ private:
int64_t &last_check_timestamp,
bool &is_out_of_mem,
const bool from_user = true);
static int ls_freeze_(ObLS *ls,
const bool is_sync = true,
const bool need_rewrite_tablet_meta = true,
const int64_t abs_timeout_ts = INT64_MAX);
static int ls_freeze_(ObLS *ls, const bool is_sync, const int64_t abs_timeout_ts);
static int ls_freeze_all_unit_(ObLS *ls,
const int64_t abs_timeout_ts = INT64_MAX);
static int tablet_freeze_(ObLS *ls,

View File

@ -22,12 +22,12 @@ namespace oceanbase
namespace storage
{
int __attribute__((weak)) build_test_schema(share::schema::ObTableSchema &table_schema, uint64_t table_id)
int __attribute__((weak)) build_test_schema(share::schema::ObTableSchema &table_schema, uint64_t table_id, const char* table_name)
{
int ret = OB_SUCCESS;
ObColumnSchemaV2 column;
table_schema.reset();
table_schema.set_table_name("test_merge");
table_schema.set_table_name(table_name);
table_schema.set_tenant_id(1);
table_schema.set_tablegroup_id(1);
table_schema.set_database_id(1);
@ -47,6 +47,11 @@ int __attribute__((weak)) build_test_schema(share::schema::ObTableSchema &table
return ret;
}
// Convenience overload: builds the test schema under the default table name "test_merge"
// by forwarding to the three-argument build_test_schema().
int __attribute__((weak)) build_test_schema(share::schema::ObTableSchema &table_schema, uint64_t table_id)
{
  const char *const default_table_name = "test_merge";
  const int ret = build_test_schema(table_schema, table_id, default_table_name);
  return ret;
}
int __attribute__((weak)) gen_create_ls_arg(const int64_t tenant_id,
const share::ObLSID &ls_id,
obrpc::ObCreateLSArg &arg)