diff --git a/deps/oblib/src/lib/thread/threads.h b/deps/oblib/src/lib/thread/threads.h index 37822fb47c..e7c3e5b52a 100644 --- a/deps/oblib/src/lib/thread/threads.h +++ b/deps/oblib/src/lib/thread/threads.h @@ -117,6 +117,7 @@ protected: bool &has_set_stop() { return stop_; } int64_t get_thread_count() const { return n_threads_; } uint64_t get_thread_idx() const { return thread_idx_; } + void set_thread_idx(int64_t idx) { thread_idx_ = idx; } private: virtual void run(int64_t idx); diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 398f6124d0..9011c321d0 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -47,6 +47,7 @@ #include "rpc/obrpc/ob_rpc_packet.h" #include "lib/container/ob_array.h" #include "share/rc/ob_tenant_module_init_ctx.h" +#include "sql/engine/px/ob_px_worker.h" using namespace oceanbase::lib; using namespace oceanbase::common; @@ -222,6 +223,14 @@ void ObPxPool::set_px_thread_name() lib::set_thread_name_inner(buf); } +void ObPxPool::run(int64_t idx) +{ + set_thread_idx(idx); + // Create worker for current thread. + ObPxWorker worker; + run1(); +} + void ObPxPool::run1() { int ret = OB_SUCCESS; diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 820cb5b06c..75ec58ea1d 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -55,6 +55,7 @@ class ObPxPool : public share::ObThreadPool { using RunFuncT = std::function; + void run(int64_t idx) final; void run1() final; static const int64_t QUEUE_WAIT_TIME = 100 * 1000; diff --git a/src/rootserver/CMakeLists.txt b/src/rootserver/CMakeLists.txt index 8cb7dde1d6..e855a38ec5 100644 --- a/src/rootserver/CMakeLists.txt +++ b/src/rootserver/CMakeLists.txt @@ -110,6 +110,7 @@ ob_set_subtarget(ob_rootserver freeze freeze/ob_major_freeze_service.cpp freeze/ob_major_freeze_rpc_define.cpp freeze/ob_major_freeze_helper.cpp + freeze/ob_major_freeze_util.cpp ) ob_set_subtarget(ob_rootserver restore diff --git a/src/rootserver/freeze/ob_checksum_validator.cpp b/src/rootserver/freeze/ob_checksum_validator.cpp index 48319c2799..a27e85b52f 100644 --- a/src/rootserver/freeze/ob_checksum_validator.cpp +++ b/src/rootserver/freeze/ob_checksum_validator.cpp @@ -15,6 +15,7 @@ #include "rootserver/freeze/ob_checksum_validator.h" #include "rootserver/freeze/ob_freeze_info_manager.h" #include "rootserver/freeze/ob_zone_merge_manager.h" +#include "rootserver/freeze/ob_major_freeze_util.h" #include "rootserver/ob_root_utils.h" #include "lib/mysqlclient/ob_mysql_proxy.h" #include "lib/mysqlclient/ob_isql_client.h" @@ -301,6 +302,7 @@ bool ObIndexChecksumValidator::need_validate() const } int ObIndexChecksumValidator::validate_checksum( + const volatile bool &stop, const SCN &frozen_scn, const hash::ObHashMap &tablet_compaction_map, int64_t &table_count, @@ -308,7 +310,10 @@ int ObIndexChecksumValidator::validate_checksum( const int64_t expected_epoch) { int ret = OB_SUCCESS; - if ((!frozen_scn.is_valid()) || (tablet_compaction_map.empty())) { + if (stop) { + ret = OB_CANCELED; + LOG_WARN("already stop", KR(ret), K_(tenant_id)); + } else if ((!frozen_scn.is_valid()) || (tablet_compaction_map.empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K_(tenant_id), K(frozen_scn)); } else if (IS_NOT_INIT) { @@ -317,14 +322,15 @@ int ObIndexChecksumValidator::validate_checksum( } else if (!is_primary_cluster()) { ret = OB_INNER_STAT_ERROR; LOG_WARN("can only check index column checksum in primary cluster", KR(ret)); - } else if (OB_FAIL(check_all_table_verification_finished(frozen_scn, tablet_compaction_map, table_count, - table_compaction_map, expected_epoch))) { + } else if (OB_FAIL(check_all_table_verification_finished(stop, frozen_scn, tablet_compaction_map, + table_count, table_compaction_map, expected_epoch))) { LOG_WARN("fail to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn)); } return ret; } int ObIndexChecksumValidator::check_all_table_verification_finished( + const volatile bool &stop, const SCN &frozen_scn, const hash::ObHashMap &tablet_compaction_map, int64_t &table_count, @@ -338,13 +344,16 @@ int ObIndexChecksumValidator::check_all_table_verification_finished( ObSchemaGetterGuard schema_guard; SMART_VARS_2((ObArray, table_schemas), (ObArray, table_ids)) { - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) { + if (stop) { + ret = OB_CANCELED; + LOG_WARN("already stop", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) { LOG_WARN("fail to get tenant schema guard", KR(ret), K_(tenant_id)); } else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id_, table_schemas))) { LOG_WARN("fail to get tenant table schemas", KR(ret), K_(tenant_id)); } else { table_count = table_schemas.count(); - for (int64_t i = 0; (i < table_schemas.count()) && OB_SUCC(ret); ++i) { + for (int64_t i = 0; (i < table_schemas.count()) && OB_SUCC(ret) && !stop; ++i) { const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i); if (OB_ISNULL(simple_schema)) { ret = OB_ERR_UNEXPECTED; @@ -423,6 +432,7 @@ int ObIndexChecksumValidator::check_all_table_verification_finished( } #endif // both tables' all tablets finished compaction, we should validate column checksum. + FREEZE_TIME_GUARD; if (OB_FAIL(ObTabletReplicaChecksumOperator::check_column_checksum(tenant_id_, *data_table_schema, *table_schema, frozen_scn, *sql_proxy_, expected_epoch))) { if (OB_CHECKSUM_ERROR == ret) { @@ -488,7 +498,7 @@ int ObIndexChecksumValidator::check_all_table_verification_finished( if (OB_SUCC(ret) && (OB_SUCCESS == check_ret)) { ObArray removed_table_ids; // record the table_id which will be removed hash::ObHashMap::iterator iter = table_compaction_map.begin(); - for (;OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) { + for (;!stop && OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) { const uint64_t cur_table_id = iter->first; if (exist_in_table_array(cur_table_id, table_ids)) { const ObTableCompactionInfo &compaction_info = iter->second; @@ -518,7 +528,6 @@ int ObIndexChecksumValidator::check_all_table_verification_finished( if (check_ret == OB_CHECKSUM_ERROR) { ret = check_ret; } - return ret; } @@ -591,6 +600,7 @@ int ObIndexChecksumValidator::handle_table_compaction_finished( } else { if (table_schema->has_tablet()) { SMART_VAR(ObArray, pairs) { + FREEZE_TIME_GUARD; if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *table_schema, *sql_proxy_, pairs))) { LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), K(table_id)); } else if (pairs.count() < 1) { @@ -649,6 +659,7 @@ int ObIndexChecksumValidator::check_table_compaction_finished( SMART_VAR(ObArray, tablet_ids) { SMART_VAR(ObArray, pairs) { if (table_schema.has_tablet()) { + FREEZE_TIME_GUARD; if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) { LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(table_schema)); } else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, table_id, @@ -689,6 +700,7 @@ int ObIndexChecksumValidator::check_table_compaction_finished( } // if current table 'has tablet' & 'finished compaction' & 'not skip verifying', verify tablet replica checksum if (OB_SUCC(ret) && latest_compaction_info.is_compacted()) { + FREEZE_TIME_GUARD; if (OB_FAIL(ObTabletReplicaChecksumOperator::check_tablet_replica_checksum(tenant_id_, pairs, frozen_scn, *sql_proxy_))) { if (OB_CHECKSUM_ERROR == ret) { diff --git a/src/rootserver/freeze/ob_checksum_validator.h b/src/rootserver/freeze/ob_checksum_validator.h index faa2a42e20..4d871f2944 100644 --- a/src/rootserver/freeze/ob_checksum_validator.h +++ b/src/rootserver/freeze/ob_checksum_validator.h @@ -104,7 +104,8 @@ public: virtual ~ObIndexChecksumValidator() {} public: - int validate_checksum(const share::SCN &frozen_scn, + int validate_checksum(const volatile bool &stop, + const share::SCN &frozen_scn, const hash::ObHashMap &tablet_compaction_map, int64_t &table_count, hash::ObHashMap &table_compaction_map, @@ -113,7 +114,8 @@ public: private: // valid '' pair should finish index column checksum verification, other tables just skip verification. - int check_all_table_verification_finished(const share::SCN &frozen_scn, + int check_all_table_verification_finished(const volatile bool &stop, + const share::SCN &frozen_scn, const hash::ObHashMap &tablet_compaction_map, int64_t &table_count, hash::ObHashMap &table_compaction_map, diff --git a/src/rootserver/freeze/ob_major_freeze_service.cpp b/src/rootserver/freeze/ob_major_freeze_service.cpp index 26af501cde..e49beaaa22 100644 --- a/src/rootserver/freeze/ob_major_freeze_service.cpp +++ b/src/rootserver/freeze/ob_major_freeze_service.cpp @@ -243,35 +243,44 @@ int ObMajorFreezeService::check_inner_stat() void ObMajorFreezeService::stop() { + LOG_INFO("major_freeze_service start to stop", K_(tenant_id)); ObRecursiveMutexGuard guard(lock_); SpinRLockGuard r_guard(rw_lock_); if (OB_NOT_NULL(tenant_major_freeze_)) { + LOG_INFO("tenant_major_freeze_ start to stop", K_(tenant_id)); tenant_major_freeze_->stop(); } + LOG_INFO("major_freeze_service finish to stop", K_(tenant_id)); } void ObMajorFreezeService::wait() { + LOG_INFO("major_freeze_service start to wait", K_(tenant_id)); ObRecursiveMutexGuard guard(lock_); SpinRLockGuard r_guard(rw_lock_); int ret = OB_SUCCESS; if (OB_NOT_NULL(tenant_major_freeze_)) { + LOG_INFO("tenant_major_freeze_ start to wait", K_(tenant_id)); if (OB_FAIL(tenant_major_freeze_->wait())) { LOG_WARN("fail to wait", KR(ret), K_(tenant_id)); } } + LOG_INFO("major_freeze_service finish to wait", K_(tenant_id)); } void ObMajorFreezeService::destroy() { + LOG_INFO("major_freeze_service start to destroy", K_(tenant_id)); ObRecursiveMutexGuard guard(lock_); SpinRLockGuard r_guard(rw_lock_); int ret = OB_SUCCESS; if (OB_NOT_NULL(tenant_major_freeze_)) { + LOG_INFO("tenant_major_freeze_ start to destroy", K_(tenant_id)); if (OB_FAIL(tenant_major_freeze_->destroy())) { LOG_WARN("fail to destroy", KR(ret), K_(tenant_id)); } } + LOG_INFO("major_freeze_service finish to destroy", K_(tenant_id)); } int ObMajorFreezeService::mtl_init(ObMajorFreezeService *&service) diff --git a/src/rootserver/freeze/ob_major_freeze_util.cpp b/src/rootserver/freeze/ob_major_freeze_util.cpp new file mode 100644 index 0000000000..95169095c5 --- /dev/null +++ b/src/rootserver/freeze/ob_major_freeze_util.cpp @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS + +#include "rootserver/freeze/ob_major_freeze_util.h" + +#include "lib/time/ob_time_utility.h" +#include "share/ob_define.h" + +namespace oceanbase +{ +namespace rootserver +{ + +ObFreezeTimeGuard::ObFreezeTimeGuard( + const char *file, + const int64_t line, + const char *func, + const char *mod) + : warn_threshold_us_(FREEZE_WARN_THRESHOLD_US), + start_time_us_(common::ObTimeUtility::fast_current_time()), + file_(file), + line_(line), + func_name_(func), + log_mod_(mod) +{ +} + +ObFreezeTimeGuard::~ObFreezeTimeGuard() +{ + int64_t now_us = common::ObTimeUtility::fast_current_time(); + int64_t total_cost_us = now_us - start_time_us_; + if (OB_UNLIKELY(total_cost_us >= warn_threshold_us_)) { + constexpr int buffer_size = 256; + char strbuffer[buffer_size] = { 0 }; + int n = snprintf(strbuffer, buffer_size, "cost too much time: %s (%s:%ld), ", + func_name_, strrchr(file_, '/') ? (strrchr(file_, '/') + 1) : file_, line_); + if (n >= buffer_size) { + snprintf(&strbuffer[buffer_size - 6], 6, "..., "); + } + common::OB_PRINT(log_mod_, OB_LOG_LEVEL_DIRECT(WARN), strbuffer, + LOG_KVS(K_(warn_threshold_us), K(total_cost_us), K_(start_time_us), K(now_us))); + } +} + +} // end namespace rootserver +} // end namespace oceanbase diff --git a/src/rootserver/freeze/ob_major_freeze_util.h b/src/rootserver/freeze/ob_major_freeze_util.h new file mode 100644 index 0000000000..cf8d6baa1b --- /dev/null +++ b/src/rootserver/freeze/ob_major_freeze_util.h @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_ROOTSERVER_FREEZE_OB_MAJOR_FREEZE_UTIL_H_ +#define OCEANBASE_ROOTSERVER_FREEZE_OB_MAJOR_FREEZE_UTIL_H_ + +#include + +namespace oceanbase +{ +namespace rootserver +{ + +#define FREEZE_TIME_GUARD \ + rootserver::ObFreezeTimeGuard freeze_time_guard(__FILE__, __LINE__, __FUNCTION__, "[RS] ") + +class ObFreezeTimeGuard +{ +public: + ObFreezeTimeGuard(const char *file, + const int64_t line, + const char *func, + const char *mod); + virtual ~ObFreezeTimeGuard(); + +private: + const int64_t FREEZE_WARN_THRESHOLD_US = 10 * 1000L * 1000L; // 10s + +private: + const int64_t warn_threshold_us_; + const int64_t start_time_us_; + const char * const file_; + const int64_t line_; + const char * const func_name_; + const char * const log_mod_; +}; + +} // end namespace rootserver +} // end namespace oceanbase + +#endif // OCEANBASE_ROOTSERVER_FREEZE_OB_MAJOR_FREEZE_UTIL_H_ diff --git a/src/rootserver/freeze/ob_major_merge_progress_checker.cpp b/src/rootserver/freeze/ob_major_merge_progress_checker.cpp index df52ce9cdf..c6b0c1003f 100644 --- a/src/rootserver/freeze/ob_major_merge_progress_checker.cpp +++ b/src/rootserver/freeze/ob_major_merge_progress_checker.cpp @@ -14,6 +14,7 @@ #include "rootserver/freeze/ob_major_merge_progress_checker.h" #include "rootserver/freeze/ob_zone_merge_manager.h" +#include "rootserver/freeze/ob_major_freeze_util.h" #include "share/schema/ob_schema_getter_guard.h" #include "share/tablet/ob_tablet_table_operator.h" #include "share/tablet/ob_tablet_table_iterator.h" @@ -181,8 +182,17 @@ int ObMajorMergeProgressChecker::check_merge_progress( LOG_WARN("fail to generate tablet table map", K_(tenant_id), KR(ret)); } else { ObTabletInfo tablet_info; - while (!stop && OB_SUCC(ret) && OB_SUCC(iter.next(tablet_info))) { - if (!tablet_info.is_valid()) { + while (!stop && OB_SUCC(ret)) { + { + FREEZE_TIME_GUARD; + if (OB_FAIL(iter.next(tablet_info))) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next tablet_info", KR(ret), K_(tenant_id), K(stop)); + } + } + } + if (OB_FAIL(ret)) { + } else if (!tablet_info.is_valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("iterate invalid tablet info", KR(ret), K(tablet_info)); } else if (OB_FAIL(check_tablet(tablet_info, tablet_map, all_progress, global_broadcast_scn, @@ -258,10 +268,14 @@ int ObMajorMergeProgressChecker::check_tablet( ObLSInfo ls_info; int64_t cluster_id = GCONF.cluster_id; const ObLSID &ls_id = tablet_info.get_ls_id(); - if (OB_FAIL(lst_operator_->get(cluster_id, tenant_id_, - ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) { - LOG_WARN("fail to get ls info", KR(ret), K_(tenant_id), K(ls_id)); - } else if (OB_FAIL(check_majority_integrated(schema_guard, tablet_info, ls_info))) { + { + FREEZE_TIME_GUARD; + if (OB_FAIL(lst_operator_->get(cluster_id, tenant_id_, + ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) { + LOG_WARN("fail to get ls info", KR(ret), K_(tenant_id), K(ls_id)); + } + } + if (FAILEDx(check_majority_integrated(schema_guard, tablet_info, ls_info))) { LOG_WARN("fail to check majority integrated", KR(ret), K(tablet_info), K(ls_info)); } else if (OB_FAIL(check_tablet_compaction_scn(all_progress, global_broadcast_scn, tablet_info, ls_info))) { LOG_WARN("fail to check data version", KR(ret), K(tablet_info), K(ls_info)); @@ -458,13 +472,18 @@ int ObMajorMergeProgressChecker::get_associated_replica_num( } int ObMajorMergeProgressChecker::check_verification( + const volatile bool &stop, const share::SCN &global_broadcast_scn, const int64_t expected_epoch) { int ret = OB_SUCCESS; - if (!tablet_compaction_map_.empty()) { - if (index_validator_.need_validate() && OB_FAIL(index_validator_.validate_checksum(global_broadcast_scn, - tablet_compaction_map_, table_count_, table_compaction_map_, expected_epoch))) { + if (stop) { + ret = OB_CANCELED; + LOG_WARN("already stop", KR(ret), K_(tenant_id)); + } else if (!tablet_compaction_map_.empty()) { + if (index_validator_.need_validate() && OB_FAIL(index_validator_.validate_checksum(stop, + global_broadcast_scn, tablet_compaction_map_, table_count_, table_compaction_map_, + expected_epoch))) { LOG_WARN("fail to validate checksum of index validator", KR(ret), K(global_broadcast_scn), K(expected_epoch)); } diff --git a/src/rootserver/freeze/ob_major_merge_progress_checker.h b/src/rootserver/freeze/ob_major_merge_progress_checker.h index 27b69d8c02..109aa880cf 100644 --- a/src/rootserver/freeze/ob_major_merge_progress_checker.h +++ b/src/rootserver/freeze/ob_major_merge_progress_checker.h @@ -62,7 +62,8 @@ public: const share::SCN &global_broadcast_scn, share::ObAllZoneMergeProgress &all_progress); - int check_verification(const share::SCN &global_broadcast_scn, + int check_verification(const volatile bool &stop, + const share::SCN &global_broadcast_scn, const int64_t expected_epoch); // @exist_uncompacted means not all table finished compaction diff --git a/src/rootserver/freeze/ob_major_merge_scheduler.cpp b/src/rootserver/freeze/ob_major_merge_scheduler.cpp index 06d1156362..55a37df08a 100644 --- a/src/rootserver/freeze/ob_major_merge_scheduler.cpp +++ b/src/rootserver/freeze/ob_major_merge_scheduler.cpp @@ -20,11 +20,13 @@ #include "rootserver/freeze/ob_freeze_info_manager.h" #include "rootserver/ob_rs_event_history_table_operator.h" #include "rootserver/freeze/ob_tenant_all_zone_merge_strategy.h" +#include "rootserver/freeze/ob_major_freeze_util.h" #include "lib/container/ob_array.h" #include "lib/container/ob_se_array.h" #include "lib/container/ob_array_iterator.h" #include "lib/container/ob_se_array_iterator.h" #include "lib/allocator/page_arena.h" +#include "lib/profile/ob_trace_id.h" #include "share/ob_errno.h" #include "share/config/ob_server_config.h" #include "share/tablet/ob_tablet_table_iterator.h" @@ -257,6 +259,7 @@ int ObMajorMergeScheduler::do_before_major_merge(const int64_t expected_epoch) int ret = OB_SUCCESS; share::SCN global_broadcast_scn; global_broadcast_scn.set_min(); + FREEZE_TIME_GUARD; if (OB_FAIL(progress_checker_.prepare_handle())) { LOG_WARN("fail to do prepare handle of progress checker", KR(ret)); @@ -279,6 +282,7 @@ int ObMajorMergeScheduler::do_one_round_major_merge(const int64_t expected_epoch // loop until 'this round major merge finished' or 'epoch changed' while (!stop_ && !is_paused()) { update_last_run_timestamp(); + ObCurTraceId::init(GCONF.self_addr_); ObZoneArray to_merge_zone; // Place is_last_merge_complete() to the head of this while loop. // So as to break this loop at once, when the last merge is complete. @@ -372,7 +376,7 @@ int ObMajorMergeScheduler::schedule_zones_to_merge( // set zone merging flag if (OB_SUCC(ret)) { HEAP_VAR(ObZoneMergeInfo, tmp_info) { - FOREACH_CNT_X(zone, to_merge, OB_SUCCESS == ret) { + FOREACH_CNT_X(zone, to_merge, (OB_SUCCESS == ret) && !stop_) { tmp_info.reset(); tmp_info.tenant_id_ = tenant_id_; tmp_info.zone_ = *zone; @@ -411,7 +415,7 @@ int ObMajorMergeScheduler::start_zones_merge(const ObZoneArray &to_merge, const } else if (OB_FAIL(zone_merge_mgr_->get_global_broadcast_scn(global_broadcast_scn))) { LOG_WARN("fail to get_global_broadcast_scn", KR(ret)); } else { - for (int64_t i = 0; OB_SUCC(ret) && (i < to_merge.count()); ++i) { + for (int64_t i = 0; !stop_ && OB_SUCC(ret) && (i < to_merge.count()); ++i) { HEAP_VAR(ObZoneMergeInfo, tmp_info) { tmp_info.tenant_id_ = tenant_id_; tmp_info.zone_ = to_merge.at(i); @@ -458,7 +462,7 @@ int ObMajorMergeScheduler::update_merge_status(const int64_t expected_epoch) "global_broadcast_scn", global_broadcast_scn.get_val_for_inner_table_field(), "service_addr", GCONF.self_addr_); } - } else if (OB_FAIL(progress_checker_.check_verification(global_broadcast_scn, expected_epoch))) { + } else if (OB_FAIL(progress_checker_.check_verification(stop_, global_broadcast_scn, expected_epoch))) { LOG_WARN("fail to check verification", KR(ret), K_(tenant_id), K(global_broadcast_scn)); int64_t time_interval = 10L * 60 * 1000 * 1000; // record every 10 minutes if (TC_REACH_TIME_INTERVAL(time_interval)) { @@ -496,7 +500,7 @@ int ObMajorMergeScheduler::handle_all_zone_merge( } else { // 1. check all zone finished compaction HEAP_VAR(ObZoneMergeInfo, info) { - FOREACH_X(progress, all_progress, OB_SUCC(ret)) { + FOREACH_X(progress, all_progress, OB_SUCC(ret) && !stop_) { const ObZone &zone = progress->zone_; bool merged = false; info.reset(); @@ -703,6 +707,7 @@ int ObMajorMergeScheduler::try_update_epoch_and_reload() } LOG_WARN("fail to update freeze_service_epoch", KR(ret), K(ori_epoch), K(latest_epoch)); } else { + FREEZE_TIME_GUARD; if (OB_FAIL(set_epoch(latest_epoch))) { LOG_WARN("fail to set epoch", KR(ret), K(latest_epoch)); } else if (OB_FAIL(zone_merge_mgr_->reload())) { @@ -735,6 +740,7 @@ int ObMajorMergeScheduler::do_update_freeze_service_epoch( { int ret = OB_SUCCESS; int64_t affected_rows = 0; + FREEZE_TIME_GUARD; if (OB_FAIL(ObServiceEpochProxy::update_service_epoch(*sql_proxy_, tenant_id_, ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, latest_epoch, affected_rows))) { LOG_WARN("fail to update freeze_service_epoch", KR(ret), K(latest_epoch), K_(tenant_id)); @@ -750,6 +756,7 @@ int ObMajorMergeScheduler::update_all_tablets_report_scn( const uint64_t global_braodcast_scn_val) { int ret = OB_SUCCESS; + FREEZE_TIME_GUARD; if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn( tenant_id_, global_braodcast_scn_val, @@ -793,10 +800,13 @@ void ObMajorMergeScheduler::check_merge_interval_time(const bool is_merging) ObServerTableOperator st_operator; if (OB_FAIL(st_operator.init(sql_proxy_))) { LOG_WARN("fail to init server table operator", K(ret), K_(tenant_id)); - } else if (OB_FAIL(st_operator.get_start_service_time(GCONF.self_addr_, start_service_time))) { - LOG_WARN("fail to get start service time", KR(ret), K_(tenant_id)); } else { - total_service_time = now - start_service_time; + FREEZE_TIME_GUARD; + if (OB_FAIL(st_operator.get_start_service_time(GCONF.self_addr_, start_service_time))) { + LOG_WARN("fail to get start service time", KR(ret), K_(tenant_id)); + } else { + total_service_time = now - start_service_time; + } } } // In order to avoid LOG_ERROR when the tenant miss daily merge due to the cluster restarted. diff --git a/src/rootserver/freeze/ob_tenant_major_freeze.cpp b/src/rootserver/freeze/ob_tenant_major_freeze.cpp index be70f3e352..7e4f87a315 100644 --- a/src/rootserver/freeze/ob_tenant_major_freeze.cpp +++ b/src/rootserver/freeze/ob_tenant_major_freeze.cpp @@ -85,16 +85,22 @@ int ObTenantMajorFreeze::start() void ObTenantMajorFreeze::stop() { + LOG_INFO("daily_launcher start to stop", K_(tenant_id)); daily_launcher_.stop(); + LOG_INFO("freeze_info_detector start to stop", K_(tenant_id)); freeze_info_detector_.stop(); + LOG_INFO("merge_scheduler start to stop", K_(tenant_id)); merge_scheduler_.stop(); } int ObTenantMajorFreeze::wait() { int ret = OB_SUCCESS; + LOG_INFO("daily_launcher start to wait", K_(tenant_id)); daily_launcher_.wait(); + LOG_INFO("freeze_info_detector start to wait", K_(tenant_id)); freeze_info_detector_.wait(); + LOG_INFO("merge_scheduler start to wait", K_(tenant_id)); merge_scheduler_.wait(); return ret; } @@ -102,13 +108,22 @@ int ObTenantMajorFreeze::wait() int ObTenantMajorFreeze::destroy() { int ret = OB_SUCCESS; + LOG_INFO("daily_launcher start to destroy", K_(tenant_id)); if (OB_FAIL(daily_launcher_.destroy())) { LOG_WARN("fail to destroy daily_launcher", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(freeze_info_detector_.destroy())) { - LOG_WARN("fail to destroy freeze_info_detector", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(merge_scheduler_.destroy())) { - LOG_WARN("fail to destroy merge_scheduler", KR(ret), K_(tenant_id)); - } + } + if (OB_SUCC(ret)) { + LOG_INFO("freeze_info_detector start to destroy", K_(tenant_id)); + if (OB_FAIL(freeze_info_detector_.destroy())) { + LOG_WARN("fail to destroy freeze_info_detector", KR(ret), K_(tenant_id)); + } + } + if (OB_SUCC(ret)) { + LOG_INFO("merge_scheduler start to destroy", K_(tenant_id)); + if (OB_FAIL(merge_scheduler_.destroy())) { + LOG_WARN("fail to destroy merge_scheduler", KR(ret), K_(tenant_id)); + } + } return ret; } diff --git a/src/rootserver/freeze/ob_zone_merge_manager.cpp b/src/rootserver/freeze/ob_zone_merge_manager.cpp index 5d7329282c..900232095c 100644 --- a/src/rootserver/freeze/ob_zone_merge_manager.cpp +++ b/src/rootserver/freeze/ob_zone_merge_manager.cpp @@ -23,6 +23,7 @@ #include "lib/mysqlclient/ob_mysql_proxy.h" #include "lib/utility/ob_macro_utils.h" #include "rootserver/ob_rs_event_history_table_operator.h" +#include "rootserver/freeze/ob_major_freeze_util.h" namespace oceanbase { @@ -310,6 +311,7 @@ int ObZoneMergeManagerBase::start_zone_merge( ObMySQLTransaction trans; const int64_t cur_time = ObTimeUtility::current_time(); const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); + FREEZE_TIME_GUARD; if (OB_FAIL(check_valid(zone, idx))) { LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id)); @@ -344,6 +346,7 @@ int ObZoneMergeManagerBase::start_zone_merge( tmp_info.broadcast_scn_.set_scn(global_merge_info_.global_broadcast_scn(), need_update); tmp_info.frozen_scn_.set_scn(global_merge_info_.frozen_scn(), need_update); + FREEZE_TIME_GUARD; if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) { LOG_WARN("fail to update partial zone merge info", KR(ret), K_(tenant_id), K(tmp_info)); } @@ -373,6 +376,7 @@ int ObZoneMergeManagerBase::finish_zone_merge( ObMySQLTransaction trans; const int64_t cur_time = ObTimeUtility::current_time(); const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); + FREEZE_TIME_GUARD; if (OB_FAIL(check_valid(zone, idx))) { LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id)); @@ -408,6 +412,7 @@ int ObZoneMergeManagerBase::finish_zone_merge( tmp_info.all_merged_scn_.set_scn(new_all_merged_scn, true); } + FREEZE_TIME_GUARD; if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) { LOG_WARN("fail to update partial zone merge info", KR(ret), K_(tenant_id), K(tmp_info)); } @@ -466,6 +471,7 @@ int ObZoneMergeManagerBase::set_merge_error(const int64_t error_type, const int6 is_merge_error = 0; } + FREEZE_TIME_GUARD; if (OB_FAIL(check_inner_stat())) { LOG_WARN("fail to check inner stat", KR(ret), K_(tenant_id)); } else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) { @@ -480,6 +486,7 @@ int ObZoneMergeManagerBase::set_merge_error(const int64_t error_type, const int6 tmp_global_info.is_merge_error_.set_val(is_merge_error, true); tmp_global_info.error_type_.set_val(error_type, true); + FREEZE_TIME_GUARD; if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_, tmp_global_info))) { LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info)); @@ -512,6 +519,7 @@ int ObZoneMergeManagerBase::set_zone_merging( int64_t idx = OB_INVALID_INDEX; ObMySQLTransaction trans; const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); + FREEZE_TIME_GUARD; if (OB_FAIL(check_valid(zone, idx))) { LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id)); } else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) { @@ -526,6 +534,7 @@ int ObZoneMergeManagerBase::set_zone_merging( } else if (is_merging != zone_merge_infos_[idx].is_merging_.get_value()) { tmp_info.is_merging_.set_val(is_merging, true); + FREEZE_TIME_GUARD; if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) { LOG_WARN("fail to update partial zone merge info", KR(ret), K_(tenant_id), K(tmp_info)); } @@ -668,6 +677,7 @@ int ObZoneMergeManagerBase::generate_next_global_broadcast_scn( SCN &next_scn) { int ret = OB_SUCCESS; + FREEZE_TIME_GUARD; ObMySQLTransaction trans; const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); if (OB_FAIL(check_inner_stat())) { @@ -713,6 +723,7 @@ int ObZoneMergeManagerBase::generate_next_global_broadcast_scn( LOG_INFO("next global_broadcast_scn", K_(tenant_id), K(next_scn), K(tmp_global_info)); tmp_global_info.merge_status_.set_val(ObZoneMergeInfo::MERGE_STATUS_MERGING, true); + FREEZE_TIME_GUARD; if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_, tmp_global_info))) { LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info)); @@ -761,6 +772,7 @@ int ObZoneMergeManagerBase::try_update_global_last_merged_scn(const int64_t expe } if (OB_SUCC(ret) && need_update) { + FREEZE_TIME_GUARD; if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) { LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(meta_tenant_id)); } else if (OB_FAIL(check_freeze_service_epoch(trans, expected_epoch))) { @@ -776,6 +788,7 @@ int ObZoneMergeManagerBase::try_update_global_last_merged_scn(const int64_t expe tmp_global_info.last_merged_scn_.set_scn(global_merge_info_.global_broadcast_scn(), true); tmp_global_info.merge_status_.set_val(ObZoneMergeInfo::MERGE_STATUS_IDLE, true); + FREEZE_TIME_GUARD; if (OB_FAIL(ObGlobalMergeTableOperator::update_partial_global_merge_info(trans, tenant_id_, tmp_global_info))) { LOG_WARN("fail to update partial global merge info", KR(ret), K(tmp_global_info)); diff --git a/src/sql/engine/px/ob_px_worker.cpp b/src/sql/engine/px/ob_px_worker.cpp index 5815d94f28..9cec2c06ef 100644 --- a/src/sql/engine/px/ob_px_worker.cpp +++ b/src/sql/engine/px/ob_px_worker.cpp @@ -471,3 +471,27 @@ ObPxLocalWorkerFactory::~ObPxLocalWorkerFactory() { destroy(); } + + + +////////////////////////////////////////////////////////////////////////////// +////////////////////////////////////////////////////////////////////////////// +////////////////////////////////////////////////////////////////////////////// +int ObPxWorker::check_status() +{ + int ret = OB_SUCCESS; + if (nullptr != session_) { + session_->is_terminate(ret); + } + + if (OB_SUCC(ret)) { + if (is_timeout()) { + ret = OB_TIMEOUT; + } else if (IS_INTERRUPTED()) { + ObInterruptCode &ic = GET_INTERRUPT_CODE(); + ret = ic.code_; + LOG_WARN("px execution was interrupted", K(ic), K(ret)); + } + } + return ret; +} diff --git a/src/sql/engine/px/ob_px_worker.h b/src/sql/engine/px/ob_px_worker.h index abe86b2564..f252467d40 100644 --- a/src/sql/engine/px/ob_px_worker.h +++ b/src/sql/engine/px/ob_px_worker.h @@ -226,6 +226,12 @@ public: } }; +class ObPxWorker : public lib::Worker +{ +public: + virtual int check_status() override; +}; + } } #endif /* __OB_SQL_ENGINE_PX_WORKER_RUNNABLE_H__ */ diff --git a/src/storage/access/ob_index_tree_prefetcher.cpp b/src/storage/access/ob_index_tree_prefetcher.cpp index 161ed548b4..01db056a26 100644 --- a/src/storage/access/ob_index_tree_prefetcher.cpp +++ b/src/storage/access/ob_index_tree_prefetcher.cpp @@ -539,7 +539,6 @@ int ObIndexTreeMultiPrefetcher::multi_prefetch() int64_t tenant_id = MTL_ID(); ObMicroIndexInfo &cur_index_info = read_handle.index_block_info_; ObMicroBlockDataHandle &next_handle = read_handle.get_read_handle(); - ObMicroBlockDataHandle tmp_handle; if (OB_UNLIKELY(!cur_index_info.is_valid() || nullptr == read_handle.micro_handle_ || &next_handle == read_handle.micro_handle_ || @@ -550,7 +549,7 @@ int ObIndexTreeMultiPrefetcher::multi_prefetch() tenant_id, cur_index_info, cur_index_info.is_data_block(), - tmp_handle))) { + next_handle))) { //not in cache yet, stop this rowkey prefetching if it's not the rowkey to be feteched ret = OB_SUCCESS; if (is_rowkey_to_fetched) { @@ -560,13 +559,9 @@ int ObIndexTreeMultiPrefetcher::multi_prefetch() } else { stop_prefetch = true; } - } else { - // already in block cache, get block data - read_handle.micro_handle_->reset(); - *read_handle.micro_handle_ = tmp_handle; - if (OB_FAIL(read_handle.micro_handle_->get_cached_index_block_data(*index_read_info_, index_block_))) { - LOG_WARN("Fail to get cached index block data", K(ret), KPC(read_handle.micro_handle_)); - } + } else if (FALSE_IT(read_handle.set_cur_micro_handle(next_handle))) { + } else if (OB_FAIL(read_handle.micro_handle_->get_cached_index_block_data(*index_read_info_, index_block_))) { + LOG_WARN("Fail to get cached index block data", K(ret), KPC(read_handle.micro_handle_)); } if (OB_SUCC(ret) && !stop_prefetch) { if (OB_FAIL(drill_down(cur_index_info.get_macro_id(), read_handle, cur_index_info.is_leaf_block(), is_rowkey_to_fetched))) { diff --git a/src/storage/access/ob_index_tree_prefetcher.h b/src/storage/access/ob_index_tree_prefetcher.h index 86632e6020..9c104c5f3f 100644 --- a/src/storage/access/ob_index_tree_prefetcher.h +++ b/src/storage/access/ob_index_tree_prefetcher.h @@ -221,13 +221,13 @@ public: cur_prefetch_end_ = false; index_block_info_.reset(); micro_handle_idx_ = 0; - for (int64_t i = 0; i < DEFAULT_GET_MICRO_DATA_HANDLE_CNT; ++i) { + for (int64_t i = 0; i < DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT; ++i) { micro_handles_[i].reset(); } } OB_INLINE ObMicroBlockDataHandle& get_read_handle() { - return micro_handles_[micro_handle_idx_ % DEFAULT_GET_MICRO_DATA_HANDLE_CNT]; + return micro_handles_[micro_handle_idx_ % DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT]; } OB_INLINE void set_cur_micro_handle(ObMicroBlockDataHandle &handle) { @@ -236,11 +236,13 @@ public: } INHERIT_TO_STRING_KV("ObSSTableReadHandle", ObSSTableReadHandle, KPC_(rowkey), K_(cur_level), K_(cur_prefetch_end), K_(index_block_info), K_(micro_handle_idx), K_(micro_handles)); + // TODO(yht146439) change to 2 + static const int64_t DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT = 3; int16_t cur_level_; bool cur_prefetch_end_; ObMicroIndexInfo index_block_info_; int64_t micro_handle_idx_; - ObMicroBlockDataHandle micro_handles_[DEFAULT_GET_MICRO_DATA_HANDLE_CNT]; + ObMicroBlockDataHandle micro_handles_[DEFAULT_MULTIGET_MICRO_DATA_HANDLE_CNT]; }; typedef ObReallocatedFixedArray ReadHandleExtArray; ObIndexTreeMultiPrefetcher() :