// owner: lana.lgx
// owner group: data
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "simple_server/env/ob_simple_cluster_test_base.h"
#include "rootserver/ob_tenant_balance_service.h"
#include "share/balance/ob_balance_job_table_operator.h"
#include "mittest/env/ob_simple_server_helper.h"
#include "storage/concurrency_control/ob_multi_version_garbage_collector.h"
#include "rootserver/mview/ob_mview_maintenance_service.h"
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
namespace oceanbase
{
namespace unittest
{
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
using namespace oceanbase::rootserver;
using namespace oceanbase::compaction;
#define EQ(x, y) GTEST_ASSERT_EQ(x, y);
#define NE(x, y) GTEST_ASSERT_NE(x, y);
#define LE(x, y) GTEST_ASSERT_LE(x, y);
#define GE(x, y) GTEST_ASSERT_GE(x, y);
#define GT(x, y) GTEST_ASSERT_GT(x, y);
#define LT(x, y) GTEST_ASSERT_LT(x, y);
class TestRunCtx
{
public:
uint64_t tenant_id_ = 0;
int64_t time_sec_ = 0;
int64_t start_time_ = ObTimeUtil::current_time();
bool stop_ = false;
std::thread th_;
std::thread worker_;
};
TestRunCtx R;
class ObCollectMV : public ObSimpleClusterTestBase
{
public:
// specify the prefix of the case run directory: test_ob_simple_cluster_
ObCollectMV() : ObSimpleClusterTestBase("test_collect_mv_", "20G", "20G") {}
int wait_major_mv_refresh_finish(uint64_t scn);
int wait_acquired_snapshot_advance(uint64_t scn);
int wait_old_sstable_gc_end(uint64_t scn);
void process();
int do_balance_inner_(uint64_t tenant_id);
private:
};
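// Poll oceanbase.__all_mview until every major-refresh mview (refresh_mode=4)
// has last_refresh_scn >= scn, retrying every 2s.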
int ObCollectMV::wait_major_mv_refresh_finish(uint64_t scn)
{
int ret = OB_SUCCESS;
ObSqlString sql_string;
sql_string.assign_fmt("select min(last_refresh_scn) val from oceanbase.__all_mview where refresh_mode=4");
uint64_t val = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(SSH::g_select_uint64(R.tenant_id_, sql_string.ptr(), val))) {
} else if (val >= scn) {
break;
} else {
LOG_INFO("wait major mv refresh", K(scn), K(val));
::sleep(2);
}
}
return ret;
}
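// Poll oceanbase.__all_acquired_snapshot until the minimum snapshot_scn of
// snapshot_type=5 rows has advanced to scn; while waiting, log the tenant's
// minimum reserved snapshot for transactions.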
int ObCollectMV::wait_acquired_snapshot_advance(uint64_t scn)
{
int ret = OB_SUCCESS;
ObSqlString sql_string;
sql_string.assign_fmt("select min(snapshot_scn) val from oceanbase.__all_acquired_snapshot where snapshot_type=5");
uint64_t val = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(SSH::g_select_uint64(R.tenant_id_, sql_string.ptr(), val))) {
} else if (val >= scn) {
break;
} else {
LOG_INFO("wait acquired_snapshot_advance", K(scn), K(val));
MTL_SWITCH(R.tenant_id_) {
storage::ObTenantFreezeInfoMgr *mgr = MTL(storage::ObTenantFreezeInfoMgr *);
const int64_t snapshot_for_tx = mgr->get_min_reserved_snapshot_for_tx();
LOGI("wait_acquired_snapshot_advance: %ld %ld %ld", scn, snapshot_for_tx,val);
}
::sleep(2);
}
}
return ret;
}
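// Poll __all_virtual_table_mgr until the minimum end_log_scn of sstables with
// table_type=10 on user tablets (tablet_id > 200000) reaches scn, i.e. the old
// sstables below scn have been GC'ed.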
int ObCollectMV::wait_old_sstable_gc_end(uint64_t scn)
{
int ret = OB_SUCCESS;
ObSqlString sql_string;
sql_string.assign_fmt("select min(end_log_scn) val from oceanbase.__all_virtual_table_mgr where table_type=10 and tablet_id>200000");
uint64_t val = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(SSH::g_select_uint64(R.tenant_id_, sql_string.ptr(), val))) {
} else if (val >= scn) {
break;
} else {
LOG_INFO("wait old sstable gc", K(scn), K(val));
::sleep(2);
}
}
return ret;
}
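// Drive the mview maintenance timer tasks (push refresh scn, push snapshot,
// replica safe check, collect merge info, clean snapshot) and the tablet
// scheduler's sstable GC task by hand instead of waiting for their timers.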
void ObCollectMV::process()
{
int ret = OB_SUCCESS;
MTL_SWITCH(R.tenant_id_) {
ObMViewMaintenanceService *mv_service = MTL(ObMViewMaintenanceService*);
mv_service->mview_push_refresh_scn_task_.runTimerTask();
mv_service->mview_push_snapshot_task_.runTimerTask();
mv_service->replica_safe_check_task_.runTimerTask();
mv_service->collect_mv_merge_info_task_.runTimerTask();
mv_service->mview_clean_snapshot_task_.runTimerTask();
MTL(ObTenantTabletScheduler*)->timer_task_mgr_.sstable_gc_task_.runTimerTask();
}
}
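// Serialized balance driver: gather stat and LS status, finish any in-progress
// balance job, then trigger one round of partition balance.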
int ObCollectMV::do_balance_inner_(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
static std::mutex mutex;
mutex.lock();
MTL_SWITCH(tenant_id) {
LOG_INFO("worker to do partition_balance");
auto b_svr = MTL(rootserver::ObTenantBalanceService*);
b_svr->reset();
b_svr->stop();
int64_t job_cnt = 0;
int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
ObBalanceJob job;
if (OB_FAIL(b_svr->gather_stat_())) {
LOG_WARN("failed to gather stat", KR(ret));
} else if (OB_FAIL(b_svr->gather_ls_status_stat(tenant_id, b_svr->ls_array_))) {
LOG_WARN("failed to gather stat", KR(ret));
} else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) {
if (OB_ENTRY_NOT_EXIST == ret) {
//NO JOB, need check current ls status
ret = OB_SUCCESS;
job_cnt = 0;
} else {
LOG_WARN("failed to get balance job", KR(ret), K(tenant_id));
}
} else if (OB_FAIL(b_svr->try_finish_current_job_(job, job_cnt))) {
LOG_WARN("failed to finish current job", KR(ret), K(job));
}
if (OB_SUCC(ret) && job_cnt == 0 && OB_FAIL(b_svr->partition_balance_(true))) {
LOG_WARN("failed to do partition balance", KR(ret));
}
}
mutex.unlock();
return ret;
}
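// Environment setup: create tenant tt1 and an extra log stream, shorten the GC
// and compaction intervals, stop the sstable GC timer (process() runs it by
// hand), and start a background worker that periodically calls process() and
// do_balance_inner_().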
TEST_F(ObCollectMV, prepare)
{
int ret = OB_SUCCESS;
LOGI("observer start");
R.th_ = std::thread([]() {
while (!R.stop_) {
fflush(stdout);
::usleep(100 * 1000);
}});
concurrency_control::ObMultiVersionGarbageCollector::GARBAGE_COLLECT_EXEC_INTERVAL = 1_s;
concurrency_control::ObMultiVersionGarbageCollector::GARBAGE_COLLECT_RETRY_INTERVAL = 3_s;
concurrency_control::ObMultiVersionGarbageCollector::GARBAGE_COLLECT_RECLAIM_DURATION = 3_s;
LOGI("create tenant begin");
// create normal tenant tt1
EQ(OB_SUCCESS, create_tenant("tt1", "12G", "16G", false, 10));
// get the tenant_id of tenant tt1
EQ(OB_SUCCESS, get_tenant_id(R.tenant_id_));
NE(0, R.tenant_id_);
// init the sql proxy of normal tenant tt1
EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
int tenant_id = R.tenant_id_;
LOGI("create tenant finish");
// create a new log stream on the single-node ObServer; take care that it is not GC'ed by the RS task
EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_));
int64_t ls_count = 0;
EQ(0, SSH::g_select_int64(R.tenant_id_, "select count(ls_id) as val from oceanbase.__all_ls where ls_id!=1", ls_count));
EQ(2, ls_count);
int64_t affected_rows;
EQ(0, GCTX.sql_proxy_->write("alter system set _enable_parallel_table_creation = false tenant=all", affected_rows));
EQ(0, GCTX.sql_proxy_->write("alter system set undo_retention='0' tenant=all", affected_rows));
EQ(0, GCTX.sql_proxy_->write("alter system set ob_compaction_schedule_interval='5s' tenant=all", affected_rows));
EQ(0, GCTX.sql_proxy_->write("alter system set merger_check_interval='10s' tenant=all", affected_rows));
MTL_SWITCH(R.tenant_id_) {
TG_STOP(MTL(ObTenantTabletScheduler *)->timer_task_mgr_.sstable_gc_tg_id_);
}
R.worker_ = std::thread([this]() {
while (!R.stop_) {
process();
do_balance_inner_(R.tenant_id_);
::sleep(3);
}});
}
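// A long-running RR transaction should pin the reserved snapshot: the tenant's
// min reserved snapshot for tx may catch up to the transaction's read snapshot
// but must never exceed it.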
TEST_F(ObCollectMV, keep_snapshot_for_long_tx)
{
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
int64_t affected_rows;
int ret = OB_SUCCESS;
EQ(0, sql_proxy.write("create table t(c1 int)", affected_rows));
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"));
EQ(0, SSH::write(conn, "set ob_query_timeout=99900000000"));
EQ(0, SSH::write(conn, "set ob_trx_timeout=1000000000"));
int64_t val = 0;
EQ(0, SSH::select_int64(conn, "select count(*) val from t", val));
ObString trace_id;
ObTransID tx_id;
int64_t session_id = -1;
EQ(0, SSH::find_session(conn, session_id));
EQ(0, SSH::find_trace_id(conn, trace_id));
EQ(0, SSH::find_tx(conn, tx_id));
ObTxDesc *tx_desc = NULL;
EQ(0, SSH::get_tx_desc(R.tenant_id_, tx_id, tx_desc));
uint64_t read_snapshot = tx_desc->snapshot_version_.get_val_for_gts();
int tx_state = (int)tx_desc->state_;
EQ(0, SSH::revert_tx_desc(R.tenant_id_, tx_desc));
GT(read_snapshot, 0);
LOGI("session_id:%ld tx_id:%ld read_snapshot:%ld %d", session_id, tx_id.tx_id_,read_snapshot, tx_state);
bool wait_end = false;
while (OB_SUCC(ret) && !wait_end) {
::sleep(2);
MTL_SWITCH(R.tenant_id_) {
storage::ObTenantFreezeInfoMgr *mgr = MTL(storage::ObTenantFreezeInfoMgr *);
const int64_t snapshot_for_tx = mgr->get_min_reserved_snapshot_for_tx();
LOGI("snapshot_for_tx:%ld", snapshot_for_tx);
if (snapshot_for_tx == read_snapshot) {
wait_end = true;
} else if (snapshot_for_tx > read_snapshot) {
ret = OB_ERR_UNEXPECTED;
LOGE("snapshot_for_tx big than read_snapshot: %ld %ld", snapshot_for_tx, read_snapshot);
}
}
}
EQ(OB_SUCCESS, ret);
EQ(0, SSH::select_int64(conn, "select count(*) val from t", val));
EQ(0, conn->commit());
EQ(0, sql_proxy.close(conn, true));
}
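// Partial updates on duplicate table ufc_t2 are expected to be logged as
// full-column rows: scan the active memtable's mvcc rows and require that every
// DF_UPDATE node carries all 6 columns.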
TEST_F(ObCollectMV, update_full_column)
{
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
int64_t affected_rows = 0;
LOGI("create tg");
EQ(0, sql_proxy.write("create tablegroup ufc_tg1 sharding='PARTITION'", affected_rows));
LOGI("create table");
EQ(0, sql_proxy.write("create table ufc_t1(c1 int,c2 int, primary key(c1,c2)) tablegroup='ufc_tg1' partition by hash(c1) partitions 2", affected_rows));
EQ(0, sql_proxy.write("create table ufc_t2(c2 int, c3 int, c4 int,c5 int,c6 int,c7 int ,primary key(c2)) duplicate_scope = 'cluster' duplicate_read_consistency='weak'", affected_rows));
EQ(0, sql_proxy.write("insert into ufc_t1 values(1,1),(2,2),(3,3)", affected_rows));
EQ(0, sql_proxy.write("insert into ufc_t2 values(1,100,1000,1000,1000,10000),(2,200,2000,2000,2000,2000),(3,300,3000,3000,3000,3000)", affected_rows));
EQ(0, sql_proxy.write("update ufc_t2 set c3 = 101 where c2=1", affected_rows));
EQ(0, sql_proxy.write("update ufc_t2 set c3 = 201,c4=20001 where c2=2", affected_rows));
EQ(0, sql_proxy.write("update ufc_t2 set c3 = 301,c4=30001,c5=30001,c6=30001,c7=30001 where c2=3", affected_rows));
int64_t val = 0;
EQ(0, SSH::g_select_int64(R.tenant_id_, "select tablet_id val from oceanbase.__all_table where table_name='ufc_t2'", val));
ObTabletID tablet_id(val);
EQ(0, SSH::g_select_int64(R.tenant_id_, "select ls_id val from oceanbase.__all_tablet_to_ls where tablet_id in (select tablet_id from oceanbase.__all_table where table_name='ufc_t2')", val));
ObTenantSwitchGuard tenant_guard;
EQ(0, tenant_guard.switch_to(R.tenant_id_));
ObLSID ls_id(val);
ObLSHandle ls_handle;
EQ(0, MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
ObTabletHandle tablet_handle;
EQ(0, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));
ObTableHandleV2 table_handle;
EQ(0, tablet_handle.get_obj()->get_active_memtable(table_handle));
memtable::ObMemtable *memtable = NULL;
EQ(0, table_handle.get_data_memtable(memtable));
using namespace memtable;
ObQueryEngine::Iterator<typename ObQueryEngine::BtreeIterator> iter;
memtable::ObStoreRowkeyWrapper scan_start_key_wrapper(&ObStoreRowkey::MIN_STORE_ROWKEY);
memtable::ObStoreRowkeyWrapper scan_end_key_wrapper(&ObStoreRowkey::MAX_STORE_ROWKEY);
iter.reset();
memtable::ObQueryEngine::KeyBtree &kbtree = memtable->query_engine_.keybtree_;
EQ(0, kbtree.set_key_range(iter.get_read_handle(),
scan_start_key_wrapper,
true, /*start_exclusive*/
scan_end_key_wrapper,
true /*end_exclusive*/));
{
int ret = OB_SUCCESS;
blocksstable::ObRowReader row_reader;
blocksstable::ObDatumRow datum_row;
SMART_VAR(ObCStringHelper, helper) {
for (int64_t row_idx = 0; OB_SUCC(ret) && OB_SUCC(iter.next_internal()); row_idx++) {
const ObMemtableKey *key = iter.get_key();
ObMvccRow *row = iter.get_value();
for (ObMvccTransNode *node = row->get_list_head(); OB_SUCC(ret) && OB_NOT_NULL(node); node = node->prev_) {
const ObMemtableDataHeader *mtd = reinterpret_cast<const ObMemtableDataHeader *>(node->buf_);
helper.reset();
if (OB_FAIL(row_reader.read_row(mtd->buf_, mtd->buf_len_, nullptr, datum_row))) {
TRANS_LOG(WARN, "Failed to read datum row", K(ret));
} else {
if (node->get_dml_flag() == ObDmlFlag::DF_UPDATE) {
printf("update column count: %ld\n", datum_row.get_column_count());
if (datum_row.get_column_count() != 6) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "column count not expected", K(ret), K(datum_row.get_column_count()));
}
}
}
}
}
}
if (ret == OB_ITER_END) {
ret = OB_SUCCESS;
}
EQ(0, ret);
}
}
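// Basic major-refresh (ON QUERY COMPUTATION) mview flow: create the mview,
// check its data matches the join and refresh_mode=4, then after a major freeze
// wait for the mv refresh, snapshot advance and old sstable GC, and verify that
// a read at the stale snapshot fails while a read at the new freeze scn works.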
TEST_F(ObCollectMV, basic)
{
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
int64_t affected_rows = 0;
LOGI("create tg");
EQ(0, sql_proxy.write("create tablegroup tg1 sharding='PARTITION'", affected_rows));
LOGI("create table");
EQ(0, sql_proxy.write("create table t1(c1 int,c2 int, primary key(c1,c2)) tablegroup='tg1' partition by hash(c1) partitions 2", affected_rows));
EQ(0, sql_proxy.write("create table t2(c2 int, c3 int, primary key(c2)) duplicate_scope = 'cluster' duplicate_read_consistency='weak'", affected_rows));
EQ(0, sql_proxy.write("insert into t1 values(1,1),(2,2),(3,3)", affected_rows));
EQ(0, sql_proxy.write("insert into t2 values(1,100),(2,200),(3,300)", affected_rows));
LOGI("create mview");
EQ(0, sql_proxy.write(
"create materialized view compact_mv_1 (primary key(t1_c1,t1_c2)) tablegroup='tg1' "
"partition by hash(t1_c1) partitions 2 "
"REFRESH FAST ON DEMAND ENABLE QUERY REWRITE ENABLE ON QUERY COMPUTATION "
"as select /*+read_consistency(weak) use_nl(t1 t2) leading(t1 t2) "
"use_das(t2) no_use_nl_materialization(t2)*/ t1.c1 t1_c1,t1.c2 t1_c2,t2.c2 t2_c2,t2.c3 t2_c3 "
"from t1 join t2 on t1.c2=t2.c2",
affected_rows));
int64_t not_match_count = 0;
EQ(0, SSH::select_int64(sql_proxy, "select count(*) val from (select * from test.compact_mv_1 except (select t1.*,t2.* from t1,t2 where t1.c2=t2.c2) union all select t1.*,t2.* from test.t1,test.t2 where t1.c2=t2.c2 except select * from test.compact_mv_1)", not_match_count));
LOGI("check refresh_mode");
int64_t refresh_mode = 0;
EQ(0, SSH::g_select_int64(R.tenant_id_, "select refresh_mode val from oceanbase.__all_mview t1, oceanbase.__all_table t2 where t1.mview_id=t2.table_id and t2.table_name='compact_mv_1'", refresh_mode));
EQ(4, refresh_mode);
LOGI("get refresh_scn");
uint64_t refresh_scn = 0;
EQ(0, SSH::g_select_uint64(R.tenant_id_, "select last_refresh_scn val from oceanbase.__all_mview t1, oceanbase.__all_table t2 where t1.mview_id=t2.table_id and t2.table_name='compact_mv_1'", refresh_scn));
GT(refresh_scn, 0);
ObSqlString sql_string;
int64_t now = ObTimeUtil::current_time_ns();
sql_string.assign_fmt("select /*+no_mv_rewrite*/ count(*) val from compact_mv_1 as of snapshot %ld", now);
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), val));
LOGI("start major freeze>>>");
EQ(0, sql_proxy.write("alter system major freeze", affected_rows));
uint64_t new_freeze_scn = 0;
EQ(0, SSH::g_select_uint64(R.tenant_id_, "select max(frozen_scn) val from oceanbase.__all_freeze_info", new_freeze_scn));
GT(new_freeze_scn, refresh_scn);
LOGI("wait major mv refresh finish>>>");
EQ(0, wait_major_mv_refresh_finish(new_freeze_scn));
LOGI("wait acquired snapshot advance>>>");
EQ(0, wait_acquired_snapshot_advance(new_freeze_scn));
LOGI("wait old sstable gc>>>");
EQ(0, wait_old_sstable_gc_end(new_freeze_scn));
NE(0, SSH::select_int64(sql_proxy, sql_string.ptr(), affected_rows));
sql_string.assign_fmt("select /*+no_mv_rewrite*/ count(*) val from compact_mv_1 as of snapshot %ld", new_freeze_scn);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), val));
}
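// A transaction opened before a major freeze keeps its old read snapshot and
// should still read compact_mv_1 successfully after the mv refresh; the min
// reserved snapshot for tx must stay <= the transaction's read snapshot.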
TEST_F(ObCollectMV, read_safe)
{
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
int64_t affected_rows;
int ret = OB_SUCCESS;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"));
EQ(0, SSH::write(conn, "set ob_query_timeout=99900000000"));
EQ(0, SSH::write(conn, "set ob_trx_timeout=1000000000"));
int64_t val = 0;
EQ(0, SSH::select_int64(conn, "select count(*) val from compact_mv_1", val));
ObString trace_id;
ObTransID tx_id;
EQ(0, SSH::find_trace_id(conn, trace_id));
EQ(0, SSH::find_tx(conn, tx_id));
ObTxDesc *tx_desc = NULL;
EQ(0, SSH::get_tx_desc(R.tenant_id_, tx_id, tx_desc));
uint64_t read_snapshot = tx_desc->snapshot_version_.get_val_for_gts();
EQ(0, SSH::revert_tx_desc(R.tenant_id_, tx_desc));
LOGI("read trans stmt trace_id: %s tx_id: %ld snapshot: %ld", trace_id.ptr(), tx_id.tx_id_, read_snapshot);
LOGI("start major freeze >>>");
EQ(0, sql_proxy.write("alter system major freeze", affected_rows));
uint64_t new_freeze_scn = 0;
EQ(0, SSH::g_select_uint64(R.tenant_id_, "select max(frozen_scn) val from oceanbase.__all_freeze_info", new_freeze_scn));
LOGI("wait major mv refresh finish >>>");
EQ(0, wait_major_mv_refresh_finish(new_freeze_scn));
LOGI("read with old read_snapshot");
EQ(0, SSH::select_int64(conn, "select count(*) val from compact_mv_1", val));
EQ(0, SSH::find_trace_id(conn, trace_id));
LOGI("read trans stmt trace_id: %s", trace_id.ptr());
::sleep(30);
int64_t snapshot_for_tx = 0;
MTL_SWITCH(R.tenant_id_) {
storage::ObTenantFreezeInfoMgr *mgr = MTL(storage::ObTenantFreezeInfoMgr *);
snapshot_for_tx = mgr->get_min_reserved_snapshot_for_tx();
}
EQ(OB_SUCCESS, ret);
LOGI("read with old read_snapshot snapshot_for_tx:%ld read_snapshot:%ld", snapshot_for_tx, read_snapshot);
LE(snapshot_for_tx, read_snapshot);
EQ(0, SSH::select_int64(conn, "select count(*) val from compact_mv_1", val));
EQ(0, SSH::find_trace_id(conn, trace_id));
LOGI("read trans stmt trace_id: %s", trace_id.ptr());
EQ(0, conn->commit());
EQ(0, sql_proxy.close(conn, true));
LOGI("wait old sstable gc>>>");
EQ(0, wait_old_sstable_gc_end(new_freeze_scn));
}
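// Acquired-snapshot bookkeeping: each mview keeps one snapshot row of its own;
// the base table keeps only the smallest snapshot after redundant ones are
// cleaned; dropping an mview releases its snapshot, and the base table's
// snapshot is released only after the last mview is dropped.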
TEST_F(ObCollectMV, snapshot_gc)
{
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
ObSqlString sql_string;
int64_t affected_rows = 0;
int64_t base_tablet_id = 0;
int64_t cnt = 0;
int64_t snapshot_val = 0;
LOGI("create tg");
EQ(0, sql_proxy.write("create tablegroup gc_tg1 sharding='PARTITION'", affected_rows));
LOGI("create table");
EQ(0, sql_proxy.write("create table gc_t1(c1 int,c2 int, primary key(c1,c2)) tablegroup='gc_tg1' "
"partition by hash(c1) partitions 2",
affected_rows));
EQ(0, sql_proxy.write("create table gc_t2(c2 int, c3 int, primary key(c2)) duplicate_scope = "
"'cluster' duplicate_read_consistency='weak'",
affected_rows));
EQ(0, SSH::select_int64(
sql_proxy, "select t2.tablet_id val from oceanbase.__all_table t1 join oceanbase.__all_tablet_to_ls t2 on t1.table_id = t2.table_id where t1.table_name = 'gc_t1' limit 1",
base_tablet_id));
LOGI("create mview1");
int64_t mv1_tablet_id = 0;
int64_t snapshot_scn_of_base_by_mv1 = 0;
EQ(0, sql_proxy.write(
"create materialized view gc_compact_mv_1 (primary key(t1_c1,t1_c2)) tablegroup='gc_tg1' "
"partition by hash(t1_c1) partitions 2 "
"REFRESH FAST ON DEMAND ENABLE QUERY REWRITE ENABLE ON QUERY COMPUTATION "
"as select /*+read_consistency(weak) use_nl(t1 t2) leading(t1 t2) "
"use_das(t2) no_use_nl_materialization(t2)*/ t1.c1 t1_c1,t1.c2 t1_c2,t2.c2 t2_c2,t2.c3 t2_c3 "
"from gc_t1 t1 join gc_t2 t2 on t1.c2=t2.c2",
affected_rows));
EQ(0, SSH::select_int64(
sql_proxy,
"select t3.tablet_id val from oceanbase.__all_table t1, oceanbase.__all_table t2, oceanbase.__all_tablet_to_ls t3 where "
"t1.table_id = t2.data_table_id and t2.table_name='gc_compact_mv_1' and t1.table_id = t3.table_id limit 1",
mv1_tablet_id));
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
mv1_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
EQ(1, cnt);
sql_string.assign_fmt(
"select snapshot_scn val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
base_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), snapshot_scn_of_base_by_mv1));
LOGI("create mview2");
int64_t mv2_tablet_id = 0;
EQ(0, sql_proxy.write(
"create materialized view gc_compact_mv_2 (primary key(t1_c1,t1_c2)) tablegroup='gc_tg1' "
"partition by "
"hash(t1_c1) partitions 2 REFRESH FAST ON DEMAND ENABLE QUERY REWRITE ENABLE ON QUERY "
"COMPUTATION as select /*+read_consistency(weak) use_nl(t1 t2) leading(t1 t2) "
"use_das(t2) no_use_nl_materialization(t2)*/ t1.c1 t1_c1,t1.c2 t1_c2,t2.c2 t2_c2,t2.c3 t2_c3 "
"from gc_t1 t1 join gc_t2 t2 on t1.c2=t2.c2",
affected_rows));
EQ(0, SSH::select_int64(
sql_proxy,
"select t3.tablet_id val from oceanbase.__all_table t1, oceanbase.__all_table t2, oceanbase.__all_tablet_to_ls t3 where "
"t1.table_id = t2.data_table_id and t2.table_name='gc_compact_mv_2' and t1.table_id = t3.table_id limit 1",
mv2_tablet_id));
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld", mv2_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
EQ(1, cnt);
LOGI("create mview3");
int64_t mv3_tablet_id = 0;
EQ(0, sql_proxy.write(
"create materialized view gc_compact_mv_3 (primary key(t1_c1,t1_c2)) tablegroup='gc_tg1' "
"partition by hash(t1_c1) partitions 2 "
"REFRESH FAST ON DEMAND ENABLE QUERY REWRITE ENABLE ON QUERY COMPUTATION "
"as select /*+read_consistency(weak) use_nl(t1 t2) leading(t1 t2) "
"use_das(t2) no_use_nl_materialization(t2)*/ t1.c1 t1_c1,t1.c2 t1_c2,t2.c2 t2_c2,t2.c3 t2_c3 "
"from gc_t1 t1 join gc_t2 t2 on t1.c2=t2.c2",
affected_rows));
EQ(0, SSH::select_int64(
sql_proxy,
"select t3.tablet_id val from oceanbase.__all_table t1, oceanbase.__all_table t2, oceanbase.__all_tablet_to_ls t3 where "
"t1.table_id = t2.data_table_id and t2.table_name='gc_compact_mv_3' and t1.table_id = t3.table_id limit 1",
mv3_tablet_id));
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld", mv3_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
EQ(1, cnt);
while (true) {
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
base_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
if (cnt == 1) {
break;
}
::sleep(2);
LOGI("wait remove redundant snapshot");
}
sql_string.assign_fmt(
"select snapshot_scn val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
base_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), snapshot_val));
// after the redundant snapshots are cleaned up, the remaining one should be the smallest snapshot
EQ(snapshot_scn_of_base_by_mv1, snapshot_val);
LOGI("remove mview1");
EQ(0, sql_proxy.write("drop materialized view gc_compact_mv_1", affected_rows));
while (true) {
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
mv1_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
if (cnt == 0) {
break;
}
::sleep(2);
LOGI("wait remove snapshot of mview1");
}
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
base_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
// the base table's snapshot should not be recycled yet
EQ(1, cnt);
LOGI("remove mview2 and mview3");
EQ(0, sql_proxy.write("drop materialized view gc_compact_mv_2", affected_rows));
EQ(0, sql_proxy.write("drop materialized view gc_compact_mv_3", affected_rows));
while (true) {
sql_string.assign_fmt("select count(*) val from oceanbase.__all_acquired_snapshot where "
"tablet_id = %ld or tablet_id = %ld",
mv2_tablet_id, mv3_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
if (cnt == 0) {
break;
}
::sleep(2);
LOGI("wait remove snapshot of mview2 and mview3");
}
sql_string.assign_fmt(
"select count(*) val from oceanbase.__all_acquired_snapshot where tablet_id = %ld",
base_tablet_id);
EQ(0, SSH::select_int64(sql_proxy, sql_string.ptr(), cnt));
// the base table's snapshot should have been recycled by now
EQ(0, cnt);
LOGI("clean up");
EQ(0, sql_proxy.write("drop table gc_t1", affected_rows));
EQ(0, sql_proxy.write("drop table gc_t2", affected_rows));
}
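// Tablegroup semantics for mviews: the mview itself keeps tablegroup_id = -1,
// while its container table (data_table_id) actually joins the tablegroup.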
TEST_F(ObCollectMV, tablegroup)
{
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
int64_t affected_rows = 0;
EQ(0, sql_proxy.write("create table t100(c1 int) partition by hash(c1) partitions 2", affected_rows));
EQ(0, sql_proxy.write("create tablegroup tg100 sharding='PARTITION'", affected_rows));
EQ(0, sql_proxy.write("create materialized view mv100 partition by hash(c1) partitions 2 as select * from t100", affected_rows));
EQ(0, sql_proxy.write("alter table t100 tablegroup='tg100'", affected_rows));
EQ(0, sql_proxy.write("alter table mv100 tablegroup='tg100'", affected_rows));
int64_t val = -2;
EQ(0, SSH::select_int64(sql_proxy, "select tablegroup_id val from oceanbase.__all_table where table_name='t100'", val));
GT(val, 0);
// mview
val = -2;
EQ(0, SSH::select_int64(sql_proxy, "select tablegroup_id val from oceanbase.__all_table where table_name='mv100'", val));
EQ(val, -1);
//mview container
val = -2;
EQ(0, SSH::select_int64(sql_proxy, "select tablegroup_id val from oceanbase.__all_table where table_id in ( \
select data_table_id from oceanbase.__all_table where table_name='mv100')", val));
GT(val, 0);
EQ(0, sql_proxy.write("alter table mv100 tablegroup=''", affected_rows));
EQ(0, sql_proxy.write("alter table t100 tablegroup=''", affected_rows));
EQ(0, sql_proxy.write("drop tablegroup tg100", affected_rows));
EQ(0, sql_proxy.write("drop table t100", affected_rows));
EQ(0, sql_proxy.write("drop materialized view mv100", affected_rows));
}
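// A log stream created after major_refresh_mv_merge_scn is recorded in
// __all_core_table should carry that scn in its ls meta
// (major_mv_merge_info_.major_mv_merge_scn_).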
TEST_F(ObCollectMV, new_ls)
{
int ret = OB_SUCCESS;
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, "select column_value val from oceanbase.__all_core_table where column_name='major_refresh_mv_merge_scn'", val));
GT(val, 0);
int64_t merge_scn = val;
EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_));
EQ(0, SSH::select_int64(sql_proxy, "select max(ls_id) val from oceanbase.__all_ls", val));
ObLSID ls_id(val);
ObLSMeta ls_meta;
MTL_SWITCH(R.tenant_id_) {
ObLSHandle ls_handle;
if (OB_FAIL(MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
} else if (OB_FAIL(ls_handle.get_ls()->get_ls_meta(ls_meta))) {
}
}
EQ(0, ret);
EQ(ls_meta.major_mv_merge_info_.major_mv_merge_scn_.get_val_for_gts(), merge_scn);
}
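// Moving the mview into a sharding='NONE' tablegroup should trigger transfer so
// that all tablets of its container table converge onto a single LS.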
TEST_F(ObCollectMV, mv_transfer)
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
EQ(0, sql_proxy.write("create table tf_t1(c1 int,c2 int, primary key(c1,c2)) "
"partition by hash(c1) partitions 2",
affected_rows));
EQ(0, sql_proxy.write("create table tf_t2(c2 int, c3 int, primary key(c2)) duplicate_scope = "
"'cluster' duplicate_read_consistency='weak'",
affected_rows));
EQ(0, sql_proxy.write(
"create materialized view tf_compact_mv_1 (primary key(t1_c1,t1_c2)) tablegroup='tg1' "
"partition by hash(t1_c1) partitions 2 "
"REFRESH FAST ON DEMAND ENABLE QUERY REWRITE ENABLE ON QUERY COMPUTATION "
"as select /*+read_consistency(weak) use_nl(t1 t2) leading(t1 t2) "
"use_das(t2) no_use_nl_materialization(t2)*/ t1.c1 t1_c1,t1.c2 t1_c2,t2.c2 t2_c2,t2.c3 "
"t2_c3 "
"from tf_t1 t1 join tf_t2 t2 on t1.c2=t2.c2",
affected_rows));
EQ(0, sql_proxy.write("create tablegroup tg200 sharding='NONE'", affected_rows));
EQ(0, sql_proxy.write("alter table tf_compact_mv_1 tablegroup='tg200'", affected_rows));
int64_t val = -1;
while (true) {
EQ(0, SSH::select_int64(sql_proxy, "select count(distinct ls_id) val from oceanbase.__all_tablet_to_ls where table_id in (select data_table_id from oceanbase.__all_table where table_name='tf_compact_mv_1')", val));
if (val == 1) {
break;
} else {
LOGI("wait transfer finish:%ld",val);
::sleep(1);
}
}
}
//TEST_F(ObCollectMV, create_mview_and_major_merge_concurrent)
//{
// common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
// int64_t affected_rows = 0;
// LOGI("create tg");
// EQ(0, sql_proxy.write("create tablegroup tg1000 sharding='PARTITION'", affected_rows));
// LOGI("create table");
// EQ(0, sql_proxy.write("create table t1000(c1 int,c2 int, primary key(c1,c2)) tablegroup='tg1000' partition by hash(c1) partitions 2", affected_rows));
// EQ(0, sql_proxy.write("create table t2000(c2 int, c3 int, primary key(c2)) duplicate_scope = 'cluster' duplicate_read_consistency='weak'", affected_rows));
//
// LOGI("insert data");
// EQ(0, sql_proxy.write("insert into t1000 values(1, 1)", affected_rows));;
// EQ(0, sql_proxy.write("insert into t2000 values(1, 1000)", affected_rows));;
// int64_t start_time = ObTimeUtil::current_time();
// while (ObTimeUtil::current_time() - start_time < 10 * 1000 * 1000) {
// ObSqlString sql;
// int64_t val = 0;
// EQ(0, SSH::select_int64(sql_proxy, "select max(c1) val from t1000", val));
// EQ(0, sql.assign_fmt("insert into t1000 select c1+%ld,c2+%ld from t1000", val, val));
// EQ(0, sql_proxy.write(sql.ptr(), affected_rows));;
// EQ(0, sql.assign_fmt("insert into t2000 select c2+%ld,c3 from t2000", val));
// EQ(0, sql_proxy.write(sql.ptr(), affected_rows));;
// }
// int64_t row_count = 0;
// EQ(0, SSH::select_int64(sql_proxy, "select count(*) val from t1000", row_count));
//
// uint64_t new_freeze_scn = 0;
// std::thread th([&]() {
// ::sleep(1);
// LOGI("major freeze")
// sql_proxy.write("alter system major freeze", affected_rows);
// EQ(0, SSH::g_select_uint64(R.tenant_id_, "select max(frozen_scn) val from oceanbase.__all_freeze_info", new_freeze_scn));
// });
//
// LOGI("create mview start");
// EQ(0, sql_proxy.write(
// "create materialized view compact_mv_1000 (primary key(t1000_c1,t1000_c2)) tablegroup='tg1000' "
// "partition by hash(t1000_c1) partitions 2 "
// "REFRESH FAST ON DEMAND ENABLE QUERY REWRITE ENABLE ON QUERY COMPUTATION "
// "as select /*+read_consistency(weak) use_nl(t1000 t2000) leading(t1000 t2000) "
// "use_das(t2000) no_use_nl_materialization(t2000)*/ t1000.c1 t1000_c1,t1000.c2 t1000_c2,t2000.c2 t2000_c2,t2000.c3 t2000_c3 "
// "from t1000 join t2000 on t1000.c2=t2000.c2",
// affected_rows));
// LOGI("create mview finish");
// int64_t mv_row_count = 0;
// EQ(0, SSH::select_int64(sql_proxy, "select /*+no_mv_rewrite*/count(*) val from compact_mv_1000", mv_row_count));
// EQ(row_count, mv_row_count);
// LOGI("row_count:%ld %ld", row_count, mv_row_count);
// th.join();
// LOGI("wait major merge %ld", new_freeze_scn);
// EQ(0, wait_major_mv_refresh_finish(new_freeze_scn));
// LOGI("wait major merge finish %ld", new_freeze_scn);
//
//
// LOGI("mview fast_refresh");
// int64_t max_val = 0;
// EQ(0, SSH::select_int64(sql_proxy, "select max(c1) val from t1000", max_val));
// ObSqlString sql;
// sql.assign_fmt("insert into t1000 values(%ld, %ld)", max_val+1, max_val+1);
// EQ(0, sql_proxy.write(sql.ptr(), affected_rows));
// sql.assign_fmt("insert into t2000 values(%ld, 1000)", max_val+1);
// EQ(0, sql_proxy.write(sql.ptr(), affected_rows));
// EQ(0, sql_proxy.write("update t2000 set c3=2000 where c2<1000", affected_rows));
// EQ(0, sql_proxy.write("delete from t1000 where c1<1000", affected_rows));
//
//
// EQ(0, sql_proxy.write("alter system major freeze", affected_rows));
// EQ(0, SSH::g_select_uint64(R.tenant_id_, "select max(frozen_scn) val from oceanbase.__all_freeze_info", new_freeze_scn));
// EQ(0, wait_major_mv_refresh_finish(new_freeze_scn));
//
// EQ(0, SSH::select_int64(sql_proxy, "select count(*) val from t1000", row_count));
// EQ(0, SSH::select_int64(sql_proxy, "select /*+no_mv_rewrite*/count(*) val from compact_mv_1000", mv_row_count));
// EQ(row_count, mv_row_count);
// LOGI("row_count:%ld %ld", row_count, mv_row_count);
//}
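// Wait out the optional -t run time, then stop and join the background threads
// started in the prepare case.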
TEST_F(ObCollectMV, end)
{
int64_t wait_us = R.time_sec_ * 1000 * 1000;
while (ObTimeUtil::current_time() - R.start_time_ < wait_us) {
ob_usleep(1000 * 1000);
}
R.stop_ = true;
R.th_.join();
R.worker_.join();
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
int c = 0;
int64_t time_sec = 0;
char *log_level = (char*)"INFO";
while (EOF != (c = getopt(argc, argv, "t:l:"))) {
switch(c) {
case 't':
time_sec = atoi(optarg);
break;
case 'l':
log_level = optarg;
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
break;
default:
break;
}
}
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level(log_level);
LOG_INFO("main>>>");
oceanbase::unittest::R.time_sec_ = time_sec;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}