diff --git a/src/share/scheduler/ob_dag_scheduler_config.h b/src/share/scheduler/ob_dag_scheduler_config.h index af0011dc0e..24b34800e4 100644 --- a/src/share/scheduler/ob_dag_scheduler_config.h +++ b/src/share/scheduler/ob_dag_scheduler_config.h @@ -60,6 +60,8 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_WRITE_CKPT, ObDagPrio::DAG_PRIO_COMPACTION_L false, 2, {"ls_id", "tablet_id"}) DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::MDS_TABLE_MERGE_TASK, "MDS_TABLE_MERGE", "COMPACTION", false, 3, {"ls_id", "tablet_id", "flush_scn"}) +DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION", + false, 3, {"ls_id", "compaction_scn", "tablet_count"}) DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_DDL, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "DDL_COMPLEMENT", "DDL", true, 7, {"ls_id", "source_tablet_id", "dest_tablet_id", "data_table_id", "target_table_id", "schema_version", "snapshot_version"}) diff --git a/src/share/scheduler/ob_sys_task_stat.cpp b/src/share/scheduler/ob_sys_task_stat.cpp index aec91ce0f8..d94a28a3a4 100644 --- a/src/share/scheduler/ob_sys_task_stat.cpp +++ b/src/share/scheduler/ob_sys_task_stat.cpp @@ -40,7 +40,8 @@ const static char *ObSysTaskTypeStr[] = { "MDS_TABLE_MERGE", "TTL_TASK", "TENANT_SNAPSHOT_CREATE", - "TENANT_SNAPSHOT_GC" + "TENANT_SNAPSHOT_GC", + "BATCH_FREEZE_TABLET_TASK" }; const char *sys_task_type_to_str(const ObSysTaskType &type) diff --git a/src/share/scheduler/ob_sys_task_stat.h b/src/share/scheduler/ob_sys_task_stat.h index b8329bc6e3..3144b21dda 100644 --- a/src/share/scheduler/ob_sys_task_stat.h +++ b/src/share/scheduler/ob_sys_task_stat.h @@ -54,6 +54,7 @@ enum ObSysTaskType TABLE_API_TTL_TASK, TENANT_SNAPSHOT_CREATE_TASK, TENANT_SNAPSHOT_GC_TASK, + BATCH_FREEZE_TABLET_TASK, MAX_SYS_TASK_TYPE }; diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index 1d29fdca4f..2baa3b9b4c 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -2585,6 +2585,8 @@ int ObDagPrioScheduler::finish_dag_( if (need_add) { if (OB_TMP_FAIL(MTL(ObDagWarningHistoryManager*)->add_dag_warning_info(&dag))) { COMMON_LOG(WARN, "failed to add dag warning info", K(tmp_ret), K(dag)); + } else if (ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS == dag.get_type()) { + // no need to add diagnose } else { compaction::ObTabletMergeDag *merge_dag = static_cast(&dag); if (OB_SUCCESS != dag.get_dag_ret()) { @@ -2957,7 +2959,6 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel( { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - compaction::ObTabletMergeDag *dag = nullptr; exist = false; ObDagListIndex loop_list[2] = { READY_DAG_LIST, RANK_DAG_LIST }; ObIDag *cancel_dag = nullptr; @@ -2971,8 +2972,9 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel( ObIDag *head = dag_list_[list_idx].get_header(); ObIDag *cur = head->get_next(); while (head != cur) { - dag = static_cast(cur); - cancel_flag = (ls_id == dag->get_ls_id()); + cancel_flag = ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS == cur->get_type() + ? (ls_id == static_cast(cur)->get_param().ls_id_) + : (ls_id == static_cast(cur)->get_ls_id()); if (cancel_flag) { if (cur->get_dag_status() == ObIDag::DAG_STATUS_READY) { diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index 10dd65bd49..920c879677 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -210,6 +210,7 @@ public: TASK_TYPE_TTL_DELETE = 54, TASK_TYPE_TENANT_SNAPSHOT_CREATE = 55, TASK_TYPE_TENANT_SNAPSHOT_GC = 56, + TASK_TYPE_BATCH_FREEZE_TABLETS = 57, TASK_TYPE_MAX, }; @@ -1389,7 +1390,8 @@ inline bool is_compaction_dag(ObDagType::ObDagTypeEnum dag_type) ObDagType::DAG_TYPE_MINI_MERGE == dag_type || ObDagType::DAG_TYPE_MERGE_EXECUTE == dag_type || ObDagType::DAG_TYPE_TX_TABLE_MERGE == dag_type || - ObDagType::DAG_TYPE_MDS_TABLE_MERGE == dag_type; + ObDagType::DAG_TYPE_MDS_TABLE_MERGE == dag_type || + ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS == dag_type; } inline int dag_yield() diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 1bf6b4955b..3320a65c1d 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -34,6 +34,7 @@ #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" #include "storage/compaction/ob_basic_tablet_merge_ctx.h" #include "storage/compaction/ob_tenant_compaction_progress.h" +#include "storage/tx_storage/ob_tenant_freezer.h" namespace oceanbase { @@ -1187,5 +1188,232 @@ void prepare_allocator( } +/* + * ----------------------------------------ObBatchFreezeTabletsParam-------------------------------------------- + */ +ObBatchFreezeTabletsParam::ObBatchFreezeTabletsParam() + : ls_id_(), + compaction_scn_(), + tablet_ids_() +{ + tablet_ids_.set_attr(lib::ObMemAttr(MTL_ID(), "BtFrzTbls", ObCtxIds::MERGE_NORMAL_CTX_ID)); +} + +int ObBatchFreezeTabletsParam::assign( + const ObBatchFreezeTabletsParam &other) +{ + int ret = OB_SUCCESS; + + if (this == &other) { + // do nothing + } else if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) { + LOG_WARN("failed to copy tablet ids", K(ret)); + } else { + ls_id_ = other.ls_id_; + compaction_scn_ = other.compaction_scn_; + } + return ret; +} + +int64_t ObBatchFreezeTabletsParam::get_hash() const +{ + int64_t hash_val = 0; + hash_val = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_val); + hash_val = common::murmurhash(&compaction_scn_, sizeof(compaction_scn_), hash_val); + return hash_val; +} + + +ObBatchFreezeTabletsDag::ObBatchFreezeTabletsDag() + : ObIDag(share::ObDagType::DAG_TYPE_BATCH_FREEZE_TABLETS), + is_inited_(false), + param_() +{ +} + +ObBatchFreezeTabletsDag::~ObBatchFreezeTabletsDag() +{ +} + +int ObBatchFreezeTabletsDag::init_by_param( + const share::ObIDagInitParam *param) +{ + int ret = OB_SUCCESS; + const ObBatchFreezeTabletsParam *init_param = nullptr; + + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObBatchFreezeTabletsDag has been inited", K(ret), KPC(this)); + } else if (FALSE_IT(init_param = static_cast(param))) { + } else if (OB_UNLIKELY(nullptr == init_param || !init_param->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arguments", K(ret), KPC(init_param)); + } else if (OB_FAIL(param_.assign(*init_param))) { + LOG_WARN("failed to init param", K(ret), KPC(init_param)); + } else { + is_inited_ = true; + } + return ret; +} + +int ObBatchFreezeTabletsDag::create_first_task() +{ + int ret = OB_SUCCESS; + ObBatchFreezeTabletsTask *task = nullptr; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("BatchFreezeTabletsDag has not inited", K(ret)); + } else if (OB_FAIL(create_task(nullptr/*parent*/, task))) { + LOG_WARN("failed to create batch tablet sstable task", K(ret)); + } + return ret; +} + +bool ObBatchFreezeTabletsDag::operator == (const ObIDag &other) const +{ + bool is_same = true; + + if (this == &other) { + // same + } else if (get_type() != other.get_type()) { + is_same = false; + } else if (param_.ls_id_ != static_cast(other).param_.ls_id_) { + is_same = false; + } else if (param_.compaction_scn_ != static_cast(other).param_.compaction_scn_) { + is_same = false; + } + return is_same; +} + +int64_t ObBatchFreezeTabletsDag::hash() const +{ + return param_.get_hash(); +} + +int ObBatchFreezeTabletsDag::fill_info_param( + compaction::ObIBasicInfoParam *&out_param, + ObIAllocator &allocator) const +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObBatchFreezeTabletsDag not inited", K(ret)); + } else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, + allocator, + get_type(), + param_.ls_id_.id(), + param_.compaction_scn_, + param_.tablet_ids_.count()))) { + LOG_WARN("failed to fill info param", K(ret), K(param_)); + } + return ret; +} + +int ObBatchFreezeTabletsDag::fill_dag_key(char *buf, const int64_t buf_len) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%ld compaction_scn=%ld freeze_tablet_cnt=%ld", + param_.ls_id_.id(), param_.compaction_scn_, param_.tablet_ids_.count()))) { + LOG_WARN("failed to fill dag key", K(ret), K(param_)); + } + return ret; +} + + +ObBatchFreezeTabletsTask::ObBatchFreezeTabletsTask() + : ObITask(ObITask::TASK_TYPE_BATCH_FREEZE_TABLETS), + is_inited_(false), + base_dag_(nullptr) +{ +} + +ObBatchFreezeTabletsTask::~ObBatchFreezeTabletsTask() +{ +} + +int ObBatchFreezeTabletsTask::init() +{ + int ret = OB_SUCCESS; + + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObBatchFreezeTabletsTask init twice", K(ret)); + } else if (OB_ISNULL(dag_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null dag", K(ret)); + } else if (OB_UNLIKELY(ObDagType::ObDagTypeEnum::DAG_TYPE_BATCH_FREEZE_TABLETS != dag_->get_type())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected dag type", K(ret)); + } else if (FALSE_IT(base_dag_ = static_cast(dag_))) { + } else if (OB_UNLIKELY(!base_dag_->get_param().is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected not valid param", K(ret), K(base_dag_->get_param())); + } else { + is_inited_ = true; + } + return ret; +} + +int ObBatchFreezeTabletsTask::process() +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + const ObBatchFreezeTabletsParam ¶m = base_dag_->get_param(); + + ObLSHandle ls_handle; + ObLS *ls_ptr = nullptr; + if (OB_TMP_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN_RET(tmp_ret, "failed to get log stream", K(param)); + } else { + ls_ptr = ls_handle.get_ls(); + } + + int64_t fail_freeze_cnt = 0; + int64_t succ_schedule_cnt = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < param.tablet_ids_.count(); ++i) { + const ObTabletID &tablet_id = param.tablet_ids_.at(i); + ObTabletHandle tablet_handle; + ObTablet *tablet = nullptr; + + if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*need_rewrite*/, true/*is_sync*/))) { + LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(tablet_id)); + ++fail_freeze_cnt; + } else if (OB_ISNULL(ls_ptr)) { + // do nothing + } else if (OB_TMP_FAIL(ls_ptr->get_tablet_svr()->get_tablet(tablet_id, + tablet_handle, + 0/*timeout_us*/, + storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) { + LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(tablet_id)); + } else if (FALSE_IT(tablet = tablet_handle.get_obj())) { + } else if (OB_UNLIKELY(tablet->get_snapshot_version() < param.compaction_scn_)) { + // do nothing + } else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(param.ls_id_, + *tablet, + MEDIUM_MERGE, + param.compaction_scn_))) { + if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { + ret = tmp_ret; + LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(tablet_id)); + } + } else { + ++succ_schedule_cnt; + } + + if (OB_FAIL(share::dag_yield())) { + LOG_WARN("failed to dag yield", K(ret)); + } + } + + if (OB_UNLIKELY(fail_freeze_cnt * 2 > param.tablet_ids_.count())) { + ret = OB_PARTIAL_FAILED; + } + FLOG_INFO("batch freeze tablets finished", KR(ret), K(param), K(fail_freeze_cnt), KP(ls_ptr), K(succ_schedule_cnt)); + + return ret; +} + + } // namespace compaction } // namespace oceanbase diff --git a/src/storage/compaction/ob_tablet_merge_task.h b/src/storage/compaction/ob_tablet_merge_task.h index 0edcf6359a..7d3e957db3 100644 --- a/src/storage/compaction/ob_tablet_merge_task.h +++ b/src/storage/compaction/ob_tablet_merge_task.h @@ -352,6 +352,71 @@ public: virtual ~ObTabletMiniMergeDag(); }; + +struct ObBatchFreezeTabletsParam : public share::ObIDagInitParam +{ +public: + ObBatchFreezeTabletsParam(); + virtual ~ObBatchFreezeTabletsParam() { tablet_ids_.reset(); } + virtual bool is_valid() const override { return ls_id_.is_valid() && compaction_scn_ > 0 && tablet_ids_.count() > 0; } + int assign(const ObBatchFreezeTabletsParam &other); + bool operator == (const ObBatchFreezeTabletsParam &other) const; + bool operator != (const ObBatchFreezeTabletsParam &other) const { return !this->operator==(other); } + int64_t get_hash() const; + VIRTUAL_TO_STRING_KV(K_(ls_id), K_(compaction_scn), "tablet_count", tablet_ids_.count(), K_(tablet_ids)); +public: + static constexpr int64_t DEFAULT_BATCH_SIZE = 16; + share::ObLSID ls_id_; + int64_t compaction_scn_; + common::ObSEArray tablet_ids_; +}; + + +class ObBatchFreezeTabletsDag : public share::ObIDag +{ +public: + ObBatchFreezeTabletsDag(); + virtual ~ObBatchFreezeTabletsDag(); + int init_by_param(const share::ObIDagInitParam *param); + virtual int create_first_task() override; + virtual bool operator == (const ObIDag &other) const override; + virtual int64_t hash() const override; + virtual int fill_info_param( + compaction::ObIBasicInfoParam *&out_param, + ObIAllocator &allocator) const override; + virtual int fill_dag_key(char *buf, const int64_t buf_len) const override; + virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; } + virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } + const ObBatchFreezeTabletsParam &get_param() const { return param_; } + virtual bool ignore_warning() override + { + return OB_PARTIAL_FAILED != dag_ret_; + } + + INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(param)); +private: + bool is_inited_; + ObBatchFreezeTabletsParam param_; +private: + DISALLOW_COPY_AND_ASSIGN(ObBatchFreezeTabletsDag); +}; + + +class ObBatchFreezeTabletsTask : public share::ObITask +{ +public: + ObBatchFreezeTabletsTask(); + virtual ~ObBatchFreezeTabletsTask(); + int init(); + virtual int process() override; +private: + bool is_inited_; + ObBatchFreezeTabletsDag *base_dag_; +private: + DISALLOW_COPY_AND_ASSIGN(ObBatchFreezeTabletsTask); +}; + + } // namespace compaction } // namespace oceanbase diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 901c9bfc33..1317ee01cc 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -1603,11 +1603,10 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( ls_time_guard.add_time_guard(tablet_time_guard); } // end of while - // TODO(@chengkong): submit a async task - FOREACH(need_freeze_tablet_id, need_freeze_tablets) { - if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(*need_freeze_tablet_id, true/*force_freeze*/, true/*is_sync*/))) { - LOG_WARN("failed to force freeze tablet", KR(tmp_ret), K(ls_id), K(*need_freeze_tablet_id)); - } + if (OB_FAIL(ret) || need_freeze_tablets.empty()) { + } else if (OB_TMP_FAIL(schedule_batch_freeze_dag(merge_version, ls_id, need_freeze_tablets))) { + LOG_WARN("failed to schedule batch force freeze tablets dag", K(tmp_ret), K(ls_id), + "tablet_count", need_freeze_tablets.count()); } ls_time_guard.click(ObCompactionScheduleTimeGuard::FAST_FREEZE); @@ -2048,5 +2047,33 @@ void ObTenantTabletScheduler::report_blocking_medium( } } +int ObTenantTabletScheduler::schedule_batch_freeze_dag( + const int64_t merge_version, + const share::ObLSID &ls_id, + const common::ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; + ObBatchFreezeTabletsParam param; + + if (OB_UNLIKELY(!ls_id.is_valid() || tablet_ids.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_ids)); + } else if (FALSE_IT(param.ls_id_ = ls_id)) { + } else if (FALSE_IT(param.compaction_scn_ = merge_version)) { + } else if (OB_FAIL(param.tablet_ids_.assign(tablet_ids))) { + LOG_WARN("failed to assign tablet ids", K(ret)); + } else if (OB_FAIL(MTL(ObTenantDagScheduler *)->create_and_add_dag(¶m, true/*is_emergency*/))) { + if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { + LOG_WARN("failed to create merge dag", K(ret), K(param)); + } else if (OB_EAGAIN == ret) { + LOG_WARN("curr ls exists batch freeze dag, wait the dag to finish", K(ret), K(ls_id)); + } + } else { + LOG_INFO("Succ to create tablet batch freeze dag", K(ret), K(param)); + } + return ret; +} + + } // namespace storage } // namespace oceanbase diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index cbc5c91fd3..f1cce5a941 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -294,6 +294,10 @@ private: const bool &tablet_could_schedule_medium, const bool &could_major_merge, const share::ObLSID &ls_id); + int schedule_batch_freeze_dag( + const int64_t merge_version, + const share::ObLSID &ls_id, + const common::ObIArray &tablet_ids); public: static const int64_t INIT_COMPACTION_SCN = 1; typedef common::ObSEArray MinorParallelResultArray;