forbid medium/major merge for middle status row store tablet in cs replica && fix mysqltest
This commit is contained in:
parent
bc8f935437
commit
c46c411ad9
@ -237,6 +237,40 @@ int ObCOTabletMergeCtx::build_ctx(bool &finish_flag)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCOTabletMergeCtx::check_merge_ctx_valid()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObMergeType &merge_type = get_merge_type();
|
||||
const ObITable *base_table = nullptr;
|
||||
const ObTablet *tablet = nullptr;
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid()) || OB_ISNULL(tablet = tablet_handle_.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet", K(ret), K_(tablet_handle));
|
||||
} else if (tablet->is_row_store() && OB_UNLIKELY(!is_convert_co_major_merge(merge_type))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("only row store tablet need to do convert co major merge", K(ret), K(merge_type), KPC(tablet));
|
||||
} else if (OB_ISNULL(base_table = static_param_.tables_handle_.get_table(0))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("base table is null", K(ret), K_(static_param));
|
||||
} else if (OB_UNLIKELY(!base_table->is_major_sstable())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid base table type", K(ret), KPC(base_table));
|
||||
} else if (ObCOMajorMergePolicy::is_use_rs_build_schema_match_merge(static_param_.co_major_merge_type_)) {
|
||||
if (base_table->is_co_sstable()) {
|
||||
const ObSSTable *sstable = nullptr;
|
||||
const ObCOSSTableV2 *co_sstable = nullptr;
|
||||
if (OB_ISNULL(sstable = static_cast<const ObSSTable *>(base_table)) || OB_ISNULL(co_sstable = static_cast<const ObCOSSTableV2 *>(sstable))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid sstable type", K(ret), KPC(sstable), KPC(co_sstable));
|
||||
} else if (OB_UNLIKELY(!co_sstable->is_row_store_only_co_table())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("only row store only co sstable can do use_rs_build_schema_match_merge", K(ret), KPC(co_sstable));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCOTabletMergeCtx::cal_merge_param()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -61,6 +61,7 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
|
||||
virtual int prepare_index_tree() override { return OB_SUCCESS; }
|
||||
virtual int collect_running_info() override;
|
||||
virtual int build_ctx(bool &finish_flag) override;
|
||||
virtual int check_merge_ctx_valid() override;
|
||||
OB_INLINE bool all_cg_finish() const // locked by ObCODagNet ctx_lock_
|
||||
{
|
||||
return exe_stat_.finish_cg_count_ == array_count_;
|
||||
|
@ -1599,6 +1599,8 @@ int ObCOMergeDagNet::prepare_co_merge_ctx()
|
||||
} else if (FALSE_IT(co_merge_ctx_->time_guard_click(ObStorageCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) {
|
||||
} else if (OB_FAIL(co_merge_ctx_->build_ctx(useless_finish_flag))) {
|
||||
LOG_WARN("failed to build ctx", KR(ret), KP_(co_merge_ctx));
|
||||
} else if (OB_FAIL(co_merge_ctx_->check_merge_ctx_valid())) {
|
||||
LOG_WARN("invalid merge ctx", KR(ret), KPC_(co_merge_ctx));
|
||||
} else {
|
||||
update_merge_status(COMergeStatus::CTX_PREPARED);
|
||||
}
|
||||
|
@ -373,21 +373,21 @@ int ObCOMergeDagNet::create_dag(
|
||||
// create dag and connections
|
||||
if (OB_UNLIKELY(start_cg_idx > end_cg_idx)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid start/end cg idx", K(ret), K(start_cg_idx), K(end_cg_idx));
|
||||
STORAGE_LOG(WARN, "invalid start/end cg idx", K(ret), K(start_cg_idx), K(end_cg_idx));
|
||||
} else {
|
||||
// start/end cg idx are meaningless for DagNet
|
||||
basic_param_.start_cg_idx_ = start_cg_idx;
|
||||
basic_param_.end_cg_idx_ = end_cg_idx;
|
||||
if (OB_FAIL(MTL(share::ObTenantDagScheduler*)->alloc_dag(dag))) {
|
||||
LOG_WARN("fail to alloc dag", K(ret));
|
||||
STORAGE_LOG(WARN, "fail to alloc dag", K(ret));
|
||||
} else if (OB_FAIL(dag->init_by_param(&basic_param_))) {
|
||||
LOG_WARN("Fail to init prepare dag", K(ret));
|
||||
STORAGE_LOG(WARN, "Fail to init prepare dag", K(ret));
|
||||
} else if (nullptr != parent && OB_FAIL(parent->add_child(*dag))) {
|
||||
LOG_WARN("failed to add child", K(ret), KPC(parent), KPC(dag));
|
||||
STORAGE_LOG(WARN, "failed to add child", K(ret), KPC(parent), KPC(dag));
|
||||
} else if (nullptr == parent && OB_FAIL(add_dag_into_dag_net(*dag))) {
|
||||
LOG_WARN("fail to add dag into dag_net", K(ret));
|
||||
STORAGE_LOG(WARN, "fail to add dag into dag_net", K(ret));
|
||||
} else if (OB_FAIL(dag->create_first_task())) {
|
||||
LOG_WARN("failed to create first task", K(ret), KPC(dag));
|
||||
STORAGE_LOG(WARN, "failed to create first task", K(ret), KPC(dag));
|
||||
} else if (share::ObDagType::DAG_TYPE_CO_MERGE_BATCH_EXECUTE == dag->get_type()) {
|
||||
#ifdef ERRSIM
|
||||
dag->set_max_retry_times(30);
|
||||
@ -396,12 +396,12 @@ int ObCOMergeDagNet::create_dag(
|
||||
#endif
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
LOG_INFO("success to create dag", K(ret), K_(basic_param), KP(dag),
|
||||
STORAGE_LOG(INFO, "success to create dag", K(ret), K_(basic_param), KP(dag),
|
||||
"dag_type", ObIDag::get_dag_type_str(dag->get_type()), K(add_scheduler_flag), KP(parent));
|
||||
}
|
||||
if (OB_FAIL(ret) || !add_scheduler_flag) {
|
||||
} else if (OB_FAIL(MTL(share::ObTenantDagScheduler*)->add_dag(dag))) {
|
||||
LOG_WARN("Fail to add dag into dag_scheduler", K(ret));
|
||||
STORAGE_LOG(WARN, "Fail to add dag into dag_scheduler", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) && nullptr != dag) {
|
||||
|
@ -412,6 +412,30 @@ int ObBasicTabletMergeCtx::build_ctx(bool &finish_flag)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBasicTabletMergeCtx::check_merge_ctx_valid()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObMergeType &merge_type = get_merge_type();
|
||||
const ObITable *base_table = nullptr;
|
||||
const ObTablet *tablet = nullptr;
|
||||
if (is_major_merge_type(merge_type) || is_meta_major_merge(merge_type)) {
|
||||
if (OB_UNLIKELY(!tablet_handle_.is_valid()) || OB_ISNULL(tablet = tablet_handle_.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid tablet", K(ret), K_(tablet_handle));
|
||||
} else if (!tablet->is_row_store()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("column store table should do co merge", K(ret), KPC(tablet));
|
||||
} else if (OB_ISNULL(base_table = static_param_.tables_handle_.get_table(0))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("base table is null", K(ret), K_(static_param));
|
||||
} else if (OB_UNLIKELY(!base_table->is_major_sstable() || base_table->is_co_sstable())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid base table type", K(ret), KPC(base_table));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBasicTabletMergeCtx::build_ctx_after_init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -133,6 +133,7 @@ public:
|
||||
void destroy();
|
||||
/* PREPARE SECTION */
|
||||
virtual int build_ctx(bool &finish_flag);
|
||||
virtual int check_merge_ctx_valid();
|
||||
int build_ctx_after_init();
|
||||
int prepare_merge_progress(
|
||||
ObPartitionMergeProgress *&progress,
|
||||
|
@ -803,6 +803,8 @@ int ObTabletMergeDag::prepare_merge_ctx(bool &finish_flag)
|
||||
LOG_WARN("failed to alloc ctx", KR(ret), K_(param));
|
||||
} else if (OB_FAIL(ctx_->build_ctx(finish_flag))) {
|
||||
LOG_WARN("failed to build ctx", KR(ret), K_(param), KP_(ctx));
|
||||
} else if (OB_FAIL(ctx_->check_merge_ctx_valid())) {
|
||||
LOG_WARN("invalid merge ctx", KR(ret), K_(param), KPC_(ctx));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -1033,6 +1033,44 @@ bool ObTenantTabletScheduler::check_tx_table_ready(ObLS &ls, const SCN &check_sc
|
||||
return tx_table_ready;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::check_ready_for_major_merge(
|
||||
const ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
const ObMergeType merge_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (tablet.is_row_store() && (is_medium_merge(merge_type) || is_major_merge(merge_type))) {
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
bool need_wait_major_convert = false;
|
||||
if (OB_FAIL(MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::HA_MOD))) {
|
||||
LOG_WARN("failed to get ls", K(ret), K(ls_id));
|
||||
} else if (OB_UNLIKELY(!ls_handle.is_valid()) || OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls is invalid or nullptr", K(ret), K(ls_id), K(ls_handle), KPC(ls));
|
||||
} else if (!ls->is_cs_replica()) {
|
||||
} else if (OB_FAIL(ObCSReplicaUtil::check_need_wait_major_convert(*ls, tablet.get_tablet_meta().tablet_id_, tablet, need_wait_major_convert))) {
|
||||
LOG_WARN("fail to check need wait major convert in cs replica", K(ret), KPC(ls), K(tablet));
|
||||
} else if (need_wait_major_convert) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("need wait major convert in cs replica", K(ret), KPC(ls), K(tablet));
|
||||
// if ls migration finished, schedule convert co merge
|
||||
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ls->get_ls_meta().get_migration_status(migration_status))) {
|
||||
LOG_WARN("failed to get migration status", K(tmp_ret), KPC(ls));
|
||||
} else if (ObMigrationStatus::OB_MIGRATION_STATUS_NONE == migration_status) {
|
||||
ObDagId co_dag_net_id;
|
||||
co_dag_net_id.init(GCTX.self_addr());
|
||||
if (OB_TMP_FAIL(schedule_convert_co_merge_dag_net(ls_id, tablet, 0 /*retry_times*/, co_dag_net_id))) {
|
||||
LOG_WARN("failed to schedule convert co merge for cs replica", K(tmp_ret), K(ls_id), K(tablet));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
const ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
@ -1041,7 +1079,9 @@ int ObTenantTabletScheduler::schedule_merge_dag(
|
||||
const ObDagId *dag_net_id /*= nullptr*/)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_major_merge_type(merge_type) && (!tablet.is_row_store() || is_convert_co_major_merge(merge_type))) {
|
||||
if (OB_FAIL(check_ready_for_major_merge(ls_id, tablet, merge_type))) {
|
||||
LOG_WARN("failed to check ready for major merge", K(ret), K(ls_id), K(tablet), K(merge_type));
|
||||
} else if (is_major_merge_type(merge_type) && (!tablet.is_row_store() || is_convert_co_major_merge(merge_type))) {
|
||||
ObCOMergeDagParam param;
|
||||
param.ls_id_ = ls_id;
|
||||
param.tablet_id_ = tablet.get_tablet_meta().tablet_id_;
|
||||
@ -1107,6 +1147,9 @@ int ObTenantTabletScheduler::schedule_tablet_meta_merge(
|
||||
if (OB_UNLIKELY(!ls_handle.is_valid() || !tablet_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ls_handle), K(tablet_handle));
|
||||
} else if (ls_handle.get_ls()->is_cs_replica()) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("meta merge is not supported in cs replica now", K(ret), K(ls_handle), K(tablet_handle));
|
||||
} else if (FALSE_IT(tablet = tablet_handle.get_obj())) {
|
||||
} else {
|
||||
const ObLSID &ls_id = ls_handle.get_ls()->get_ls_id();
|
||||
|
@ -253,6 +253,10 @@ public:
|
||||
static bool check_weak_read_ts_ready(
|
||||
const int64_t &merge_version,
|
||||
ObLS &ls);
|
||||
static int check_ready_for_major_merge(
|
||||
const ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
const ObMergeType merge_type);
|
||||
static int schedule_merge_dag(
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObTablet &tablet,
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "observer/omt/ob_tenant_node_balancer.h"
|
||||
#include "share/scheduler/ob_dag_warning_history_mgr.h"
|
||||
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
|
||||
#include "storage/column_store/ob_co_merge_dag.h"
|
||||
|
||||
int64_t dag_cnt = 1;
|
||||
int64_t stress_time= 1; // 100ms
|
||||
@ -1705,6 +1706,59 @@ TEST_F(TestDagScheduler, test_destroy_when_running) //TODO(renju.rj): fix it
|
||||
// #endif
|
||||
}
|
||||
|
||||
TEST_F(TestDagScheduler, test_add_multi_co_merge_dag_net)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
|
||||
ASSERT_TRUE(nullptr != scheduler);
|
||||
|
||||
ObLSID ls_id(1001);
|
||||
ObTabletID tablet_id(200001);
|
||||
{
|
||||
compaction::ObCOMergeDagParam param;
|
||||
param.ls_id_ = ls_id;
|
||||
param.tablet_id_ = tablet_id;
|
||||
param.merge_type_ = compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE;
|
||||
param.compat_mode_ = lib::Worker::CompatMode::MYSQL;
|
||||
param.transfer_seq_ = 0;
|
||||
ret = scheduler->create_and_add_dag_net<compaction::ObCOMergeDagNet>(¶m);
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
COMMON_LOG(INFO, "Success to create and add co convert dag net", K(ret));
|
||||
}
|
||||
{
|
||||
compaction::ObCOMergeDagParam param;
|
||||
param.ls_id_ = ls_id;
|
||||
param.tablet_id_ = tablet_id;
|
||||
param.merge_type_ = compaction::ObMergeType::CONVERT_CO_MAJOR_MERGE;
|
||||
param.compat_mode_ = lib::Worker::CompatMode::MYSQL;
|
||||
param.transfer_seq_ = 0;
|
||||
ret = scheduler->create_and_add_dag_net<compaction::ObCOMergeDagNet>(¶m);
|
||||
EXPECT_EQ(OB_TASK_EXIST, ret);
|
||||
COMMON_LOG(INFO, "Success to create and add co convert dag net", K(ret));
|
||||
}
|
||||
{
|
||||
compaction::ObCOMergeDagParam param;
|
||||
param.ls_id_ = ls_id;
|
||||
param.tablet_id_ = tablet_id;
|
||||
param.merge_type_ = compaction::ObMergeType::MAJOR_MERGE;
|
||||
param.compat_mode_ = lib::Worker::CompatMode::MYSQL;
|
||||
param.transfer_seq_ = 0;
|
||||
ret = scheduler->create_and_add_dag_net<compaction::ObCOMergeDagNet>(¶m);
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
COMMON_LOG(INFO, "Success to create and add co major dag net", K(ret));
|
||||
} {
|
||||
compaction::ObCOMergeDagParam param;
|
||||
param.ls_id_ = ls_id;
|
||||
param.tablet_id_ = tablet_id;
|
||||
param.merge_type_ = compaction::ObMergeType::MEDIUM_MERGE;
|
||||
param.compat_mode_ = lib::Worker::CompatMode::MYSQL;
|
||||
param.transfer_seq_ = 0;
|
||||
ret = scheduler->create_and_add_dag_net<compaction::ObCOMergeDagNet>(¶m);
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
COMMON_LOG(INFO, "Success to create and add co medium dag net", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user