From 494e1e06f64909297e2882716dec061fe1f0428d Mon Sep 17 00:00:00 2001 From: hiddenbomb Date: Thu, 20 Jul 2023 07:18:18 +0000 Subject: [PATCH] refactor mds table merge dag --- src/share/ob_occam_time_guard.h | 3 +- src/share/scheduler/ob_dag_scheduler.cpp | 11 +- src/storage/compaction/ob_compaction_util.cpp | 21 ++-- src/storage/compaction/ob_compaction_util.h | 28 +++-- .../compaction/ob_tablet_merge_ctx.cpp | 1 + src/storage/compaction/ob_tablet_merge_ctx.h | 1 + .../compaction/ob_tablet_merge_task.cpp | 37 +++--- src/storage/compaction/ob_tablet_merge_task.h | 20 +++- src/storage/ls/ob_ls.cpp | 75 +++++++++---- src/storage/ls/ob_ls.h | 8 +- .../multi_data_source/mds_table_base.cpp | 4 +- .../ob_mds_table_merge_dag.cpp | 106 +++++------------- .../ob_mds_table_merge_dag.h | 24 ++-- .../ob_mds_table_merge_dag_param.cpp | 5 +- .../ob_mds_table_merge_dag_param.h | 30 +---- .../ob_mds_table_merge_task.cpp | 94 ++++++++++------ .../ob_mds_table_merge_task.h | 7 +- .../runtime_utility/common_define.h | 1 + src/storage/tablet/ob_tablet.cpp | 12 +- 19 files changed, 249 insertions(+), 239 deletions(-) diff --git a/src/share/ob_occam_time_guard.h b/src/share/ob_occam_time_guard.h index 4f5ab76410..39d2f28104 100644 --- a/src/share/ob_occam_time_guard.h +++ b/src/share/ob_occam_time_guard.h @@ -598,7 +598,8 @@ struct TimeGuardFactory __LINE__,\ "["#mod"] ") #define CLICK() ({ static_assert(__LINE__ >= 0 && __LINE__ <= UINT16_MAX, "line num greater than 65535"); (__time_guard__.click(__LINE__));}) -#define CLICK_FAIL(stmt) OB_UNLIKELY(CLICK() && OB_FAIL(stmt)) +#define CLICK_FAIL(stmt) (CLICK(), OB_FAIL(stmt)) +#define CLICK_TMP_FAIL(stmt) (CLICK(), OB_TMP_FAIL(stmt)) }// namespace occam }// namespace common diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 1b9d6fd3e4..3c311e3262 100755 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -2191,14 +2191,9 @@ int ObTenantDagScheduler::check_ls_compaction_dag_exist_with_cancel( ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]); ObIDag *cur = head->get_next(); while (head != cur) { - if (ObDagType::DAG_TYPE_MDS_TABLE_MERGE == cur->get_type()) { - // TODO (bowen.gbw) : make ObMdsTableMergeDag inherit from ObTabletMergeDag - const mds::ObMdsTableMergeDag *mds_dag = static_cast(cur); - cancel_flag = (ls_id == mds_dag->get_param().ls_id_); - } else { - dag = static_cast(cur); - cancel_flag = (ls_id == dag->get_ls_id()); - } + dag = static_cast(cur); + cancel_flag = (ls_id == dag->get_ls_id()); + if (cancel_flag) { if (cur->get_dag_status() == ObIDag::DAG_STATUS_READY) { cancel_dag = cur; diff --git a/src/storage/compaction/ob_compaction_util.cpp b/src/storage/compaction/ob_compaction_util.cpp index 5069ad8dce..aeb16c5ceb 100644 --- a/src/storage/compaction/ob_compaction_util.cpp +++ b/src/storage/compaction/ob_compaction_util.cpp @@ -1,11 +1,15 @@ -// OceanBase is licensed under Mulan PubL v2. -// You can use this software according to the terms and conditions of the Mulan PubL v2. -// You may obtain a copy of Mulan PubL v2 at: -// http://license.coscl.org.cn/MulanPubL-2.0 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -// See the Mulan PubL v2 for more details. +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + #include "storage/compaction/ob_compaction_util.h" #include "share/ob_define.h" namespace oceanbase @@ -22,6 +26,7 @@ const char * ObMergeTypeStr[] = { "MEDIUM_MERGE", "DDL_KV_MERGE", "BACKFILL_TX_MERGE", + "MDS_TABLE_MERGE", "EMPTY_MERGE_TYPE" }; diff --git a/src/storage/compaction/ob_compaction_util.h b/src/storage/compaction/ob_compaction_util.h index a0a59b8bd8..e24adc65aa 100644 --- a/src/storage/compaction/ob_compaction_util.h +++ b/src/storage/compaction/ob_compaction_util.h @@ -1,12 +1,14 @@ -//Copyright (c) 2022 OceanBase -// OceanBase is licensed under Mulan PubL v2. -// You can use this software according to the terms and conditions of the Mulan PubL v2. -// You may obtain a copy of Mulan PubL v2 at: -// http://license.coscl.org.cn/MulanPubL-2.0 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -// See the Mulan PubL v2 for more details. +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ #ifndef OB_STORAGE_COMPACTION_UTIL_H_ #define OB_STORAGE_COMPACTION_UTIL_H_ @@ -26,7 +28,8 @@ enum ObMergeType MEDIUM_MERGE = 5, DDL_KV_MERGE = 6, BACKFILL_TX_MERGE = 7, - MERGE_TYPE_MAX, + MDS_TABLE_MERGE = 8, + MERGE_TYPE_MAX }; const char *merge_type_to_str(const ObMergeType &merge_type); @@ -76,6 +79,11 @@ inline bool is_backfill_tx_merge(const ObMergeType &merge_type) return BACKFILL_TX_MERGE == merge_type; } +inline bool is_mds_table_merge(const ObMergeType &merge_type) +{ + return MDS_TABLE_MERGE == merge_type; +} + } // namespace storage } // namespace oceanbase diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 17ba22a0a7..38910ee786 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -381,6 +381,7 @@ const char *ObCompactionTimeGuard::ObTabletCompactionEventStr[] = { "UPDATE_TABLET", "RELEASE_MEMTABLE", "SCHEDULE_OTHER_COMPACTION", + "GET_TABLET", "DAG_FINISH" }; diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index 49fcc8768e..2bac30cfc8 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -118,6 +118,7 @@ public: RELEASE_MEMTABLE, SCHEDULE_OTHER_COMPACTION, DAG_FINISH, + GET_TABLET, COMPACTION_EVENT_MAX }; const static char *ObTabletCompactionEventStr[]; diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 0b420e8a61..b4d45c74c4 100755 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -203,6 +203,22 @@ bool ObTabletMergeDagParam::is_valid() const && (!is_major_merge_type(merge_type_) || merge_version_ >= 0); } +int64_t ObTabletMergeDagParam::to_string(char* buf, const int64_t buf_len) const +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV("merge_type", merge_type_to_str(merge_type_), + K_(merge_version), + K_(ls_id), + K_(tablet_id), + KP(report_), + K_(for_diagnose), + K_(is_tenant_major_merge), + K_(need_swap_tablet_flag)); + J_OBJ_END(); + return pos; +} + ObBasicTabletMergeDag::ObBasicTabletMergeDag( const ObDagType::ObDagTypeEnum type) : ObIDag(type), @@ -374,8 +390,8 @@ int64_t ObMergeDagHash::inner_hash() const int64_t hash_value = 0; // make two merge type same hash_value = common::murmurhash(&merge_type_, sizeof(merge_type_), hash_value); - hash_value += ls_id_.hash(); - hash_value += tablet_id_.hash(); + hash_value = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_value); + hash_value = common::murmurhash(&tablet_id_, sizeof(tablet_id_), hash_value); return hash_value; } @@ -399,7 +415,7 @@ int ObBasicTabletMergeDag::fill_info_param(compaction::ObIBasicInfoParam *&out_p int ret = OB_SUCCESS; if (!is_inited_) { ret = OB_NOT_INIT; - LOG_WARN("ls basic table merge dag do not init", K(ret)); + LOG_WARN("ls basic tablet merge dag do not init", K(ret)); } else { const char *merge_type = merge_type_to_str(merge_type_); if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(), @@ -431,21 +447,6 @@ ObTabletMergeDag::ObTabletMergeDag(const ObDagType::ObDagTypeEnum type) { } -template -int ObTabletMergeDag::create_first_task() -{ - int ret = OB_SUCCESS; - T *task = nullptr; - if (OB_FAIL(alloc_task(task))) { - STORAGE_LOG(WARN, "fail to alloc task", K(ret)); - } else if (OB_FAIL(task->init())) { - STORAGE_LOG(WARN, "failed to init task", K(ret)); - } else if (OB_FAIL(add_task(*task))) { - STORAGE_LOG(WARN, "fail to add task", K(ret), K_(ls_id), K_(tablet_id), K_(ctx)); - } - return ret; -} - int ObTabletMergeDag::gene_compaction_info(compaction::ObTabletCompactionProgress &input_progress) { int ret = OB_SUCCESS; diff --git a/src/storage/compaction/ob_tablet_merge_task.h b/src/storage/compaction/ob_tablet_merge_task.h index 12c5b8ec6c..f4c154b979 100644 --- a/src/storage/compaction/ob_tablet_merge_task.h +++ b/src/storage/compaction/ob_tablet_merge_task.h @@ -95,8 +95,7 @@ struct ObTabletMergeDagParam : public share::ObIDagInitParam return is_tenant_major_merge_ ? MAJOR_MERGE : merge_type_; } - TO_STRING_KV("merge_type",merge_type_to_str(merge_type_), K_(merge_version), K_(ls_id), K_(tablet_id), - KP(report_), K_(for_diagnose), K_(is_tenant_major_merge), K_(need_swap_tablet_flag)); + virtual int64_t to_string(char* buf, const int64_t buf_len) const; bool for_diagnose_; bool is_tenant_major_merge_; @@ -247,13 +246,28 @@ class ObTabletMergeDag : public ObBasicTabletMergeDag public: ObTabletMergeDag(const share::ObDagType::ObDagTypeEnum type); virtual ~ObTabletMergeDag() {} - template + template int create_first_task(); virtual int gene_compaction_info(compaction::ObTabletCompactionProgress &progress) override; virtual int diagnose_compaction_info(compaction::ObDiagnoseTabletCompProgress &progress) override; }; +template +int ObTabletMergeDag::create_first_task() +{ + int ret = common::OB_SUCCESS; + T *task = nullptr; + if (OB_FAIL(alloc_task(task))) { + STORAGE_LOG(WARN, "fail to alloc task", K(ret)); + } else if (OB_FAIL(task->init())) { + STORAGE_LOG(WARN, "failed to init task", K(ret)); + } else if (OB_FAIL(add_task(*task))) { + STORAGE_LOG(WARN, "fail to add task", K(ret), K_(ls_id), K_(tablet_id), K_(ctx)); + } + return ret; +} + class ObTabletMajorMergeDag: public ObTabletMergeDag { public: diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index a73d99bdd4..27b73a2b06 100755 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -1333,27 +1333,26 @@ int ObLS::update_tablet_table_store( const int64_t read_lock = LSLOCKLOGMETA; const int64_t write_lock = 0; ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); + const share::ObLSID &ls_id = ls_meta_.ls_id_; + const int64_t rebuild_seq = ls_meta_.get_rebuild_seq(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ls is not inited", K(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || !param.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("update tablet table store get invalid argument", K(ret), K(tablet_id), K(param)); - } else { - const int64_t rebuild_seq = ls_meta_.get_rebuild_seq(); - if (param.rebuild_seq_ != rebuild_seq) { - ret = OB_EAGAIN; - LOG_WARN("update tablet table store rebuild seq not same, need retry", - K(ret), K(tablet_id), K(rebuild_seq), K(param)); - } else if (OB_FAIL(ls_tablet_svr_.update_tablet_table_store(tablet_id, param, handle))) { - LOG_WARN("failed to update tablet table store", K(ret), K(tablet_id), K(param)); - } + LOG_WARN("update tablet table store get invalid argument", K(ret), K(ls_id), K(tablet_id), K(param)); + } else if (param.rebuild_seq_ != rebuild_seq) { + ret = OB_EAGAIN; + LOG_WARN("update tablet table store rebuild seq not same, need retry", + K(ret), K(ls_id), K(tablet_id), K(rebuild_seq), K(param)); + } else if (OB_FAIL(ls_tablet_svr_.update_tablet_table_store(tablet_id, param, handle))) { + LOG_WARN("failed to update tablet table store", K(ret), K(ls_id), K(tablet_id), K(param)); } return ret; } int ObLS::update_tablet_table_store( - const int64_t rebuild_seq, + const int64_t ls_rebuild_seq, const ObTabletHandle &old_tablet_handle, const ObIArray &tables) { @@ -1368,12 +1367,14 @@ int ObLS::update_tablet_table_store( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(old_tablet_handle), K(tables)); } else { - const int64_t seq = ls_meta_.get_rebuild_seq(); - if (rebuild_seq != seq) { + const share::ObLSID &ls_id = ls_meta_.ls_id_; + const common::ObTabletID &tablet_id = old_tablet_handle.get_obj()->get_tablet_meta().tablet_id_; + const int64_t rebuild_seq = ls_meta_.get_rebuild_seq(); + if (OB_UNLIKELY(ls_rebuild_seq != rebuild_seq)) { ret = OB_EAGAIN; - LOG_WARN("rebuild seq has changed, retry", K(ret), K(seq), K(rebuild_seq)); + LOG_WARN("rebuild seq has changed, retry", K(ret), K(ls_id), K(tablet_id), K(rebuild_seq), K(ls_rebuild_seq)); } else if (OB_FAIL(ls_tablet_svr_.update_tablet_table_store(old_tablet_handle, tables))) { - LOG_WARN("fail to replace small sstables in the tablet", K(ret), K(old_tablet_handle), K(tables)); + LOG_WARN("fail to replace small sstables in the tablet", K(ret), K(ls_id), K(tablet_id), K(old_tablet_handle), K(tables)); } } return ret; @@ -1387,21 +1388,47 @@ int ObLS::build_ha_tablet_new_table_store( int64_t read_lock = LSLOCKLOGMETA; int64_t write_lock = 0; ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); + const share::ObLSID &ls_id = ls_meta_.ls_id_; const int64_t rebuild_seq = ls_meta_.get_rebuild_seq(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ls is not inited", K(ret)); } else if (!tablet_id.is_valid() || !param.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("build ha tablet new table store get invalid argument", K(ret), K(tablet_id), K(param)); - } else { - if (param.rebuild_seq_ != rebuild_seq) { - ret = OB_EAGAIN; - LOG_WARN("build ha tablet new table store rebuild seq not same, need retry", - K(ret), K(tablet_id), K(rebuild_seq), K(param)); - } else if (OB_FAIL(ls_tablet_svr_.build_ha_tablet_new_table_store(tablet_id, param))) { - LOG_WARN("failed to update tablet table store", K(ret), K(tablet_id), K(param)); - } + LOG_WARN("build ha tablet new table store get invalid argument", K(ret), K(ls_id), K(tablet_id), K(param)); + } else if (param.rebuild_seq_ != rebuild_seq) { + ret = OB_EAGAIN; + LOG_WARN("build ha tablet new table store rebuild seq not same, need retry", + K(ret), K(ls_id), K(tablet_id), K(rebuild_seq), K(param)); + } else if (OB_FAIL(ls_tablet_svr_.build_ha_tablet_new_table_store(tablet_id, param))) { + LOG_WARN("failed to update tablet table store", K(ret), K(ls_id), K(tablet_id), K(param)); + } + return ret; +} + +int ObLS::build_new_tablet_from_mds_table( + const int64_t ls_rebuild_seq, + const common::ObTabletID &tablet_id, + const share::SCN &flush_scn) +{ + int ret = OB_SUCCESS; + const int64_t read_lock = LSLOCKLOGMETA; + const int64_t write_lock = 0; + ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); + const share::ObLSID &ls_id = ls_meta_.ls_id_; + const int64_t rebuild_seq = ls_meta_.get_rebuild_seq(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ls is not inited", K(ret)); + } else if (OB_UNLIKELY(!tablet_id.is_valid() || !flush_scn.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(ls_id), K(tablet_id), K(flush_scn)); + } else if (OB_UNLIKELY(ls_rebuild_seq != rebuild_seq)) { + ret = OB_EAGAIN; + LOG_WARN("rebuild seq is not the same with current ls, need retry", + K(ret), K(ls_id), K(tablet_id), K(ls_rebuild_seq), K(rebuild_seq), K(flush_scn)); + } else if (OB_FAIL(ls_tablet_svr_.build_new_tablet_from_mds_table(tablet_id, flush_scn))) { + LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(flush_scn)); } return ret; } diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 66a5dc4002..08024ea65d 100755 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -316,7 +316,7 @@ public: // get tablet but don't check user_data while replaying clog, because user_data may not exist. int replay_get_tablet_no_check( const common::ObTabletID &tablet_id, - const SCN &scn, + const share::SCN &scn, ObTabletHandle &tablet_handle) const; int flush_if_need(const bool need_flush); @@ -788,12 +788,16 @@ public: const ObUpdateTableStoreParam ¶m, ObTabletHandle &handle); int update_tablet_table_store( - const int64_t rebuild_seq, + const int64_t ls_rebuild_seq, const ObTabletHandle &old_tablet_handle, const ObIArray &tables); int build_ha_tablet_new_table_store( const ObTabletID &tablet_id, const ObBatchUpdateTableStoreParam ¶m); + int build_new_tablet_from_mds_table( + const int64_t ls_rebuild_seq, + const common::ObTabletID &tablet_id, + const share::SCN &flush_scn); int try_update_uppder_trans_version(); int diagnose(DiagnoseInfo &info) const; diff --git a/src/storage/multi_data_source/mds_table_base.cpp b/src/storage/multi_data_source/mds_table_base.cpp index 111b41b7d3..d4e120f714 100644 --- a/src/storage/multi_data_source/mds_table_base.cpp +++ b/src/storage/multi_data_source/mds_table_base.cpp @@ -152,7 +152,9 @@ int MdsTableBase::merge(const share::SCN &flushing_scn) param.ls_id_ = ls_id_; param.tablet_id_ = tablet_id_; param.flush_scn_ = flushing_scn; - param.generate_ts_ = ObClockGenerator::getCurrentTime(); + param.generate_ts_ = ObClockGenerator::getClock(); + param.merge_type_ = ObMergeType::MDS_TABLE_MERGE; + param.merge_version_ = 0; if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_mds_table_merge_dag(param))) { if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) { MDS_LOG(WARN, "failed to schedule mds table merge dag", K(ret), K(param)); diff --git a/src/storage/multi_data_source/ob_mds_table_merge_dag.cpp b/src/storage/multi_data_source/ob_mds_table_merge_dag.cpp index 652f8fef8c..5e536a6fdd 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_dag.cpp +++ b/src/storage/multi_data_source/ob_mds_table_merge_dag.cpp @@ -11,19 +11,17 @@ */ #include "storage/multi_data_source/ob_mds_table_merge_dag.h" -#include "lib/hash_func/murmur_hash.h" +#include "lib/ob_errno.h" #include "lib/utility/ob_print_utils.h" #include "storage/multi_data_source/ob_mds_table_merge_task.h" -#include "storage/tablet/ob_tablet_common.h" -#include "storage/tablet/ob_tablet.h" -#include "storage/tx_storage/ob_ls_handle.h" -#include "storage/tx_storage/ob_ls_service.h" +#include "storage/multi_data_source/ob_mds_table_merge_dag_param.h" #include "share/scheduler/ob_dag_warning_history_mgr.h" #define USING_LOG_PREFIX MDS using namespace oceanbase::common; using namespace oceanbase::share; +using namespace oceanbase::compaction; namespace oceanbase { @@ -32,7 +30,7 @@ namespace storage namespace mds { ObMdsTableMergeDag::ObMdsTableMergeDag() - : ObIDag(ObDagType::DAG_TYPE_MDS_TABLE_MERGE), + : ObTabletMergeDag(ObDagType::DAG_TYPE_MDS_TABLE_MERGE), is_inited_(false) { } @@ -49,27 +47,17 @@ int ObMdsTableMergeDag::init_by_param(const share::ObIDagInitParam *param) LOG_WARN("invalid args", K(ret), KP(param)); } else { const ObMdsTableMergeDagParam *mds_param = static_cast(param); - const share::ObLSID &ls_id = mds_param->ls_id_; - const common::ObTabletID &tablet_id = mds_param->tablet_id_; - ObLSService *ls_service = MTL(ObLSService*); - ObLSHandle ls_handle; - ObLS *ls = nullptr; - ObTabletHandle tablet_handle; - ObTablet *tablet = nullptr; - - if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::MDS_TABLE_MOD))) { - LOG_WARN("failed to get ls", K(ret), K(ls_id)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls is null", K(ret), K(ls_id), K(ls_handle)); - } else if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { - LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); - } else if (OB_ISNULL(tablet = tablet_handle.get_obj())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(tablet_id), K(tablet_handle)); + if (OB_UNLIKELY(!is_mds_table_merge(mds_param->merge_type_))) { + ret = OB_ERR_SYS; + LOG_WARN("param type is not mds table merge type", K(ret), KPC(mds_param)); + } else if (OB_UNLIKELY(!mds_param->flush_scn_.is_valid())) { + ret = OB_ERR_SYS; + LOG_WARN("flush scn is invalid", K(ret), KPC(mds_param)); + } else if (OB_FAIL(ObBasicTabletMergeDag::inner_init(*mds_param))) { + LOG_WARN("failed to init ObTabletMergeDag", K(ret), KPC(mds_param)); } else { - compat_mode_ = tablet->get_tablet_meta().compat_mode_; - param_ = *mds_param; + flush_scn_ = mds_param->flush_scn_; + generate_ts_ = mds_param->generate_ts_; is_inited_ = true; } } @@ -77,61 +65,24 @@ int ObMdsTableMergeDag::init_by_param(const share::ObIDagInitParam *param) return ret; } -bool ObMdsTableMergeDag::operator==(const share::ObIDag &other) const -{ - bool is_same = true; - - if (this == &other) { - is_same = true; - } else if (get_type() != other.get_type()) { - is_same = false; - } else { - const ObMdsTableMergeDag &other_dag = static_cast(other); - is_same = (param_.ls_id_ == other_dag.param_.ls_id_ && param_.tablet_id_ == other_dag.param_.tablet_id_); - } - - return is_same; -} - -int64_t ObMdsTableMergeDag::hash() const -{ - uint64_t hash_val = 0; - hash_val = common::murmurhash(¶m_.ls_id_, sizeof(share::ObLSID), hash_val); - hash_val = common::murmurhash(¶m_.tablet_id_, sizeof(common::ObTabletID), hash_val); - return hash_val; -} - int ObMdsTableMergeDag::create_first_task() { - int ret = OB_SUCCESS; - ObMdsTableMergeTask *task = nullptr; - - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret), K_(is_inited)); - } else if (OB_FAIL(alloc_task(task))) { - LOG_WARN("failed to alloc task", K(ret)); - } else if (OB_FAIL(task->init(param_))) { - LOG_WARN("failed to ini task", K(ret), K_(param)); - } else if (OB_FAIL(add_task(*task))) { - LOG_WARN("failed to add task", K(ret)); - } - - return ret; + return ObTabletMergeDag::create_first_task(); } int ObMdsTableMergeDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_WARN("mds table merge dag do not init", K(ret)); - } else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(), - param_.ls_id_.id(), - static_cast(param_.tablet_id_.id()), - static_cast(param_.flush_scn_.get_val_for_inner_table_field())))){ - LOG_WARN("failed to fill info param", K(ret)); + LOG_WARN("ls basic tablet merge dag do not init", K(ret)); + } else { + if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, ObIDag::get_type(), + ls_id_.id(), + static_cast(tablet_id_.id()), + static_cast(flush_scn_.get_val_for_inner_table_field())))) { + LOG_WARN("failed to fill info param", K(ret)); + } } return ret; } @@ -141,19 +92,12 @@ int ObMdsTableMergeDag::fill_dag_key(char *buf, const int64_t buf_len) const int ret = OB_SUCCESS; if (OB_FAIL(databuff_printf(buf, buf_len, "mds table merge task, ls_id=%ld, tablet_id=%ld, flush_scn=%ld", - param_.ls_id_.id(), param_.tablet_id_.id(), param_.flush_scn_.get_val_for_inner_table_field()))) { - LOG_WARN("failed to fill dag key", K(ret), K_(param)); + ls_id_.id(), tablet_id_.id(), flush_scn_.get_val_for_inner_table_field()))) { + LOG_WARN("failed to fill dag key", K(ret), K_(ls_id), K_(tablet_id), K_(flush_scn)); } return ret; } - -bool ObMdsTableMergeDag::ignore_warning() -{ - return OB_LS_NOT_EXIST == dag_ret_ - || OB_TABLET_NOT_EXIST == dag_ret_ - || OB_CANCELED == dag_ret_; -} } // namespace mds } // namespace storage } // namespace oceanbase diff --git a/src/storage/multi_data_source/ob_mds_table_merge_dag.h b/src/storage/multi_data_source/ob_mds_table_merge_dag.h index d5b8da6e44..9dbbb18a6d 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_dag.h +++ b/src/storage/multi_data_source/ob_mds_table_merge_dag.h @@ -14,9 +14,8 @@ #define OCEANBASE_STORAGE_OB_MDS_TABLE_MERGE_DAG #include -#include "lib/worker.h" -#include "share/scheduler/ob_dag_scheduler.h" -#include "storage/multi_data_source/ob_mds_table_merge_dag_param.h" +#include "share/scn.h" +#include "storage/compaction/ob_tablet_merge_task.h" namespace oceanbase { @@ -24,30 +23,29 @@ namespace storage { namespace mds { -class ObMdsTableMergeDag : public share::ObIDag +class ObMdsTableMergeDag : public compaction::ObTabletMergeDag { public: ObMdsTableMergeDag(); virtual ~ObMdsTableMergeDag() = default; ObMdsTableMergeDag(const ObMdsTableMergeDag&) = delete; ObMdsTableMergeDag &operator=(const ObMdsTableMergeDag&) = delete; - const ObMdsTableMergeDagParam& get_param() const { return param_; } public: virtual int init_by_param(const share::ObIDagInitParam *param) override; - virtual bool operator==(const share::ObIDag &other) const override; - virtual int64_t hash() const override; virtual int create_first_task() override; virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override; virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; - virtual bool ignore_warning() override; - virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; } - virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } - INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(compat_mode), K_(param)); + share::SCN get_flush_scn() const { return flush_scn_; } + + INHERIT_TO_STRING_KV("ObTabletMergeDag", ObTabletMergeDag, + K_(is_inited), + K_(flush_scn), + K_(generate_ts)); private: bool is_inited_; - lib::Worker::CompatMode compat_mode_; - ObMdsTableMergeDagParam param_; + share::SCN flush_scn_; + int64_t generate_ts_; }; } // namespace mds } // namespace storage diff --git a/src/storage/multi_data_source/ob_mds_table_merge_dag_param.cpp b/src/storage/multi_data_source/ob_mds_table_merge_dag_param.cpp index db2a11ba70..970de8009d 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_dag_param.cpp +++ b/src/storage/multi_data_source/ob_mds_table_merge_dag_param.cpp @@ -12,8 +12,6 @@ #include "storage/multi_data_source/ob_mds_table_merge_dag_param.h" -using namespace oceanbase::common; - namespace oceanbase { namespace storage @@ -21,8 +19,7 @@ namespace storage namespace mds { ObMdsTableMergeDagParam::ObMdsTableMergeDagParam() - : ls_id_(), - tablet_id_(), + : ObTabletMergeDagParam(), flush_scn_(share::SCN::invalid_scn()), generate_ts_(0) { diff --git a/src/storage/multi_data_source/ob_mds_table_merge_dag_param.h b/src/storage/multi_data_source/ob_mds_table_merge_dag_param.h index a6ada3149f..aa0128b5de 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_dag_param.h +++ b/src/storage/multi_data_source/ob_mds_table_merge_dag_param.h @@ -13,10 +13,9 @@ #ifndef OCEANBASE_STORAGE_OB_MDS_TABLE_MERGE_PARAM #define OCEANBASE_STORAGE_OB_MDS_TABLE_MERGE_PARAM -#include "share/scheduler/ob_dag_scheduler.h" +#include +#include "storage/compaction/ob_tablet_merge_task.h" #include "share/scn.h" -#include "share/ob_ls_id.h" -#include "common/ob_tablet_id.h" namespace oceanbase { @@ -24,37 +23,18 @@ namespace storage { namespace mds { -class ObMdsTableMergeDagParam : public share::ObIDagInitParam +class ObMdsTableMergeDagParam : public compaction::ObTabletMergeDagParam { public: ObMdsTableMergeDagParam(); virtual ~ObMdsTableMergeDagParam() = default; public: - virtual bool is_valid() const override; - bool operator==(const ObMdsTableMergeDagParam &other) const; - - TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(flush_scn), KTIME_(generate_ts)); + INHERIT_TO_STRING_KV("ObTabletMergeDagParam", compaction::ObTabletMergeDagParam, + K_(flush_scn), KTIME_(generate_ts)); public: - share::ObLSID ls_id_; - common::ObTabletID tablet_id_; share::SCN flush_scn_; int64_t generate_ts_; }; - -inline bool ObMdsTableMergeDagParam::is_valid() const -{ - return ls_id_.is_valid() - && tablet_id_.is_valid() - && !tablet_id_.is_ls_inner_tablet() - && flush_scn_.is_valid(); -} - -inline bool ObMdsTableMergeDagParam::operator==(const ObMdsTableMergeDagParam &other) const -{ - return ls_id_ == other.ls_id_ - && tablet_id_ == other.tablet_id_ - && flush_scn_ == other.flush_scn_; -} } // namespace mds } // namespace storage } // namespace oceanbase diff --git a/src/storage/multi_data_source/ob_mds_table_merge_task.cpp b/src/storage/multi_data_source/ob_mds_table_merge_task.cpp index c710c0f770..6782a8249f 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_task.cpp +++ b/src/storage/multi_data_source/ob_mds_table_merge_task.cpp @@ -15,10 +15,14 @@ #include "storage/ls/ob_ls_get_mod.h" #include "storage/tablet/ob_tablet_common.h" #include "storage/tablet/ob_tablet.h" +#include "storage/compaction/ob_tablet_merge_ctx.h" +#include "storage/compaction/ob_tablet_merge_task.h" +#include "storage/multi_data_source/ob_mds_table_merge_dag.h" #define USING_LOG_PREFIX MDS using namespace oceanbase::common; +using namespace oceanbase::compaction; namespace oceanbase { @@ -29,20 +33,32 @@ namespace mds ObMdsTableMergeTask::ObMdsTableMergeTask() : ObITask(ObITaskType::TASK_TYPE_MDS_TABLE_MERGE), is_inited_(false), - param_() + mds_merge_dag_(nullptr) { } -int ObMdsTableMergeTask::init(const ObMdsTableMergeDagParam ¶m) +int ObMdsTableMergeTask::init() { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; + } else if (OB_ISNULL(dag_)) { + ret = OB_ERR_SYS; + LOG_WARN("dag must not be null", K(ret)); + } else if (OB_UNLIKELY(ObDagType::ObDagTypeEnum::DAG_TYPE_MDS_TABLE_MERGE != dag_->get_type())) { + ret = OB_ERR_SYS; + LOG_ERROR("dag type not match", K(ret), KPC_(dag)); } else { - param_ = param; - param_.generate_ts_ = ObClockGenerator::getClock(); - is_inited_ = true; + ObMdsTableMergeDag *mds_merge_dag = static_cast(dag_); + const ObTabletMergeDagParam &merge_dag_param = mds_merge_dag->get_param(); + if (OB_UNLIKELY(!merge_dag_param.is_valid())) { + ret = OB_ERR_SYS; + LOG_WARN("param is not valid", K(ret), "param", mds_merge_dag->get_param()); + } else { + mds_merge_dag_ = mds_merge_dag; + is_inited_ = true; + } } return ret; @@ -52,45 +68,53 @@ int ObMdsTableMergeTask::process() { TIMEGUARD_INIT(STORAGE, 10_ms, 5_s); int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - ObLSService *ls_service = MTL(ObLSService*); - ObLSHandle ls_handle; - ObLS *ls = nullptr; - ObTabletHandle tablet_handle; - ObTablet *tablet = nullptr; - const share::ObLSID &ls_id = param_.ls_id_; - const common::ObTabletID &tablet_id = param_.tablet_id_; - const share::SCN &flush_scn = param_.flush_scn_; DEBUG_SYNC(AFTER_EMPTY_SHELL_TABLET_CREATE); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); - } else if (MDS_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { - LOG_WARN("failed to get ls", K(ret), K(ls_id)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls is null", K(ret), K(ls_id), K(ls_handle)); - } else if (MDS_FAIL(ls->get_tablet(tablet_id, tablet_handle, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { - LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); - } else if (OB_ISNULL(tablet = tablet_handle.get_obj())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_handle)); - } else if (MDS_FAIL(ls->get_tablet_svr()->build_new_tablet_from_mds_table(tablet_id, flush_scn))) { - LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(flush_scn)); } else { - share::dag_yield(); - } + int tmp_ret = OB_SUCCESS; + ObLS *ls = nullptr; + ObTablet *tablet = nullptr; + ObTabletMergeCtx &ctx = mds_merge_dag_->get_ctx(); + const share::ObLSID &ls_id = ctx.param_.ls_id_; + const common::ObTabletID &tablet_id = ctx.param_.tablet_id_; + const share::SCN &flush_scn = mds_merge_dag_->get_flush_scn(); + int64_t ls_rebuild_seq = -1; - // always notify flush ret - if (CLICK() && OB_NOT_NULL(tablet) && OB_TMP_FAIL(tablet->notify_mds_table_flush_ret(flush_scn, ret))) { - LOG_WARN("failed to notify mds table flush ret", K(tmp_ret), K(ls_id), K(tablet_id), K(flush_scn), "flush_ret", ret); - } + if (OB_ISNULL(ls = ctx.ls_handle_.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls is null", K(ret), K(ls_id), "ls_handle", ctx.ls_handle_); + } else if (ls->is_offline()) { + ret = OB_CANCELED; + LOG_INFO("ls offline, skip merge", K(ret), K(ctx)); + } else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) { + } else if (MDS_FAIL(ls->get_tablet(tablet_id, ctx.tablet_handle_, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { + LOG_WARN("failed to get tablet", K(ret), K(ls_id), K(tablet_id)); + } else if (OB_ISNULL(tablet = ctx.tablet_handle_.get_obj())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_id), "tablet_handle", ctx.tablet_handle_); + } else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::GET_TABLET))) { + } else if (FALSE_IT(ls_rebuild_seq = ls->get_rebuild_seq())) { + } else if (MDS_FAIL(ls->build_new_tablet_from_mds_table(ls_rebuild_seq, tablet_id, flush_scn))) { + LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(ls_rebuild_seq), K(flush_scn)); + } else { + ctx.time_guard_.click(ObCompactionTimeGuard::EXECUTE); + share::dag_yield(); + } - if (OB_FAIL(ret)) { - } else if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) { - ret = tmp_ret; + // always notify flush ret + if (OB_NOT_NULL(tablet) && MDS_TMP_FAIL(tablet->notify_mds_table_flush_ret(flush_scn, ret))) { + LOG_WARN("failed to notify mds table flush ret", K(tmp_ret), K(ls_id), K(tablet_id), K(flush_scn), "flush_ret", ret); + if (OB_SUCC(ret)) { + // ret equals to OB_SUCCESS, use tmp_ret as return value + ret = tmp_ret; + } + } + ctx.time_guard_.click(ObCompactionTimeGuard::DAG_FINISH); + (void)ctx.collect_running_info(); } return ret; diff --git a/src/storage/multi_data_source/ob_mds_table_merge_task.h b/src/storage/multi_data_source/ob_mds_table_merge_task.h index 7d25ab9b4b..7c8168fa07 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_task.h +++ b/src/storage/multi_data_source/ob_mds_table_merge_task.h @@ -14,7 +14,6 @@ #define OCEANBASE_STORAGE_OB_MDS_TABLE_MERGE_TASK #include "share/scheduler/ob_dag_scheduler.h" -#include "storage/multi_data_source/ob_mds_table_merge_dag_param.h" namespace oceanbase { @@ -22,6 +21,8 @@ namespace storage { namespace mds { +class ObMdsTableMergeDag; + class ObMdsTableMergeTask : public share::ObITask { public: @@ -32,10 +33,10 @@ public: public: virtual int process() override; - int init(const ObMdsTableMergeDagParam ¶m); + int init(); private: bool is_inited_; - ObMdsTableMergeDagParam param_; + ObMdsTableMergeDag *mds_merge_dag_; }; } // namespace mds } // namespace storage diff --git a/src/storage/multi_data_source/runtime_utility/common_define.h b/src/storage/multi_data_source/runtime_utility/common_define.h index d9b26aef72..dc1075431c 100644 --- a/src/storage/multi_data_source/runtime_utility/common_define.h +++ b/src/storage/multi_data_source/runtime_utility/common_define.h @@ -233,6 +233,7 @@ do {\ // flag is needed to rollback logic #define MDS_FAIL_FLAG(stmt, flag) (CLICK_FAIL(stmt) || FALSE_IT(flag = true)) #define MDS_FAIL(stmt) (CLICK_FAIL(stmt)) +#define MDS_TMP_FAIL(stmt) (CLICK_TMP_FAIL(stmt)) } } diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index a9fcd7ede2..240a1174ab 100755 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -2351,20 +2351,26 @@ int ObTablet::auto_get_read_tables( const bool allow_no_ready_read) { int ret = OB_SUCCESS; + const share::ObLSID &ls_id = tablet_meta_.ls_id_; + const common::ObTabletID &tablet_id = tablet_meta_.tablet_id_; iter.table_store_iter_.reset(); bool succ_get_src_tables = false; + if (OB_UNLIKELY(tablet_meta_.has_transfer_table())) { if (OB_FAIL(get_src_tablet_read_tables_(snapshot_version, allow_no_ready_read, iter, succ_get_src_tables))) { - LOG_WARN("failed to get src ls read tables", K(ret), K(snapshot_version), K(tablet_meta_)); + LOG_WARN("failed to get src ls read tables", K(ret), K(ls_id), K(tablet_id), + K(snapshot_version), "has_transfer_table", tablet_meta_.has_transfer_table()); } else { - LOG_INFO("get read tables during transfer", K(snapshot_version), K(iter.table_store_iter_.table_ptr_array_)); + FLOG_INFO("get read tables during transfer",K(ret), K(ls_id), K(tablet_id), + K(snapshot_version), "has_transfer_table", tablet_meta_.has_transfer_table(), + K(iter.table_store_iter_.table_ptr_array_)); } } if (OB_FAIL(ret)) { } else { bool allow_not_ready = succ_get_src_tables ? true : allow_no_ready_read; if (OB_FAIL(get_read_tables_(snapshot_version, iter.table_store_iter_, iter.table_store_iter_.table_store_handle_, allow_not_ready))) { - LOG_WARN("failed to get read tables from table store", K(ret), K(*this)); + LOG_WARN("failed to get read tables from table store", K(ret), K(ls_id), K(tablet_id)); } } return ret;