From 4afce610984d91be68fcca9c5e2a9d7f6ab1088b Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 4 Nov 2022 02:08:04 +0000 Subject: [PATCH] fix bugs & compaction check in ls offline --- src/share/scheduler/ob_dag_scheduler.cpp | 21 +++++++++ src/share/scheduler/ob_dag_scheduler.h | 1 + .../compaction/ob_compaction_diagnose.cpp | 23 +++++++--- .../compaction/ob_compaction_diagnose.h | 8 +++- .../compaction/ob_tablet_merge_task.cpp | 7 +++ .../ob_tenant_compaction_progress.cpp | 1 + .../compaction/ob_tenant_freeze_info_mgr.cpp | 1 + .../compaction/ob_tenant_tablet_scheduler.cpp | 22 +++++++++ .../compaction/ob_tenant_tablet_scheduler.h | 1 + src/storage/ls/ob_ls.h | 1 + src/storage/ob_storage_schema_recorder.cpp | 46 ++++++++----------- src/storage/ob_storage_schema_recorder.h | 11 ++++- src/storage/tablet/ob_tablet.cpp | 3 ++ src/storage/tablet/ob_tablet_memtable_mgr.cpp | 1 + 14 files changed, 111 insertions(+), 36 deletions(-) diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 7ae3f88c9..502f79ba5 100644 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -2029,6 +2029,27 @@ int ObTenantDagScheduler::get_all_compaction_dag_info( return ret; } +int ObTenantDagScheduler::check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist) +{ + int ret = OB_SUCCESS; + exist = false; + compaction::ObTabletMergeDag *dag = nullptr; + ObThreadCondGuard guard(scheduler_sync_); + for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) { + ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]); + ObIDag *cur = head->get_next(); + while (head != cur) { + dag = static_cast<compaction::ObTabletMergeDag *>(cur); + if (ls_id == dag->get_ctx().param_.ls_id_) { + exist = true; + break; + } + cur = cur->get_next(); + } + } + return ret; +} + // get max estimated_finish_time to update server_progress int ObTenantDagScheduler::get_max_major_finish_time(const int64_t version, int64_t &estimated_finish_time) { 
diff --git a/src/share/scheduler/ob_dag_scheduler.h b/src/share/scheduler/ob_dag_scheduler.h index 9e174db01..3d24def46 100644 --- a/src/share/scheduler/ob_dag_scheduler.h +++ b/src/share/scheduler/ob_dag_scheduler.h @@ -790,6 +790,7 @@ public: ObIArray &progress_array); int get_max_major_finish_time(const int64_t version, int64_t &estimated_finish_time); int diagnose_dag(const ObIDag *dag, compaction::ObDiagnoseTabletCompProgress &input_progress); + int check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist); int check_dag_net_exist( const ObDagId &dag_id, bool &exist); private: diff --git a/src/storage/compaction/ob_compaction_diagnose.cpp b/src/storage/compaction/ob_compaction_diagnose.cpp index 51ebfcdef..a6a97c66a 100644 --- a/src/storage/compaction/ob_compaction_diagnose.cpp +++ b/src/storage/compaction/ob_compaction_diagnose.cpp @@ -32,6 +32,13 @@ using namespace share; namespace compaction { +int64_t ObScheduleSuspectInfo::hash() const +{ + int64_t hash_value = ObMergeDagHash::inner_hash(); + hash_value = common::murmurhash(&tenant_id_, sizeof(tenant_id_), hash_value); + return hash_value; +} + bool ObScheduleSuspectInfo::is_valid() const { bool bret = true; @@ -45,6 +52,7 @@ bool ObScheduleSuspectInfo::is_valid() const ObScheduleSuspectInfo & ObScheduleSuspectInfo::operator = (const ObScheduleSuspectInfo &other) { + tenant_id_ = other.tenant_id_; merge_type_ = other.merge_type_; ls_id_ = other.ls_id_; tablet_id_ = other.tablet_id_; @@ -320,13 +328,14 @@ int ObCompactionDiagnoseMgr::get_suspect_info( ObScheduleSuspectInfo &ret_info) { int ret = OB_SUCCESS; - compaction::ObMergeDagHash dag_hash; - dag_hash.merge_type_ = merge_type; - dag_hash.ls_id_ = ls_id; - dag_hash.tablet_id_ = tablet_id; - if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().get_suspect_info(dag_hash.inner_hash(), ret_info))) { + ObScheduleSuspectInfo input_info; + input_info.tenant_id_ = MTL_ID(); + input_info.merge_type_ = merge_type; + input_info.ls_id_ = ls_id; + 
input_info.tablet_id_ = tablet_id; + if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().get_suspect_info(input_info.hash(), ret_info))) { if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("failed to get suspect info", K(ret), K(dag_hash)); + LOG_WARN("failed to get suspect info", K(ret), K(input_info)); } } else if (ret_info.add_time_ + SUSPECT_INFO_WARNING_THRESHOLD < ObTimeUtility::fast_current_time()) { ret = OB_ENTRY_NOT_EXIST; @@ -413,7 +422,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet() SET_DIAGNOSE_INFO( info_array_[idx_++], MINI_MERGE, - MTL_ID(), + ret_info.tenant_id_, ls_id, ObTabletID(INT64_MAX), ObCompactionDiagnoseInfo::DIA_STATUS_FAILED, diff --git a/src/storage/compaction/ob_compaction_diagnose.h b/src/storage/compaction/ob_compaction_diagnose.h index 241e2c3b7..b11c33fec 100644 --- a/src/storage/compaction/ob_compaction_diagnose.h +++ b/src/storage/compaction/ob_compaction_diagnose.h @@ -32,13 +32,16 @@ struct ObScheduleSuspectInfo : public common::ObDLinkBase { ObScheduleSuspectInfo() : ObMergeDagHash(), + tenant_id_(OB_INVALID_ID), add_time_(0), suspect_info_("\0") {} + int64_t hash() const; bool is_valid() const; ObScheduleSuspectInfo & operator = (const ObScheduleSuspectInfo &other); - TO_STRING_KV(K_(merge_type), K_(ls_id), K_(tablet_id), K_(add_time), K_(suspect_info)); + TO_STRING_KV(K_(tenant_id), K_(merge_type), K_(ls_id), K_(tablet_id), K_(add_time), K_(suspect_info)); + int64_t tenant_id_; int64_t add_time_; char suspect_info_[common::OB_DIAGNOSE_INFO_LENGTH]; }; @@ -212,6 +215,7 @@ private: int64_t __pos = 0; \ int ret = OB_SUCCESS; \ compaction::ObScheduleSuspectInfo info; \ + info.tenant_id_ = MTL_ID(); \ info.merge_type_ = type; \ info.ls_id_ = ls_id; \ info.tablet_id_ = tablet_id; \ @@ -226,7 +230,7 @@ private: buf[__pos++] = '.'; \ } \ SIMPLE_TO_STRING_##n \ - if (OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().add_suspect_info(info.inner_hash(), info))) { \ + if 
(OB_FAIL(ObScheduleSuspectInfoMgr::get_instance().add_suspect_info(info.hash(), info))) { \ STORAGE_LOG(WARN, "failed to add suspect info", K(ret), K(info)); \ } else { \ STORAGE_LOG(DEBUG, "success to add suspect info", K(ret), K(info)); \ diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 7a95604f3..56d9d2092 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -604,6 +604,9 @@ int ObTabletMergePrepareTask::process() && !MTL(ObTenantTabletScheduler *)->could_major_merge_start())) { ret = OB_CANCELED; LOG_INFO("Merge has been paused", K(ret), K(ctx)); + } else if (ctx->ls_handle_.get_ls()->is_offline()) { + ret = OB_CANCELED; + LOG_INFO("ls offline, skip merge", K(ret), K(ctx)); } else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) { } else if (OB_FAIL(ctx->ls_handle_.get_ls()->get_tablet(ctx->param_.tablet_id_, ctx->tablet_handle_, @@ -890,6 +893,10 @@ int ObTabletMergeFinishTask::process() if (OB_SUCC(ret) && OB_NOT_NULL(ctx.merge_progress_)) { int tmp_ret = OB_SUCCESS; + // update merge info + if (OB_TMP_FAIL(ctx.merge_progress_->update_merge_info(ctx.merge_info_.get_sstable_merge_info()))) { + STORAGE_LOG(WARN, "fail to update update merge info", K(tmp_ret)); + } if (OB_TMP_FAIL(compaction::ObCompactionSuggestionMgr::get_instance().analyze_merge_info( ctx.merge_info_, *ctx.merge_progress_))) { diff --git a/src/storage/compaction/ob_tenant_compaction_progress.cpp b/src/storage/compaction/ob_tenant_compaction_progress.cpp index aaa33af49..b86409fbb 100644 --- a/src/storage/compaction/ob_tenant_compaction_progress.cpp +++ b/src/storage/compaction/ob_tenant_compaction_progress.cpp @@ -151,6 +151,7 @@ int ObTenantCompactionProgressMgr::loop_major_sstable_( int64_t &size) { int ret = OB_SUCCESS; + common::ObTimeGuard timeguard("loop_major_sstable_to_calc_progress_size", 30 * 1000 * 1000); // 30s 
ObSharedGuard ls_iter_guard; ObLS *ls = nullptr; if (OB_FAIL(MTL(ObLSService *)->get_ls_iter(ls_iter_guard, ObLSGetMod::STORAGE_MOD))) { diff --git a/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp b/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp index 625ce768a..427e5718e 100644 --- a/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp +++ b/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp @@ -152,6 +152,7 @@ int ObTenantFreezeInfoMgr::get_min_dependent_freeze_info(FreezeInfo &freeze_info idx = info_list.count() - MIN_DEPENDENT_FREEZE_INFO_GAP; } ret = get_info_nolock(idx, freeze_info); + LOG_INFO("get min dependent freeze info", K(ret), K(freeze_info)); // diagnose code for issue 45841468 return ret; } diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 4a2d96744..7bf6e5023 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -347,6 +347,26 @@ int ObTenantTabletScheduler::update_upper_trans_version_and_gc_sstable() return ret; } +int ObTenantTabletScheduler::wait_ls_compaction_finish(const share::ObLSID &ls_id) +{ + int ret = OB_SUCCESS; + bool exist = false; + if (OB_UNLIKELY(!ls_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(ls_id)); + } + while (OB_SUCC(ret)) { + if (OB_FAIL(MTL(ObTenantDagScheduler*)->check_ls_compaction_dag_exist(ls_id, exist))) { + LOG_WARN("failed to check ls compaction dag", K(ret), K(ls_id)); + } else if (!exist) { + break; + } else { + ob_usleep(100 * 1000); // 100ms + } + } + return ret; +} + int ObTenantTabletScheduler::schedule_build_bloomfilter( const uint64_t table_id, const blocksstable::MacroBlockId &macro_id, @@ -528,6 +548,8 @@ int ObTenantTabletScheduler::check_ls_state(ObLS &ls, bool &need_merge) need_merge = false; if (ls.is_deleted()) { LOG_INFO("ls is deleted", K(ret), K(ls)); + } else if (ls.is_offline()) { + 
LOG_INFO("ls is offline", K(ret), K(ls)); } else { need_merge = true; } diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index bac145548..8b2227bad 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -112,6 +112,7 @@ public: int merge_all(); int schedule_merge(const int64_t broadcast_version); int update_upper_trans_version_and_gc_sstable(); + int wait_ls_compaction_finish(const share::ObLSID &ls_id); // Schedule an async task to build bloomfilter for the given macro block. // The bloomfilter build task will be ignored if a same build task exists in the queue. diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 12fcf5143..fcde013d9 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -162,6 +162,7 @@ public: void destroy(); int offline(); int online(); + bool is_offline() const { return false; } // mock function, TODO(@yanyuan) ObLSTxService *get_tx_svr() { return &ls_tx_svr_; } ObLockTable *get_lock_table() { return &lock_table_; } diff --git a/src/storage/ob_storage_schema_recorder.cpp b/src/storage/ob_storage_schema_recorder.cpp index d601f268c..78708aad3 100644 --- a/src/storage/ob_storage_schema_recorder.cpp +++ b/src/storage/ob_storage_schema_recorder.cpp @@ -36,16 +36,9 @@ using namespace share::schema; namespace storage { -int ObStorageSchemaRecorder::ObStorageSchemaLogCb::set_table_version(const int64_t table_version) +void ObStorageSchemaRecorder::ObStorageSchemaLogCb::set_table_version(const int64_t table_version) { - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(!ATOMIC_BCAS(&table_version_, OB_INVALID_VERSION, table_version))) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("double set table_version", K(ret), K(table_version_), K(table_version)); - } - - return ret; + ATOMIC_SET(&table_version_, table_version); } int ObStorageSchemaRecorder::ObStorageSchemaLogCb::on_success() @@ -490,24 
+483,25 @@ int ObStorageSchemaRecorder::submit_schema_log(const int64_t table_id) logcb_ptr_ = new(buf) ObStorageSchemaLogCb(*this); } } - if (FAILEDx(logcb_ptr_->set_table_version(storage_schema_->get_schema_version()))) { - LOG_ERROR("fail to set table version", K(ret), K_(tablet_id)); - } else if (FALSE_IT(ATOMIC_STORE(&logcb_finish_flag_, false))) { - } else if (FALSE_IT(storage_schema_->set_sync_finish(false))) { - } else if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(storage_schema_, - ObLogTsRange::MAX_TS, false/*for_replay*/, memtable::MemtableRefOp::INC_REF))) { - if (OB_BLOCK_FROZEN != ret) { - LOG_WARN("failed to inc ref for storage schema", K(ret), K_(tablet_id), K(storage_schema_)); + if (OB_SUCC(ret)) { + logcb_ptr_->set_table_version(storage_schema_->get_schema_version()); + ATOMIC_STORE(&logcb_finish_flag_, false); + storage_schema_->set_sync_finish(false); + if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(storage_schema_, + ObLogTsRange::MAX_TS, false/*for_replay*/, memtable::MemtableRefOp::INC_REF))) { + if (OB_BLOCK_FROZEN != ret) { + LOG_WARN("failed to inc ref for storage schema", K(ret), K_(tablet_id), K(storage_schema_)); + } + } else if (OB_FAIL(log_handler_->append(clog_buf_, clog_len_, ref_ts_ns, need_nonblock, logcb_ptr_, lsn, clog_ts_))) { + LOG_WARN("fail to submit log", K(ret), K_(tablet_id)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(dec_ref_on_memtable(false))) { + LOG_ERROR("failed to dec ref on memtable", K(tmp_ret), K_(ls_id), K_(tablet_id)); + } + } else { + LOG_INFO("submit schema log succeed", K(ret), K_(ls_id), K_(tablet_id), K_(clog_ts), K_(clog_len), + "schema_version", storage_schema_->get_schema_version()); } - } else if (OB_FAIL(log_handler_->append(clog_buf_, clog_len_, ref_ts_ns, need_nonblock, logcb_ptr_, lsn, clog_ts_))) { - LOG_WARN("fail to submit log", K(ret), K_(tablet_id)); - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(dec_ref_on_memtable(false))) { - LOG_ERROR("failed to 
dec ref on memtable", K(tmp_ret), K_(ls_id), K_(tablet_id)); - } - } else { - LOG_INFO("submit schema log succeed", K(ret), K_(ls_id), K_(tablet_id), K_(clog_ts), K_(clog_len), - "schema_version", storage_schema_->get_schema_version()); } return ret; diff --git a/src/storage/ob_storage_schema_recorder.h b/src/storage/ob_storage_schema_recorder.h index 8d789204d..17d54681a 100644 --- a/src/storage/ob_storage_schema_recorder.h +++ b/src/storage/ob_storage_schema_recorder.h @@ -59,6 +59,14 @@ public: logservice::ObLogHandler *log_handler); void reset(); bool is_inited() const { return is_inited_; } + bool is_valid() const + { + return is_inited_ + && ls_id_.is_valid() + && tablet_id_.is_valid() + && nullptr != log_handler_ + && max_saved_table_version_ >= 0; + } // follower int replay_schema_log(const int64_t log_ts, const char *buf, const int64_t size, int64_t &pos); @@ -72,6 +80,7 @@ public: ObStorageSchemaRecorder(const ObStorageSchemaRecorder&) = delete; ObStorageSchemaRecorder& operator=(const ObStorageSchemaRecorder&) = delete; int64_t get_max_sync_version() const { return ATOMIC_LOAD(&max_saved_table_version_); } + TO_STRING_KV(K_(is_inited), K_(ls_id), K_(tablet_id)); private: class ObStorageSchemaLogCb : public logservice::AppendCb @@ -80,7 +89,7 @@ private: virtual int on_success() override; virtual int on_failure() override; - int set_table_version(const int64_t table_version); + void set_table_version(const int64_t table_version); ObStorageSchemaLogCb(ObStorageSchemaRecorder &recorder) : recorder_(recorder), diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 6293ef1e3..5f22b9970 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -2811,6 +2811,9 @@ int ObTablet::check_max_sync_schema_version() const if (OB_FAIL(get_memtable_mgr(memtable_mgr))) { LOG_WARN("failed to get memtable mgr", K(ret)); } else if (FALSE_IT(data_memtable_mgr = static_cast(memtable_mgr))) { + } else if 
(OB_UNLIKELY(!data_memtable_mgr->get_storage_schema_recorder().is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("schema recorder is invalid", K(ret), K_(tablet_meta), KPC(data_memtable_mgr)); } else if (OB_FAIL(data_memtable_mgr->get_multi_source_data_unit(&storage_schema, &tmp_allocator))) { LOG_ERROR("failed to storage schema from memtable, max_sync_schema_version is invalid", K(ret), K(max_sync_schema_version), KPC(data_memtable_mgr)); diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.cpp b/src/storage/tablet/ob_tablet_memtable_mgr.cpp index b33883954..978ee2da3 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.cpp +++ b/src/storage/tablet/ob_tablet_memtable_mgr.cpp @@ -796,6 +796,7 @@ int64_t ObTabletMemtableMgr::to_string(char *buf, const int64_t buf_len) const J_COMMA(); } } + J_KV("schema_recorder", schema_recorder_); J_ARRAY_END(); J_OBJ_END(); J_OBJ_END();