From a4bf0af7c905aced489970c8cdacfa9c3100150a Mon Sep 17 00:00:00 2001
From: obdev
Date: Tue, 4 Jul 2023 03:12:23 +0000
Subject: [PATCH] wait for readonly requests to finish for some time before
 offlining ls

---
 src/observer/mysql/ob_query_retry_ctrl.cpp |  2 +-
 src/rootserver/ob_ls_service_helper.cpp    | 34 +++++---
 src/sql/das/ob_das_ref.cpp                 | 98 +++++++---------
 src/sql/das/ob_das_ref.h                   | 11 ++-
 src/storage/ls/ob_ls.cpp                   | 22 +++--
 src/storage/ls/ob_ls.h                     |  4 +-
 src/storage/ls/ob_ls_tx_service.cpp        | 59 +++++++++++--
 src/storage/ls/ob_ls_tx_service.h          |  1 +
 src/storage/ob_i_store.cpp                 |  1 +
 src/storage/ob_i_store.h                   |  5 +-
 src/storage/tx/ob_trans_ctx_mgr_v4.cpp     | 40 ++++++++-
 src/storage/tx/ob_trans_ctx_mgr_v4.h       | 21 ++++-
 12 files changed, 194 insertions(+), 104 deletions(-)

diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp
index 072e00a48..3c04615e1 100644
--- a/src/observer/mysql/ob_query_retry_ctrl.cpp
+++ b/src/observer/mysql/ob_query_retry_ctrl.cpp
@@ -957,7 +957,7 @@ int ObQueryRetryCtrl::init()
   // Just use location_error_proc to retry sql and a new schema guard will be obtained during the retry process.
   ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
   ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
-  ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_nothing_readable_proc);
+  ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
   ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
   ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
 
diff --git a/src/rootserver/ob_ls_service_helper.cpp b/src/rootserver/ob_ls_service_helper.cpp
index 245d7f04a..87e355ef7 100755
--- a/src/rootserver/ob_ls_service_helper.cpp
+++ b/src/rootserver/ob_ls_service_helper.cpp
@@ -233,19 +233,31 @@ int ObLSServiceHelper::construct_ls_status_machine(
   share::ObLSStatusInfoArray status_info_array;
   share::ObLSAttrArray ls_array;
   share::ObLSAttrOperator ls_operator(tenant_id, sql_proxy);
+  ObLSStatusMachineParameter status_machine;
+  ObLSStatusOperator status_op;
   if (OB_ISNULL(sql_proxy) || !is_valid_tenant_id(tenant_id)) {
     ret = OB_INVALID_ARGUMENT;
     LOG_WARN("sql proxy is null or tenant id is invalid", KR(ret), KP(sql_proxy), K(tenant_id));
+  } else if (OB_FAIL(status_op.get_all_ls_status_by_order(
+                 tenant_id, status_info_array, *sql_proxy))) {
+    LOG_WARN("failed to get all ls status by order", KR(ret));
+  } else if (1 == status_info_array.count() && status_info_array.at(0).ls_is_tenant_dropping()
+             && status_info_array.at(0).ls_id_.is_sys_ls()) {
+    // Since SYS_LS is in tenant_dropping status it cannot be read here,
+    // so __all_ls cannot be read while this status machine is constructed;
+    // the content of the ls table therefore needs to be mocked.
+    ObLSAttr ls_info;
+    const share::ObLSStatusInfo &status_info = status_info_array.at(0);
+    if (OB_FAIL(ls_info.init(SYS_LS, status_info.ls_group_id_, status_info.flag_,
+                             status_info.status_, OB_LS_OP_TENANT_DROP, SCN::base_scn()))) {
+      LOG_WARN("failed to mock ls info", KR(ret), K(status_info));
+    } else if (OB_FAIL(status_machine.init(SYS_LS, status_info, ls_info))) {
+      LOG_WARN("failed to init status machine", KR(ret), K(ls_id), K(status_info), K(ls_info));
+    } else if (OB_FAIL(status_machine_array.push_back(status_machine))) {
+      LOG_WARN("failed to push back status machine", KR(ret), K(status_machine));
+    }
   } else if (OB_FAIL(ls_operator.get_all_ls_by_order(lock_sys_ls, ls_array))) {
     LOG_WARN("failed to get get all ls", KR(ret), K(lock_sys_ls));
-  } else {
-    ObLSStatusOperator status_op;
-    if (OB_FAIL(status_op.get_all_ls_status_by_order(
-            tenant_id, status_info_array, *sql_proxy))) {
-      LOG_WARN("failed to get all ls status by order", KR(ret));
-    }
-  }
-  if (OB_FAIL(ret)) {
   } else if (OB_UNLIKELY(0 == ls_array.count())) {
     ret = OB_ERR_UNEXPECTED;
     LOG_WARN("ls array can not be empty", KR(ret), K(ls_array), K(status_info_array));
@@ -253,7 +265,6 @@ int ObLSServiceHelper::construct_ls_status_machine(
     // merge by sort
     int64_t status_index = 0;
     int64_t ls_index = 0;
-    ObLSStatusMachineParameter status_machine;
    const int64_t status_count = status_info_array.count();
     const int64_t ls_count = ls_array.count();
     share::ObLSStatusInfo status_info;
@@ -532,8 +543,9 @@ int ObLSServiceHelper::offline_ls(const uint64_t tenant_id,
 
   if (ls_id.is_sys_ls()) {
     // For SYS LS, drop scn can not be generated, as GTS service is down, SYS LS is blocked
-    // drop_scn is meaningless for SYS LS, so set it to base_scn
-    drop_scn.set_base();
+    // drop_scn is meaningless for SYS LS, so set it to min_scn.
+    // base_scn is the default value; if the default were kept, the affected_rows of __all_ls_recovery_stat would be zero.
+    drop_scn.set_min();
   }
   // For user LS, drop_scn should be GTS.
   else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id, drop_scn))) {
diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp
index d598a3503..5237a0574 100644
--- a/src/sql/das/ob_das_ref.cpp
+++ b/src/sql/das/ob_das_ref.cpp
@@ -20,10 +20,13 @@
 #include "storage/tx/ob_trans_service.h"
 #include "sql/engine/ob_exec_context.h"
 #include "sql/das/ob_das_rpc_processor.h"
+#include "sql/das/ob_das_retry_ctrl.h"
+#include "observer/mysql/ob_query_retry_ctrl.h"
 namespace oceanbase
 {
 using namespace common;
+using namespace observer;
 namespace sql
 {
 bool DasRefKey::operator==(const DasRefKey &other) const
@@ -269,23 +272,14 @@ int ObDASRef::execute_all_task()
       }
     }
   }
-  if (OB_FAIL(ret)) {
-    if (check_rcode_can_retry(ret, batched_tasks_.get_last_node()->get_obj()->get_ref_table_id())) {
-      ret = OB_SUCCESS; // we ignore current error code, since all error code should be checked whether can be retried.
-    } else {
-      int tmp_ret = OB_SUCCESS;
-      if (OB_TMP_FAIL(wait_executing_tasks())) {
-        LOG_WARN("failed to wait all tasks", K(ret));
-      }
-      break;
-    }
-  }
   // wait all existing tasks to be finished
-  if (OB_SUCC(ret) && OB_FAIL(wait_executing_tasks())) {
-    LOG_WARN("failed to process all async remote tasks", K(ret));
-    if (check_rcode_can_retry(ret, batched_tasks_.get_last_node()->get_obj()->get_ref_table_id())) {
-      ret = OB_SUCCESS;
-    }
+  int tmp_ret = OB_SUCCESS;
+  if (OB_TMP_FAIL(wait_executing_tasks())) {
+    LOG_WARN("failed to process all async remote tasks", KR(ret));
+  }
+  ret = COVER_SUCC(tmp_ret);
+  if (check_rcode_can_retry(ret)) {
+    ret = OB_SUCCESS;
   }
   if (OB_SUCC(ret)) {
     // check das task status.
@@ -293,23 +287,21 @@ int ObDASRef::execute_all_task()
       ObDasAggregatedTasks* aggregated_task = curr->get_obj();
       if (aggregated_task->has_unstart_tasks()) {
         if (aggregated_task->has_failed_tasks()) {
-          if (OB_FAIL(aggregated_task->failed_tasks_can_retry())) {
-            LOG_WARN("failed das aggreagted task cannot retry.", K(ret));
+          // retry all failed tasks.
+          common::ObSEArray<ObIDASTaskOp *, 2> failed_tasks;
+          int tmp_ret = OB_SUCCESS;
+          if (OB_TMP_FAIL(aggregated_task->get_failed_tasks(failed_tasks))) {
+            LOG_WARN("failed to get failed tasks", K(ret));
+          } else if (failed_tasks.count() == 0) {
+            ret = OB_ERR_UNEXPECTED;
+            LOG_WARN("failed to get failed tasks");
           } else {
-            // retry all failed tasks.
-            common::ObSEArray<ObIDASTaskOp *, 2> failed_tasks;
-            int tmp_ret = OB_SUCCESS;
-            if (OB_TMP_FAIL(aggregated_task->get_failed_tasks(failed_tasks))) {
-              LOG_WARN("failed to get failed tasks", K(ret));
-            } else if (failed_tasks.count() == 0) {
-              ret = OB_ERR_UNEXPECTED;
-              LOG_WARN("failed to get failed tasks");
-            } else {
-              for (int i = 0; OB_SUCC(ret) && i < failed_tasks.count(); i++) {
-                if (OB_FAIL(MTL(ObDataAccessService *)->retry_das_task(*this, *failed_tasks.at(i)))) {
-                  LOG_WARN("Failed to retry das task", K(ret));
-                  break;
-                }
+            for (int i = 0; OB_SUCC(ret) && i < failed_tasks.count(); i++) {
+              ObIDASTaskOp *failed_task = failed_tasks.at(i);
+              if (!GCONF._enable_partition_level_retry || !failed_task->can_part_retry()) {
+                ret = failed_task->errcode_;
+              } else if (OB_FAIL(MTL(ObDataAccessService *)->retry_das_task(*this, *failed_tasks.at(i)))) {
+                LOG_WARN("Failed to retry das task", K(ret));
              }
             }
           }
@@ -337,15 +329,16 @@ int ObDASRef::execute_all_task()
   return ret;
 }
 
-bool ObDASRef::check_rcode_can_retry(int ret, int64_t ref_table_id)
+bool ObDASRef::check_rcode_can_retry(int ret)
 {
   bool bret = false;
-  if ((is_master_changed_error(ret) ||
-       is_partition_change_error(ret) ||
-       OB_REPLICA_NOT_READABLE == ret) &&
-      GCONF._enable_partition_level_retry &&
-      !is_virtual_table(ref_table_id)) {
-    bret = true; // we ignore current error code, since all error code should be checked whether can be retried.
+  ObDASRetryCtrl::retry_func retry_func = nullptr;
+
+  int tmp_ret = OB_SUCCESS;
+  if (OB_TMP_FAIL(ObQueryRetryCtrl::get_das_retry_func(ret, retry_func))) {
+    LOG_WARN("get das retry func failed", KR(tmp_ret), KR(ret));
+  } else if (retry_func != nullptr) {
+    bret = true;
   }
   return bret;
 }
@@ -864,33 +857,6 @@ int ObDasAggregatedTasks::get_failed_tasks(common::ObSEArray<ObIDASTaskOp *, 2> &tasks)
   return ret;
 }
 
-int ObDasAggregatedTasks::failed_tasks_can_retry() const
-{
-  int ret = OB_SUCCESS;
-  bool can_retry = true;
-  if (!GCONF._enable_partition_level_retry) {
-    can_retry = false;
-  }
-  ObIDASTaskOp *cur_task = nullptr;
-  DLIST_FOREACH_X(curr, failed_tasks_, can_retry) {
-    cur_task = curr->get_data();
-    ret = cur_task->get_errcode();
-    if ((is_master_changed_error(ret) ||
-         is_partition_change_error(ret) ||
-         OB_REPLICA_NOT_READABLE == ret) &&
-        cur_task->can_part_retry()) {
-    } else {
-      can_retry = false;
-    }
-  }
-#if !defined(NDEBUG)
-  if (can_retry) {
-    OB_ASSERT(OB_SUCCESS != ret);;
-  }
-#endif
-  return can_retry ? OB_SUCCESS : ret; // return the error code that can't be retried.
-};
-
 bool ObDasAggregatedTasks::has_unstart_tasks() const
 {
   return high_priority_tasks_.get_size() != 0 ||
diff --git a/src/sql/das/ob_das_ref.h b/src/sql/das/ob_das_ref.h
index 43ff98710..45858271c 100644
--- a/src/sql/das/ob_das_ref.h
+++ b/src/sql/das/ob_das_ref.h
@@ -55,12 +55,15 @@ struct ObDasAggregatedTasks
    */
   int get_aggregated_tasks(common::ObSEArray<common::ObSEArray<ObIDASTaskOp *, 2>, 2> &task_groups, int64_t count);
   int get_failed_tasks(common::ObSEArray<ObIDASTaskOp *, 2> &tasks);
-  bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; };
-  int failed_tasks_can_retry() const;
+  bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; }
   bool has_unstart_tasks() const;
   bool has_unstart_high_priority_tasks() const;
   int32_t get_unstart_task_size() const;
-  TO_STRING_KV(K_(server), K(high_priority_tasks_.get_size()), K(tasks_.get_size()), K(failed_tasks_.get_size()), K(success_tasks_.get_size()));
+  TO_STRING_KV(K_(server),
+               K(high_priority_tasks_.get_size()),
+               K(tasks_.get_size()),
+               K(failed_tasks_.get_size()),
+               K(success_tasks_.get_size()));
   common::ObAddr server_;
   DasTaskLinkedList high_priority_tasks_;
   DasTaskLinkedList tasks_;
@@ -154,7 +157,7 @@ private:
   int move_local_tasks_to_last();
   int wait_executing_tasks();
   int process_remote_task_resp();
-  bool check_rcode_can_retry(int ret, int64_t ref_table_id);
+  bool check_rcode_can_retry(int ret);
 private:
   typedef common::ObObjNode<ObIDASTaskOp *> DasOpNode;
   //declare das allocator
diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp
index 62128c075..bf94980e2 100755
--- a/src/storage/ls/ob_ls.cpp
+++ b/src/storage/ls/ob_ls.cpp
@@ -694,6 +694,7 @@ void ObLS::destroy()
   // TODO: (yanyuan.cxf) destroy all the sub module.
   int ret = OB_SUCCESS;
   int tmp_ret = OB_SUCCESS;
+  int64_t start_ts = ObTimeUtility::current_time();
   if (tenant_id_ != OB_INVALID_TENANT_ID) {
     if (tenant_id_ != MTL_ID()) {
       LOG_ERROR("ls destroy happen in wrong tenant ctx", K(tenant_id_), K(MTL_ID()));
@@ -702,7 +703,7 @@ void ObLS::destroy()
   }
   transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
   FLOG_INFO("ObLS destroy", K(this), K(*this), K(lbt()));
-  if (OB_TMP_FAIL(offline_())) {
+  if (OB_TMP_FAIL(offline_(start_ts))) {
     LOG_WARN("ls offline failed.", K(tmp_ret), K(ls_meta_.ls_id_));
   } else if (OB_TMP_FAIL(stop_())) {
     LOG_WARN("ls stop failed.", K(tmp_ret), K(ls_meta_.ls_id_));
@@ -844,10 +845,12 @@ void ObLS::destroy()
   startup_transfer_info_.reset();
 }
 
-int ObLS::offline_tx_()
+int ObLS::offline_tx_(const int64_t start_ts)
 {
   int ret = OB_SUCCESS;
-  if (OB_FAIL(tx_table_.prepare_offline())) {
+  if (OB_FAIL(ls_tx_svr_.prepare_offline(start_ts))) {
+    LOG_WARN("prepare offline ls tx service failed", K(ret), K(ls_meta_));
+  } else if (OB_FAIL(tx_table_.prepare_offline())) {
     LOG_WARN("tx table prepare offline failed", K(ret), K(ls_meta_));
   } else if (OB_FAIL(ls_tx_svr_.offline())) {
     LOG_WARN("offline ls tx service failed", K(ret), K(ls_meta_));
@@ -868,7 +871,7 @@ int ObLS::offline_compaction_()
   return ret;
 }
 
-int ObLS::offline_()
+int ObLS::offline_(const int64_t start_ts)
 {
   int ret = OB_SUCCESS;
   // only follower can do this.
@@ -895,7 +898,7 @@ int ObLS::offline_()
     LOG_WARN("weak read handler offline failed", K(ret), K(ls_meta_));
   } else if (OB_FAIL(ls_ddl_log_handler_.offline())) {
     LOG_WARN("ddl log handler offline failed", K(ret), K(ls_meta_));
-  } else if (OB_FAIL(offline_tx_())) {
+  } else if (OB_FAIL(offline_tx_(start_ts))) {
     LOG_WARN("offline tx service failed", K(ret), K(ls_meta_));
   } else if (OB_FAIL(dup_table_ls_handler_.offline())) {
     LOG_WARN("offline dup table ls handler failed", K(ret), K(ls_meta_));
@@ -928,7 +931,7 @@ int ObLS::offline()
 {
   ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock);
   // only follower can do this.
-  if (OB_FAIL(offline_())) {
+  if (OB_FAIL(offline_(start_ts))) {
     LOG_WARN("ls offline failed", K(ret), K(ls_meta_));
   }
 }
@@ -952,7 +955,7 @@ int ObLS::offline_without_lock()
   do {
     retry_times++;
     {
-      if (OB_FAIL(offline_())) {
+      if (OB_FAIL(offline_(start_ts))) {
        LOG_WARN("ls offline failed", K(ret), K(ls_meta_));
      }
    }
@@ -1394,6 +1397,7 @@ int ObLS::finish_slog_replay()
   ObMigrationStatus new_migration_status;
   int64_t read_lock = 0;
   int64_t write_lock = LSLOCKALL - LSLOCKLOGMETA;
+  const int64_t start_ts = ObTimeUtility::current_time();
   ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock);
 
   if (OB_FAIL(get_migration_status(current_migration_status))) {
@@ -1412,11 +1416,11 @@ int ObLS::finish_slog_replay()
   } else if (OB_FAIL(start())) {
     LOG_WARN("ls can not start to work", K(ret));
   } else if (ObMigrationStatus::OB_MIGRATION_STATUS_REBUILD == new_migration_status) {
-    if (OB_FAIL(offline_())) {
+    if (OB_FAIL(offline_(start_ts))) {
       LOG_WARN("failed to offline", K(ret), KPC(this));
     }
   } else if (is_enable_for_restore()) {
-    if (OB_FAIL(offline_())) {
+    if (OB_FAIL(offline_(start_ts))) {
       LOG_WARN("failed to offline", K(ret), KPC(this));
     } else if (OB_FAIL(log_handler_.enable_sync())) {
       LOG_WARN("failed to enable sync", K(ret), KPC(this));
diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h
index dedaa72a2..13ca8b969 100755
--- a/src/storage/ls/ob_ls.h
+++ b/src/storage/ls/ob_ls.h
@@ -326,10 +326,10 @@ private:
   void wait_();
   int prepare_for_safe_destroy_();
   int flush_if_need_(const bool need_flush);
-  int offline_();
+  int offline_(const int64_t start_ts);
   int offline_compaction_();
   int online_compaction_();
-  int offline_tx_();
+  int offline_tx_(const int64_t start_ts);
   int online_tx_();
 public:
   // ObLSMeta interface:
diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp
index d10d219f8..400a96241 100644
--- a/src/storage/ls/ob_ls_tx_service.cpp
+++ b/src/storage/ls/ob_ls_tx_service.cpp
@@ -127,12 +127,18 @@ int ObLSTxService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
                                       ObStoreCtx &store_ctx) const
 {
   int ret = OB_SUCCESS;
-  if (OB_ISNULL(trans_service_)) {
+  if (OB_ISNULL(trans_service_) || OB_ISNULL(mgr_)) {
     ret = OB_NOT_INIT;
-    TRANS_LOG(WARN, "not init", K(ret));
+    TRANS_LOG(WARN, "not init", K(ret), KP(trans_service_), KP(mgr_));
+  } else if (OB_FAIL(mgr_->start_readonly_request())) {
+    TRANS_LOG(WARN, "start readonly request failed", K(ret));
   } else {
     store_ctx.ls_id_ = ls_id_;
+    store_ctx.is_read_store_ctx_ = true;
     ret = trans_service_->get_read_store_ctx(snapshot, read_latest, lock_timeout, store_ctx);
+    if (OB_FAIL(ret)) {
+      mgr_->end_readonly_request();
+    }
   }
   return ret;
 }
@@ -142,12 +148,18 @@ int ObLSTxService::get_read_store_ctx(const SCN &snapshot,
                                       ObStoreCtx &store_ctx) const
 {
   int ret = OB_SUCCESS;
-  if (OB_ISNULL(trans_service_)) {
+  if (OB_ISNULL(trans_service_) || OB_ISNULL(mgr_)) {
     ret = OB_NOT_INIT;
- TRANS_LOG(WARN, "not init", K(ret)); + TRANS_LOG(WARN, "not init", K(ret), KP(trans_service_), KP(mgr_)); + } else if (OB_FAIL(mgr_->start_readonly_request())) { + TRANS_LOG(WARN, "start readonly request failed", K(ret)); } else { store_ctx.ls_id_ = ls_id_; + store_ctx.is_read_store_ctx_ = true; ret = trans_service_->get_read_store_ctx(snapshot, lock_timeout, store_ctx); + if (OB_FAIL(ret)) { + mgr_->end_readonly_request(); + } } return ret; } @@ -176,6 +188,15 @@ int ObLSTxService::revert_store_ctx(storage::ObStoreCtx &store_ctx) const } else { ret = trans_service_->revert_store_ctx(store_ctx); } + // ignore ret + if (store_ctx.is_read_store_ctx()) { + if (OB_ISNULL(mgr_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "mgr is null", K(ret), KP(this)); + } else { + (void)mgr_->end_readonly_request(); + } + } return ret; } @@ -593,13 +614,37 @@ ObTxRetainCtxMgr *ObLSTxService::get_retain_ctx_mgr() return retain_ptr; } +int ObLSTxService::prepare_offline(const int64_t start_ts) +{ + int ret = OB_SUCCESS; + const int64_t PRINT_LOG_INTERVAL = 1000 * 1000; // 1s + const int64_t WAIT_READONLY_REQUEST_US = 60 * 1000 * 1000; + bool unused_is_all_tx_clean_up = false; + if (OB_ISNULL(mgr_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id)); + } else if (OB_FAIL(mgr_->block(unused_is_all_tx_clean_up))) { + TRANS_LOG(WARN, "block tx failed", K_(ls_id)); + } else if (ObTimeUtility::current_time() > start_ts + WAIT_READONLY_REQUEST_US) { + // dont care readonly request + } else { + const int64_t readonly_request_cnt = mgr_->get_total_active_readonly_request_count(); + if (readonly_request_cnt > 0) { + ret = OB_EAGAIN; + if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) { + TRANS_LOG(WARN, "readonly requests are active", K(ret), KP(mgr_), K_(ls_id), K(readonly_request_cnt)); + } + } + } + TRANS_LOG(INFO, "prepare offline ls", K(ret), K(start_ts), KP(mgr_), K_(ls_id)); + return ret; +} + int ObLSTxService::offline() { int ret = OB_SUCCESS; const int64_t PRINT_LOG_INTERVAL = 1000 * 1000; // 1s - const int64_t SLEEP_US = 20000; //20ms const bool graceful = false; - const bool verbose = true; bool unused_is_all_tx_clean_up = false; if (OB_ISNULL(mgr_)) { ret = OB_NOT_INIT; @@ -611,7 +656,7 @@ int ObLSTxService::offline() } else if (mgr_->get_tx_ctx_count() > 0) { ret = OB_EAGAIN; if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) { - TRANS_LOG(WARN, "transaction not empty, try again", KP(mgr_), K_(ls_id), K(mgr_->get_tx_ctx_count())); + TRANS_LOG(WARN, "transaction not empty, try again", K(ret), KP(mgr_), K_(ls_id), K(mgr_->get_tx_ctx_count())); } } return ret; diff --git a/src/storage/ls/ob_ls_tx_service.h b/src/storage/ls/ob_ls_tx_service.h index 3186228d2..af0c2977a 100644 --- a/src/storage/ls/ob_ls_tx_service.h +++ b/src/storage/ls/ob_ls_tx_service.h @@ -69,6 +69,7 @@ public: void destroy() { reset_(); } + int prepare_offline(const int64_t start_ts); int offline(); int online(); diff --git a/src/storage/ob_i_store.cpp b/src/storage/ob_i_store.cpp index 4f091721f..90f60a4ec 100644 --- a/src/storage/ob_i_store.cpp +++ b/src/storage/ob_i_store.cpp @@ -76,6 +76,7 @@ void ObStoreCtx::reset() mvcc_acc_ctx_.reset(); tablet_stat_.reset(); replay_log_scn_.set_max(); + is_read_store_ctx_ = false; } int ObStoreCtx::init_for_read(const ObLSID &ls_id, diff --git a/src/storage/ob_i_store.h b/src/storage/ob_i_store.h index 145556127..73cf5d280 100644 --- a/src/storage/ob_i_store.h +++ b/src/storage/ob_i_store.h @@ -414,6 +414,7 @@ struct ObStoreCtx bool is_read() const { return mvcc_acc_ctx_.is_read(); 
} bool is_write() const { return mvcc_acc_ctx_.is_write(); } bool is_replay() const { return mvcc_acc_ctx_.is_replay(); } + bool is_read_store_ctx() const { return is_read_store_ctx_; } int init_for_read(const share::ObLSID &ls_id, const int64_t timeout, const int64_t lock_timeout_us, @@ -432,7 +433,8 @@ struct ObStoreCtx K_(table_version), K_(mvcc_acc_ctx), K_(tablet_stat), - K_(replay_log_scn)); + K_(replay_log_scn), + K_(is_read_store_ctx)); share::ObLSID ls_id_; storage::ObLS *ls_; // for performance opt common::ObTabletID tablet_id_; @@ -442,6 +444,7 @@ struct ObStoreCtx memtable::ObMvccAccessCtx mvcc_acc_ctx_; // all txn relative context storage::ObTabletStat tablet_stat_; // used for collecting query statistics share::SCN replay_log_scn_; // used in replay pass log_ts + bool is_read_store_ctx_; }; diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index b3ded9852..e04557b68 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -217,6 +217,7 @@ void ObLSTxCtxMgr::reset() lock_table_ = NULL; total_tx_ctx_count_ = 0; active_tx_count_ = 0; + total_active_readonly_request_count_ = 0; leader_takeover_ts_.reset(); max_replay_commit_version_.reset(); aggre_rec_scn_.reset(); @@ -233,10 +234,12 @@ void ObLSTxCtxMgr::reset() int ObLSTxCtxMgr::offline() { + int ret = OB_SUCCESS; aggre_rec_scn_.reset(); prev_aggre_rec_scn_.reset(); + TRANS_LOG(INFO, "offline ls", K(ret), "manager", *this); - return OB_SUCCESS; + return ret; } int ObLSTxCtxMgr::process_callback_(ObIArray &cb_array) const @@ -904,7 +907,11 @@ int ObLSTxCtxMgr::stop(const bool graceful) ObTimeGuard timeguard("ctxmgr stop"); { WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); - if (OB_FAIL(ls_log_writer_.stop())) { + const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count(); + if (!graceful && total_active_readonly_request_count > 0) { + ret = OB_EAGAIN; + TRANS_LOG(WARN, "readonly requests are active", K(ret), K(total_active_readonly_request_count)); + } else if (OB_FAIL(ls_log_writer_.stop())) { TRANS_LOG(WARN, "ls_log_writer_ stop error", KR(ret)); } else { { @@ -944,6 +951,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up) const KillTransArg arg(graceful); { WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); + const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count(); KillTxCtxFunctor fn(arg, cb_array); if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) { TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret)); @@ -973,6 +981,7 @@ int ObLSTxCtxMgr::block(bool &is_all_tx_cleaned_up) } else { is_all_tx_cleaned_up = (get_tx_ctx_count() == 0); } + TRANS_LOG(INFO, "block ls", K(ret), "manager", *this); return ret; } @@ -987,6 +996,7 @@ int ObLSTxCtxMgr::block_normal(bool &is_all_tx_cleaned_up) } else { is_all_tx_cleaned_up = (get_tx_ctx_count() == 0); } + TRANS_LOG(INFO, "block ls normally", K(ret), "manager", *this); return ret; } @@ -1001,6 +1011,7 @@ int ObLSTxCtxMgr::online() } else { online_ts_ = ObTimeUtility::current_time(); } + TRANS_LOG(INFO, "online ls", K(ret), "manager", *this); return ret; } @@ -1013,6 +1024,7 @@ int ObLSTxCtxMgr::unblock_normal() if (OB_FAIL(state_helper.switch_state(Ops::UNBLOCK_NORMAL))) { TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this); } + TRANS_LOG(INFO, "unblock ls normally", K(ret), "manager", *this); return ret; } @@ -1488,6 
+1500,30 @@ int ObLSTxCtxMgr::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd) return ret; } +int ObLSTxCtxMgr::start_readonly_request() +{ + int ret = OB_SUCCESS; + RLockGuard guard(rwlock_); + + if (IS_NOT_INIT) { + TRANS_LOG(WARN, "ObLSTxCtxMgr not inited", K(this)); + ret = OB_NOT_INIT; + } else if (is_blocked_()) { + ret = OB_PARTITION_IS_BLOCKED; + // readonly read must be blocked, because trx may be killed forcely + TRANS_LOG(WARN, "logstream is blocked", K(ret)); + } else { + inc_total_active_readonly_request_count(); + } + return ret; +} + +int ObLSTxCtxMgr::end_readonly_request() +{ + dec_total_active_readonly_request_count(); + return OB_SUCCESS; +} + int ObTxCtxMgr::remove_all_ls_() { int ret = OB_SUCCESS; diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index 2e70ee43c..ac6ff425d 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -327,6 +327,8 @@ public: // @param [in] tx_id // @param [in] fd int dump_single_tx_data_2_text(const int64_t tx_id, FILE *fd); + int start_readonly_request(); + int end_readonly_request(); // check this ObLSTxCtxMgr contains the specified ObLSID bool contain(const share::ObLSID &ls_id) @@ -347,6 +349,20 @@ public: // Decrease active trx count in this ls void dec_active_tx_count() { (void)ATOMIC_AAF(&active_tx_count_, -1); } + void inc_total_active_readonly_request_count() + { + const int64_t count = ATOMIC_AAF(&total_active_readonly_request_count_, 1); + } + void dec_total_active_readonly_request_count() + { + int ret = common::OB_ERR_UNEXPECTED; + const int64_t count = ATOMIC_AAF(&total_active_readonly_request_count_, -1); + if (OB_UNLIKELY(count < 0)) { + TRANS_LOG(ERROR, "unexpected total_active_readonly_request_count", KP(this), K(count)); + } + } + int64_t get_total_active_readonly_request_count() { return ATOMIC_LOAD(&total_active_readonly_request_count_); } + // Get all tx obj lock information in this ObLSTxCtxMgr // @param [out] iter: all tx obj lock op information int iterate_tx_obj_lock_op(ObLockOpIterator &iter); @@ -539,6 +555,7 @@ public: static const int64_t MAX_HASH_ITEM_PRINT = 16; static const int64_t WAIT_SW_CB_TIMEOUT = 100 * 1000; // 100 ms static const int64_t WAIT_SW_CB_INTERVAL = 10 * 1000; // 10 ms + static const int64_t WAIT_READONLY_REQUEST_TIME = 10 * 1000 * 1000; private: class State { @@ -754,7 +771,9 @@ private: mutable RWLock minor_merge_lock_; // Total TxCtx count in this ObLSTxCtxMgr - int64_t total_tx_ctx_count_; + int64_t total_tx_ctx_count_ CACHE_ALIGNED; + + int64_t total_active_readonly_request_count_ CACHE_ALIGNED; int64_t active_tx_count_;
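
Editor's note (not part of the patch): the block below is a minimal,
standalone C++ sketch of the readonly-request gate that this patch threads
through ObLSTxService and ObLSTxCtxMgr. get_read_store_ctx() registers the
caller via start_readonly_request(), revert_store_ctx() deregisters it via
end_readonly_request(), and prepare_offline() blocks new readers and keeps
returning "try again" (OB_EAGAIN in the patch) until in-flight readers drain
or a 60-second window measured from start_ts expires. All names in the sketch
are hypothetical; the real code synchronizes with rwlock_ and ATOMIC_AAF, and
the lock-free double-check below merely stands in for the read lock taken by
start_readonly_request().

    #include <atomic>
    #include <cstdint>

    class ReadonlyGate {
    public:
      // start_readonly_request(): refuse once the LS is blocked, otherwise
      // count the caller as an active readonly request.
      bool start_readonly_request() {
        if (blocked_.load(std::memory_order_acquire)) {
          return false;  // stands in for OB_PARTITION_IS_BLOCKED
        }
        active_readonly_.fetch_add(1, std::memory_order_acq_rel);
        // Re-check after publishing the count so that a concurrent
        // prepare_offline() cannot miss this reader; the patch gets the same
        // guarantee from rwlock_ instead.
        if (blocked_.load(std::memory_order_acquire)) {
          end_readonly_request();
          return false;
        }
        return true;
      }

      // end_readonly_request(): called when a read store ctx is reverted.
      void end_readonly_request() {
        active_readonly_.fetch_sub(1, std::memory_order_acq_rel);
      }

      // prepare_offline(start_ts): block new readers, then report whether it
      // is safe to proceed; callers retry on false, and once the wait window
      // since start_ts has passed, remaining readers are no longer waited for.
      bool prepare_offline(int64_t start_ts_us, int64_t now_us) {
        constexpr int64_t kWaitReadonlyRequestUs = 60LL * 1000 * 1000;  // 60s
        blocked_.store(true, std::memory_order_release);
        if (now_us > start_ts_us + kWaitReadonlyRequestUs) {
          return true;  // waited long enough; stop caring about readers
        }
        return active_readonly_.load(std::memory_order_acquire) == 0;
      }

    private:
      std::atomic<bool> blocked_{false};
      std::atomic<int64_t> active_readonly_{0};
    };

Because start_ts is captured once at the start of the offline attempt and
passed down the chain (ObLS::offline_() -> offline_tx_() ->
ObLSTxService::prepare_offline()), repeated OB_EAGAIN retries share a single
60-second budget instead of restarting the clock on every attempt, so a stuck
readonly request can delay an LS offline but can never pin it forever.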