Wait a bounded time for read-only requests to finish before taking an LS offline

This commit is contained in:
obdev 2023-07-04 03:12:23 +00:00 committed by ob-robot
parent e8866b5f23
commit a4bf0af7c9
12 changed files with 194 additions and 104 deletions

View File

@ -957,7 +957,7 @@ int ObQueryRetryCtrl::init()
// Just use location_error_proc to retry sql and a new schema guard will be obtained during the retry process.
ERR_RETRY_FUNC("LOCATION", OB_TABLET_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_LS_LOCATION_NOT_EXIST, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_nothing_readable_proc);
ERR_RETRY_FUNC("LOCATION", OB_PARTITION_IS_BLOCKED, location_error_proc, inner_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST, location_error_proc,inner_location_error_proc, ObDASRetryCtrl::tablet_not_exist_retry_proc);
ERR_RETRY_FUNC("LOCATION", OB_GET_LOCATION_TIME_OUT, location_error_proc, inner_table_location_error_proc, ObDASRetryCtrl::tablet_location_retry_proc);

View File

@ -233,19 +233,31 @@ int ObLSServiceHelper::construct_ls_status_machine(
share::ObLSStatusInfoArray status_info_array;
share::ObLSAttrArray ls_array;
share::ObLSAttrOperator ls_operator(tenant_id, sql_proxy);
ObLSStatusMachineParameter status_machine;
ObLSStatusOperator status_op;
if (OB_ISNULL(sql_proxy) || !is_valid_tenant_id(tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("sql proxy is null or tenant id is invalid", KR(ret), KP(sql_proxy), K(tenant_id));
} else if (OB_FAIL(status_op.get_all_ls_status_by_order(
tenant_id, status_info_array, *sql_proxy))) {
LOG_WARN("failed to get all ls status by order", KR(ret));
} else if (1 == status_info_array.count() && status_info_array.at(0).ls_is_tenant_dropping()
&& status_info_array.at(0).ls_id_.is_sys_ls()) {
// SYS_LS is in tenant_dropping status and therefore cannot be read, so __all_ls
// cannot be queried while constructing the state machine; mock the __all_ls
// entry for SYS_LS from its status info instead.
ObLSAttr ls_info;
const share::ObLSStatusInfo &status_info = status_info_array.at(0);
if (OB_FAIL(ls_info.init(SYS_LS, status_info.ls_group_id_, status_info.flag_,
status_info.status_, OB_LS_OP_TENANT_DROP, SCN::base_scn()))) {
LOG_WARN("failed to mock ls info", KR(ret), K(status_info));
} else if (OB_FAIL(status_machine.init(SYS_LS, status_info, ls_info))) {
LOG_WARN("failed to init status machine", KR(ret), K(ls_id), K(status_info), K(ls_info));
} else if (OB_FAIL(status_machine_array.push_back(status_machine))) {
LOG_WARN("failed to push back status machine", KR(ret), K(status_machine));
}
} else if (OB_FAIL(ls_operator.get_all_ls_by_order(lock_sys_ls, ls_array))) {
LOG_WARN("failed to get get all ls", KR(ret), K(lock_sys_ls));
} else {
ObLSStatusOperator status_op;
if (OB_FAIL(status_op.get_all_ls_status_by_order(
tenant_id, status_info_array, *sql_proxy))) {
LOG_WARN("failed to get all ls status by order", KR(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(0 == ls_array.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls array can not be empty", KR(ret), K(ls_array), K(status_info_array));
@ -253,7 +265,6 @@ int ObLSServiceHelper::construct_ls_status_machine(
// merge by sort
int64_t status_index = 0;
int64_t ls_index = 0;
ObLSStatusMachineParameter status_machine;
const int64_t status_count = status_info_array.count();
const int64_t ls_count = ls_array.count();
share::ObLSStatusInfo status_info;
@ -532,8 +543,9 @@ int ObLSServiceHelper::offline_ls(const uint64_t tenant_id,
if (ls_id.is_sys_ls()) {
// For SYS LS, drop scn can not be generated, as GTS service is down, SYS LS is blocked
// drop_scn is meaningless for SYS LS, so set it to base_scn
drop_scn.set_base();
// drop_scn is meaningless for SYS LS, so set it to min_scn
// base_scn is the default value; if drop_scn is set to the default, the update of __all_ls_recovery_stat affects zero rows
drop_scn.set_min();
}
// For user LS, drop_scn should be GTS.
else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id, drop_scn))) {

View File

@ -20,10 +20,13 @@
#include "storage/tx/ob_trans_service.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/das/ob_das_rpc_processor.h"
#include "sql/das/ob_das_retry_ctrl.h"
#include "observer/mysql/ob_query_retry_ctrl.h"
namespace oceanbase
{
using namespace common;
using namespace observer;
namespace sql
{
bool DasRefKey::operator==(const DasRefKey &other) const
@ -269,23 +272,14 @@ int ObDASRef::execute_all_task()
}
}
}
if (OB_FAIL(ret)) {
if (check_rcode_can_retry(ret, batched_tasks_.get_last_node()->get_obj()->get_ref_table_id())) {
ret = OB_SUCCESS; // we ignore current error code, since all error code should be checked whether can be retried.
} else {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(wait_executing_tasks())) {
LOG_WARN("failed to wait all tasks", K(ret));
}
break;
}
}
// wait all existing tasks to be finished
if (OB_SUCC(ret) && OB_FAIL(wait_executing_tasks())) {
LOG_WARN("failed to process all async remote tasks", K(ret));
if (check_rcode_can_retry(ret, batched_tasks_.get_last_node()->get_obj()->get_ref_table_id())) {
ret = OB_SUCCESS;
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(wait_executing_tasks())) {
LOG_WARN("failed to process all async remote tasks", KR(ret));
}
ret = COVER_SUCC(tmp_ret);
if (check_rcode_can_retry(ret)) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) {
// check das task status.
@ -293,23 +287,21 @@ int ObDASRef::execute_all_task()
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->has_unstart_tasks()) {
if (aggregated_task->has_failed_tasks()) {
if (OB_FAIL(aggregated_task->failed_tasks_can_retry())) {
LOG_WARN("failed das aggreagted task cannot retry.", K(ret));
// retry all failed tasks.
common::ObSEArray<ObIDASTaskOp *, 2> failed_tasks;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(aggregated_task->get_failed_tasks(failed_tasks))) {
LOG_WARN("failed to get failed tasks", K(ret));
} else if (failed_tasks.count() == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get failed tasks");
} else {
// retry all failed tasks.
common::ObSEArray<ObIDASTaskOp *, 2> failed_tasks;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(aggregated_task->get_failed_tasks(failed_tasks))) {
LOG_WARN("failed to get failed tasks", K(ret));
} else if (failed_tasks.count() == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get failed tasks");
} else {
for (int i = 0; OB_SUCC(ret) && i < failed_tasks.count(); i++) {
if (OB_FAIL(MTL(ObDataAccessService *)->retry_das_task(*this, *failed_tasks.at(i)))) {
LOG_WARN("Failed to retry das task", K(ret));
break;
}
for (int i = 0; OB_SUCC(ret) && i < failed_tasks.count(); i++) {
ObIDASTaskOp *failed_task = failed_tasks.at(i);
if (!GCONF._enable_partition_level_retry || !failed_task->can_part_retry()) {
ret = failed_task->errcode_;
} else if (OB_FAIL(MTL(ObDataAccessService *)->retry_das_task(*this, *failed_tasks.at(i)))) {
LOG_WARN("Failed to retry das task", K(ret));
}
}
}
@ -337,15 +329,16 @@ int ObDASRef::execute_all_task()
return ret;
}
bool ObDASRef::check_rcode_can_retry(int ret, int64_t ref_table_id)
bool ObDASRef::check_rcode_can_retry(int ret)
{
bool bret = false;
if ((is_master_changed_error(ret) ||
is_partition_change_error(ret) ||
OB_REPLICA_NOT_READABLE == ret) &&
GCONF._enable_partition_level_retry &&
!is_virtual_table(ref_table_id)) {
bret = true; // we ignore current error code, since all error code should be checked whether can be retried.
ObDASRetryCtrl::retry_func retry_func = nullptr;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ObQueryRetryCtrl::get_das_retry_func(ret, retry_func))) {
LOG_WARN("get das retry func failed", KR(tmp_ret), KR(ret));
} else if (retry_func != nullptr) {
bret = true;
}
return bret;
}
@ -864,33 +857,6 @@ int ObDasAggregatedTasks::get_failed_tasks(common::ObSEArray<ObIDASTaskOp *, 2>
return ret;
}
// Check whether the tasks in failed_tasks_ may be retried.
// Returns OB_SUCCESS when can_retry is still true after the scan; otherwise
// returns the last error code read from a failed task (the one that made
// can_retry false).
// NOTE(review): when GCONF._enable_partition_level_retry is off, can_retry is
// false before the loop; if DLIST_FOREACH_X uses its third argument as a loop
// condition (presumably - confirm), the loop body never runs and ret stays
// OB_SUCCESS, so the function would report "retryable" - verify.
int ObDasAggregatedTasks::failed_tasks_can_retry() const
{
int ret = OB_SUCCESS;
bool can_retry = true;
// Partition-level retry is switched off by configuration.
if (!GCONF._enable_partition_level_retry) {
can_retry = false;
}
ObIDASTaskOp *cur_task = nullptr;
DLIST_FOREACH_X(curr, failed_tasks_, can_retry) {
cur_task = curr->get_data();
// Remember this task's error code; it becomes the return value if the task is not retryable.
ret = cur_task->get_errcode();
// Retryable only for master-changed / partition-change / replica-not-readable
// errors, and only if the task itself allows a partition retry.
if ((is_master_changed_error(ret) ||
is_partition_change_error(ret) ||
OB_REPLICA_NOT_READABLE == ret) &&
cur_task->can_part_retry()) {
} else {
can_retry = false;
}
}
#if !defined(NDEBUG)
// Debug builds only: a retryable outcome is expected to carry a non-success error code.
if (can_retry) {
OB_ASSERT(OB_SUCCESS != ret);;
}
#endif
return can_retry ? OB_SUCCESS : ret; // return the error code that can't be retried.
};
bool ObDasAggregatedTasks::has_unstart_tasks() const
{
return high_priority_tasks_.get_size() != 0 ||

View File

@ -55,12 +55,15 @@ struct ObDasAggregatedTasks
*/
int get_aggregated_tasks(common::ObSEArray<common::ObSEArray<ObIDASTaskOp *, 2>, 2> &task_groups, int64_t count);
int get_failed_tasks(common::ObSEArray<ObIDASTaskOp *, 2> &tasks);
bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; };
int failed_tasks_can_retry() const;
bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; }
bool has_unstart_tasks() const;
bool has_unstart_high_priority_tasks() const;
int32_t get_unstart_task_size() const;
TO_STRING_KV(K_(server), K(high_priority_tasks_.get_size()), K(tasks_.get_size()), K(failed_tasks_.get_size()), K(success_tasks_.get_size()));
TO_STRING_KV(K_(server),
K(high_priority_tasks_.get_size()),
K(tasks_.get_size()),
K(failed_tasks_.get_size()),
K(success_tasks_.get_size()));
common::ObAddr server_;
DasTaskLinkedList high_priority_tasks_;
DasTaskLinkedList tasks_;
@ -154,7 +157,7 @@ private:
int move_local_tasks_to_last();
int wait_executing_tasks();
int process_remote_task_resp();
bool check_rcode_can_retry(int ret, int64_t ref_table_id);
bool check_rcode_can_retry(int ret);
private:
typedef common::ObObjNode<ObIDASTaskOp*> DasOpNode;
//declare das allocator

View File

@ -694,6 +694,7 @@ void ObLS::destroy()
// TODO: (yanyuan.cxf) destroy all the sub module.
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t start_ts = ObTimeUtility::current_time();
if (tenant_id_ != OB_INVALID_TENANT_ID) {
if (tenant_id_ != MTL_ID()) {
LOG_ERROR("ls destroy happen in wrong tenant ctx", K(tenant_id_), K(MTL_ID()));
@ -702,7 +703,7 @@ void ObLS::destroy()
}
transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
FLOG_INFO("ObLS destroy", K(this), K(*this), K(lbt()));
if (OB_TMP_FAIL(offline_())) {
if (OB_TMP_FAIL(offline_(start_ts))) {
LOG_WARN("ls offline failed.", K(tmp_ret), K(ls_meta_.ls_id_));
} else if (OB_TMP_FAIL(stop_())) {
LOG_WARN("ls stop failed.", K(tmp_ret), K(ls_meta_.ls_id_));
@ -844,10 +845,12 @@ void ObLS::destroy()
startup_transfer_info_.reset();
}
int ObLS::offline_tx_()
int ObLS::offline_tx_(const int64_t start_ts)
{
int ret = OB_SUCCESS;
if (OB_FAIL(tx_table_.prepare_offline())) {
if (OB_FAIL(ls_tx_svr_.prepare_offline(start_ts))) {
LOG_WARN("prepare offline ls tx service failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(tx_table_.prepare_offline())) {
LOG_WARN("tx table prepare offline failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(ls_tx_svr_.offline())) {
LOG_WARN("offline ls tx service failed", K(ret), K(ls_meta_));
@ -868,7 +871,7 @@ int ObLS::offline_compaction_()
return ret;
}
int ObLS::offline_()
int ObLS::offline_(const int64_t start_ts)
{
int ret = OB_SUCCESS;
// only follower can do this.
@ -895,7 +898,7 @@ int ObLS::offline_()
LOG_WARN("weak read handler offline failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(ls_ddl_log_handler_.offline())) {
LOG_WARN("ddl log handler offline failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(offline_tx_())) {
} else if (OB_FAIL(offline_tx_(start_ts))) {
LOG_WARN("offline tx service failed", K(ret), K(ls_meta_));
} else if (OB_FAIL(dup_table_ls_handler_.offline())) {
LOG_WARN("offline dup table ls handler failed", K(ret), K(ls_meta_));
@ -928,7 +931,7 @@ int ObLS::offline()
{
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock);
// only follower can do this.
if (OB_FAIL(offline_())) {
if (OB_FAIL(offline_(start_ts))) {
LOG_WARN("ls offline failed", K(ret), K(ls_meta_));
}
}
@ -952,7 +955,7 @@ int ObLS::offline_without_lock()
do {
retry_times++;
{
if (OB_FAIL(offline_())) {
if (OB_FAIL(offline_(start_ts))) {
LOG_WARN("ls offline failed", K(ret), K(ls_meta_));
}
}
@ -1394,6 +1397,7 @@ int ObLS::finish_slog_replay()
ObMigrationStatus new_migration_status;
int64_t read_lock = 0;
int64_t write_lock = LSLOCKALL - LSLOCKLOGMETA;
const int64_t start_ts = ObTimeUtility::current_time();
ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock);
if (OB_FAIL(get_migration_status(current_migration_status))) {
@ -1412,11 +1416,11 @@ int ObLS::finish_slog_replay()
} else if (OB_FAIL(start())) {
LOG_WARN("ls can not start to work", K(ret));
} else if (ObMigrationStatus::OB_MIGRATION_STATUS_REBUILD == new_migration_status) {
if (OB_FAIL(offline_())) {
if (OB_FAIL(offline_(start_ts))) {
LOG_WARN("failed to offline", K(ret), KPC(this));
}
} else if (is_enable_for_restore()) {
if (OB_FAIL(offline_())) {
if (OB_FAIL(offline_(start_ts))) {
LOG_WARN("failed to offline", K(ret), KPC(this));
} else if (OB_FAIL(log_handler_.enable_sync())) {
LOG_WARN("failed to enable sync", K(ret), KPC(this));

View File

@ -326,10 +326,10 @@ private:
void wait_();
int prepare_for_safe_destroy_();
int flush_if_need_(const bool need_flush);
int offline_();
int offline_(const int64_t start_ts);
int offline_compaction_();
int online_compaction_();
int offline_tx_();
int offline_tx_(const int64_t start_ts);
int online_tx_();
public:
// ObLSMeta interface:

View File

@ -127,12 +127,18 @@ int ObLSTxService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
ObStoreCtx &store_ctx) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(trans_service_)) {
if (OB_ISNULL(trans_service_) || OB_ISNULL(mgr_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
TRANS_LOG(WARN, "not init", K(ret), KP(trans_service_), KP(mgr_));
} else if (OB_FAIL(mgr_->start_readonly_request())) {
TRANS_LOG(WARN, "start readonly request failed", K(ret));
} else {
store_ctx.ls_id_ = ls_id_;
store_ctx.is_read_store_ctx_ = true;
ret = trans_service_->get_read_store_ctx(snapshot, read_latest, lock_timeout, store_ctx);
if (OB_FAIL(ret)) {
mgr_->end_readonly_request();
}
}
return ret;
}
@ -142,12 +148,18 @@ int ObLSTxService::get_read_store_ctx(const SCN &snapshot,
ObStoreCtx &store_ctx) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(trans_service_)) {
if (OB_ISNULL(trans_service_) || OB_ISNULL(mgr_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
TRANS_LOG(WARN, "not init", K(ret), KP(trans_service_), KP(mgr_));
} else if (OB_FAIL(mgr_->start_readonly_request())) {
TRANS_LOG(WARN, "start readonly request failed", K(ret));
} else {
store_ctx.ls_id_ = ls_id_;
store_ctx.is_read_store_ctx_ = true;
ret = trans_service_->get_read_store_ctx(snapshot, lock_timeout, store_ctx);
if (OB_FAIL(ret)) {
mgr_->end_readonly_request();
}
}
return ret;
}
@ -176,6 +188,15 @@ int ObLSTxService::revert_store_ctx(storage::ObStoreCtx &store_ctx) const
} else {
ret = trans_service_->revert_store_ctx(store_ctx);
}
// ignore ret
if (store_ctx.is_read_store_ctx()) {
if (OB_ISNULL(mgr_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "mgr is null", K(ret), KP(this));
} else {
(void)mgr_->end_readonly_request();
}
}
return ret;
}
@ -593,13 +614,37 @@ ObTxRetainCtxMgr *ObLSTxService::get_retain_ctx_mgr()
return retain_ptr;
}
// Prepare the LS tx service for going offline.
//
// Blocks the tx ctx mgr first, then checks for read-only requests that are
// still active. While the wait window (WAIT_READONLY_REQUEST_US after start_ts)
// has not elapsed and active read-only requests remain, OB_EAGAIN is returned
// so that the caller can try again. Once the window has elapsed, active
// read-only requests are no longer checked.
//
// @param [in] start_ts  time at which the caller started the offline attempt,
//                       compared against ObTimeUtility::current_time()
//                       (presumably microseconds, as the constants suggest)
// @return OB_SUCCESS    blocked, and either no read-only request is active or
//                       the wait window has elapsed
//         OB_NOT_INIT   mgr_ is null
//         OB_EAGAIN     read-only requests are still active within the window
//         other         error code returned by mgr_->block()
int ObLSTxService::prepare_offline(const int64_t start_ts)
{
  int ret = OB_SUCCESS;
  const int64_t PRINT_LOG_INTERVAL = 1000 * 1000; // 1s
  const int64_t WAIT_READONLY_REQUEST_US = 60 * 1000 * 1000; // 60s
  bool unused_is_all_tx_clean_up = false;
  if (OB_ISNULL(mgr_)) {
    ret = OB_NOT_INIT;
    TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id));
  } else if (OB_FAIL(mgr_->block(unused_is_all_tx_clean_up))) {
    // Include the error code, as every other failure log in this function does.
    TRANS_LOG(WARN, "block tx failed", KR(ret), K_(ls_id));
  } else if (ObTimeUtility::current_time() > start_ts + WAIT_READONLY_REQUEST_US) {
    // The wait window has elapsed: do not wait for read-only requests any longer.
  } else {
    const int64_t readonly_request_cnt = mgr_->get_total_active_readonly_request_count();
    if (readonly_request_cnt > 0) {
      ret = OB_EAGAIN;
      // Rate-limit this warning, the caller is expected to retry frequently.
      if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) {
        TRANS_LOG(WARN, "readonly requests are active", K(ret), KP(mgr_), K_(ls_id), K(readonly_request_cnt));
      }
    }
  }
  TRANS_LOG(INFO, "prepare offline ls", K(ret), K(start_ts), KP(mgr_), K_(ls_id));
  return ret;
}
int ObLSTxService::offline()
{
int ret = OB_SUCCESS;
const int64_t PRINT_LOG_INTERVAL = 1000 * 1000; // 1s
const int64_t SLEEP_US = 20000; //20ms
const bool graceful = false;
const bool verbose = true;
bool unused_is_all_tx_clean_up = false;
if (OB_ISNULL(mgr_)) {
ret = OB_NOT_INIT;
@ -611,7 +656,7 @@ int ObLSTxService::offline()
} else if (mgr_->get_tx_ctx_count() > 0) {
ret = OB_EAGAIN;
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) {
TRANS_LOG(WARN, "transaction not empty, try again", KP(mgr_), K_(ls_id), K(mgr_->get_tx_ctx_count()));
TRANS_LOG(WARN, "transaction not empty, try again", K(ret), KP(mgr_), K_(ls_id), K(mgr_->get_tx_ctx_count()));
}
}
return ret;

View File

@ -69,6 +69,7 @@ public:
void destroy() {
reset_();
}
int prepare_offline(const int64_t start_ts);
int offline();
int online();

View File

@ -76,6 +76,7 @@ void ObStoreCtx::reset()
mvcc_acc_ctx_.reset();
tablet_stat_.reset();
replay_log_scn_.set_max();
is_read_store_ctx_ = false;
}
int ObStoreCtx::init_for_read(const ObLSID &ls_id,

View File

@ -414,6 +414,7 @@ struct ObStoreCtx
bool is_read() const { return mvcc_acc_ctx_.is_read(); }
bool is_write() const { return mvcc_acc_ctx_.is_write(); }
bool is_replay() const { return mvcc_acc_ctx_.is_replay(); }
bool is_read_store_ctx() const { return is_read_store_ctx_; }
int init_for_read(const share::ObLSID &ls_id,
const int64_t timeout,
const int64_t lock_timeout_us,
@ -432,7 +433,8 @@ struct ObStoreCtx
K_(table_version),
K_(mvcc_acc_ctx),
K_(tablet_stat),
K_(replay_log_scn));
K_(replay_log_scn),
K_(is_read_store_ctx));
share::ObLSID ls_id_;
storage::ObLS *ls_; // for performance opt
common::ObTabletID tablet_id_;
@ -442,6 +444,7 @@ struct ObStoreCtx
memtable::ObMvccAccessCtx mvcc_acc_ctx_; // all txn relative context
storage::ObTabletStat tablet_stat_; // used for collecting query statistics
share::SCN replay_log_scn_; // used in replay pass log_ts
bool is_read_store_ctx_;
};

View File

@ -217,6 +217,7 @@ void ObLSTxCtxMgr::reset()
lock_table_ = NULL;
total_tx_ctx_count_ = 0;
active_tx_count_ = 0;
total_active_readonly_request_count_ = 0;
leader_takeover_ts_.reset();
max_replay_commit_version_.reset();
aggre_rec_scn_.reset();
@ -233,10 +234,12 @@ void ObLSTxCtxMgr::reset()
int ObLSTxCtxMgr::offline()
{
int ret = OB_SUCCESS;
aggre_rec_scn_.reset();
prev_aggre_rec_scn_.reset();
TRANS_LOG(INFO, "offline ls", K(ret), "manager", *this);
return OB_SUCCESS;
return ret;
}
int ObLSTxCtxMgr::process_callback_(ObIArray<ObTxCommitCallback> &cb_array) const
@ -904,7 +907,11 @@ int ObLSTxCtxMgr::stop(const bool graceful)
ObTimeGuard timeguard("ctxmgr stop");
{
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
if (OB_FAIL(ls_log_writer_.stop())) {
const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count();
if (!graceful && total_active_readonly_request_count > 0) {
ret = OB_EAGAIN;
TRANS_LOG(WARN, "readonly requests are active", K(ret), K(total_active_readonly_request_count));
} else if (OB_FAIL(ls_log_writer_.stop())) {
TRANS_LOG(WARN, "ls_log_writer_ stop error", KR(ret));
} else {
{
@ -944,6 +951,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
const KillTransArg arg(graceful);
{
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count();
KillTxCtxFunctor fn(arg, cb_array);
if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) {
TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret));
@ -973,6 +981,7 @@ int ObLSTxCtxMgr::block(bool &is_all_tx_cleaned_up)
} else {
is_all_tx_cleaned_up = (get_tx_ctx_count() == 0);
}
TRANS_LOG(INFO, "block ls", K(ret), "manager", *this);
return ret;
}
@ -987,6 +996,7 @@ int ObLSTxCtxMgr::block_normal(bool &is_all_tx_cleaned_up)
} else {
is_all_tx_cleaned_up = (get_tx_ctx_count() == 0);
}
TRANS_LOG(INFO, "block ls normally", K(ret), "manager", *this);
return ret;
}
@ -1001,6 +1011,7 @@ int ObLSTxCtxMgr::online()
} else {
online_ts_ = ObTimeUtility::current_time();
}
TRANS_LOG(INFO, "online ls", K(ret), "manager", *this);
return ret;
}
@ -1013,6 +1024,7 @@ int ObLSTxCtxMgr::unblock_normal()
if (OB_FAIL(state_helper.switch_state(Ops::UNBLOCK_NORMAL))) {
TRANS_LOG(WARN, "switch state error", KR(ret), "manager", *this);
}
TRANS_LOG(INFO, "unblock ls normally", K(ret), "manager", *this);
return ret;
}
@ -1488,6 +1500,30 @@ int ObLSTxCtxMgr::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd)
return ret;
}
// Register the start of a read-only request on this ls.
// Under the read lock: fails with OB_NOT_INIT when the manager is not
// initialized, fails with OB_PARTITION_IS_BLOCKED when the manager is blocked,
// otherwise increases the active read-only request count.
int ObLSTxCtxMgr::start_readonly_request()
{
  int ret = OB_SUCCESS;
  RLockGuard guard(rwlock_);
  if (IS_NOT_INIT) {
    TRANS_LOG(WARN, "ObLSTxCtxMgr not inited", K(this));
    ret = OB_NOT_INIT;
  } else if (!is_blocked_()) {
    inc_total_active_readonly_request_count();
  } else {
    // Read-only reads must be rejected while blocked, because transactions
    // may be killed forcibly.
    ret = OB_PARTITION_IS_BLOCKED;
    TRANS_LOG(WARN, "logstream is blocked", K(ret));
  }
  return ret;
}
// Register the end of a read-only request on this ls by decreasing the active
// read-only request count. Always returns OB_SUCCESS.
int ObLSTxCtxMgr::end_readonly_request()
{
  int ret = OB_SUCCESS;
  dec_total_active_readonly_request_count();
  return ret;
}
int ObTxCtxMgr::remove_all_ls_()
{
int ret = OB_SUCCESS;

View File

@ -327,6 +327,8 @@ public:
// @param [in] tx_id
// @param [in] fd
int dump_single_tx_data_2_text(const int64_t tx_id, FILE *fd);
int start_readonly_request();
int end_readonly_request();
// check this ObLSTxCtxMgr contains the specified ObLSID
bool contain(const share::ObLSID &ls_id)
@ -347,6 +349,20 @@ public:
// Decrease active trx count in this ls
void dec_active_tx_count() { (void)ATOMIC_AAF(&active_tx_count_, -1); }
void inc_total_active_readonly_request_count()
{
const int64_t count = ATOMIC_AAF(&total_active_readonly_request_count_, 1);
}
// Decrease active read-only request count in this ls.
// Logs an error if the count becomes negative after the decrement.
void dec_total_active_readonly_request_count()
{
// NOTE(review): `ret` is not referenced explicitly below; presumably the
// ERROR-level TRANS_LOG macro picks it up from scope - confirm before removing.
int ret = common::OB_ERR_UNEXPECTED;
const int64_t count = ATOMIC_AAF(&total_active_readonly_request_count_, -1);
// A negative value means there were more decrements than increments.
if (OB_UNLIKELY(count < 0)) {
TRANS_LOG(ERROR, "unexpected total_active_readonly_request_count", KP(this), K(count));
}
}
// Get active read-only request count in this ls (atomic load)
int64_t get_total_active_readonly_request_count() { return ATOMIC_LOAD(&total_active_readonly_request_count_); }
// Get all tx obj lock information in this ObLSTxCtxMgr
// @param [out] iter: all tx obj lock op information
int iterate_tx_obj_lock_op(ObLockOpIterator &iter);
@ -539,6 +555,7 @@ public:
static const int64_t MAX_HASH_ITEM_PRINT = 16;
static const int64_t WAIT_SW_CB_TIMEOUT = 100 * 1000; // 100 ms
static const int64_t WAIT_SW_CB_INTERVAL = 10 * 1000; // 10 ms
static const int64_t WAIT_READONLY_REQUEST_TIME = 10 * 1000 * 1000;
private:
class State
{
@ -754,7 +771,9 @@ private:
mutable RWLock minor_merge_lock_;
// Total TxCtx count in this ObLSTxCtxMgr
int64_t total_tx_ctx_count_;
int64_t total_tx_ctx_count_ CACHE_ALIGNED;
int64_t total_active_readonly_request_count_ CACHE_ALIGNED;
int64_t active_tx_count_;