// Patch summary (derived from the hunks below):
//  - ObGCHandler::handle_gc_ls_offline_: adds DEBUG_SYNC(LS_GC_BEFORE_SUBMIT_OFFLINE_LOG) right before the
//    OFFLINE_LS log is submitted; the sync point is registered in ob_debug_sync_point.h.
//  - ObService::get_ls_replayed_scn: reads offline_scn through ls->get_offline_scn() and passes it to
//    result.init() together with cur_readable_scn.
//  - ObBalanceTaskExecuteService::process_init_task_: gains a skip_next_status out-parameter and takes
//    create_scn from the new wait_can_create_new_ls_() instead of calling get_tenant_gts directly.
//    On OB_NEED_WAIT with a non-empty task_comment_ it resets ret to OB_SUCCESS and sets skip_next_status.
//  - wait_can_create_new_ls_: re-reads tenant GTS every 100ms until create_scn > max offline_scn, or returns
//    OB_NEED_WAIT once GCONF.rpc_timeout has elapsed.
//  - get_max_offline_scn_ / get_ls_offline_scn_by_rpc_: send get_ls_replayed_scn RPCs for every LS in
//    wait_offline status and take the largest returned offline_scn; a dropping LS or an invalid offline_scn
//    yields OB_NEED_WAIT and fills task_comment_.
//  - ObGetLSReplayedScnRes: new offline_scn_ member wired into OB_SERIALIZE_MEMBER, init(), assign(),
//    TO_STRING_KV and a getter.
// NOTE(review): `ObArray return_code_array`, `ObIArray &return_code_array` and `ObArray status_info_array`
//   appear without template arguments -- presumably stripped when this patch text was extracted; confirm
//   against the upstream commit before applying.
// NOTE(review): the "has dropping ls, need wait" log uses K(ret) while the surrounding logs use KR(ret) -- TODO confirm intent.
diff --git a/src/logservice/ob_garbage_collector.cpp b/src/logservice/ob_garbage_collector.cpp index 29b236e10f..72a850e718 100644 --- a/src/logservice/ob_garbage_collector.cpp +++ b/src/logservice/ob_garbage_collector.cpp @@ -1158,6 +1158,7 @@ void ObGCHandler::handle_gc_ls_offline_(ObGarbageCollector::LSStatus &ls_status) } else if (is_ls_offline_gc_state(gc_state)) { (void)try_check_and_set_wait_gc_(ls_status); } else { + DEBUG_SYNC(LS_GC_BEFORE_SUBMIT_OFFLINE_LOG); if (OB_FAIL(submit_log_(ObGCLSLOGType::OFFLINE_LS, is_success))) { CLOG_LOG(WARN, "failed to submit OFFLINE_LS log", K(ls_id), K(gc_state)); } else if (is_success) { diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index eb90be5c5f..892f9d74fd 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -3029,6 +3029,7 @@ int ObService::get_ls_replayed_scn( ObLSService *ls_svr = MTL(ObLSService*); ObLSHandle ls_handle; ObLS *ls = nullptr; + share::SCN offline_scn; if (OB_ISNULL(ls_svr)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("pointer is null", KR(ret), KP(ls_svr)); @@ -3044,10 +3045,14 @@ int ObService::get_ls_replayed_scn( LOG_WARN("failed to get all replica readable scn", KR(ret)); } } - if (FAILEDx(result.init(arg.get_tenant_id(), arg.get_ls_id(), cur_readable_scn, GCTX.self_addr()))) { - LOG_WARN("failed to init res", KR(ret), K(arg), K(cur_readable_scn)); + if (FAILEDx(ls->get_offline_scn(offline_scn))) { + LOG_WARN("failed to get offline scn", KR(ret), K(arg), KPC(ls)); + } else if (OB_FAIL(result.init(arg.get_tenant_id(), arg.get_ls_id(), + cur_readable_scn, offline_scn, get_self_addr()))) { + LOG_WARN("failed to init res", KR(ret), K(arg), K(cur_readable_scn), K(offline_scn)); } - LOG_INFO("finish get_ls_replayed_scn", KR(ret), K(cur_readable_scn), K(arg), K(result)); + LOG_INFO("finish get_ls_replayed_scn", KR(ret), K(cur_readable_scn), K(arg), K(result), + K(offline_scn)); } return ret; diff --git a/src/rootserver/ob_balance_task_execute_service.cpp 
b/src/rootserver/ob_balance_task_execute_service.cpp index 325d759ecf..6a0a92ce07 100755 --- a/src/rootserver/ob_balance_task_execute_service.cpp +++ b/src/rootserver/ob_balance_task_execute_service.cpp @@ -13,6 +13,7 @@ #define USING_LOG_PREFIX BALANCE #include "ob_balance_task_execute_service.h" #include "lib/mysqlclient/ob_mysql_transaction.h"//trans +#include "lib/utility/ob_tracepoint.h" // ERRSIM_POINT_DEF #include "share/schema/ob_schema_struct.h"//ObTenantInfo #include "share/schema/ob_multi_version_schema_service.h"//ObMultiVersionSchemaService #include "share/schema/ob_part_mgr_util.h"//ObPartitionSchemaIter @@ -23,6 +24,7 @@ #include "share/ls/ob_ls_operator.h"//ls_op #include "share/ls/ob_ls_status_operator.h"//status_op #include "share/ls/ob_ls_table_operator.h"//lst_operator->get +#include "share/rpc/ob_async_rpc_proxy.h"//wait_all #include "rootserver/ob_tenant_transfer_service.h"//transfer #include "rootserver/balance/ob_ls_all_part_builder.h" // ObLSAllPartBuilder #include "rootserver/ob_root_utils.h"//get_rs_default_timeout_ctx @@ -265,8 +267,8 @@ int ObBalanceTaskExecuteService::process_current_task_status_( } else { if (task.get_task_status().is_init()) { DEBUG_SYNC(BEFORE_PROCESS_BALANCE_TASK_INIT); - if (OB_FAIL(process_init_task_(task, trans))) { - LOG_WARN("failed to init trans", KR(ret)); + if (OB_FAIL(process_init_task_(task, trans, skip_next_status))) { + LOG_WARN("failed to init trans", KR(ret), K(task)); } } else if (task.get_task_status().is_create_ls()) { DEBUG_SYNC(BEFORE_PROCESS_BALANCE_TASK_CREATE_LS); @@ -544,9 +546,11 @@ int ObBalanceTaskExecuteService::cancel_other_init_task_( } int ObBalanceTaskExecuteService::process_init_task_(const ObBalanceTask &task, - ObMySQLTransaction &trans) + ObMySQLTransaction &trans, + bool &skip_next_status) { int ret = OB_SUCCESS; + skip_next_status = false; ObLSAttrOperator ls_op(tenant_id_, sql_proxy_); if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; @@ -559,11 +563,18 @@ int 
ObBalanceTaskExecuteService::process_init_task_(const ObBalanceTask &task, share::ObLSAttr ls_info; share::ObLSFlag flag; SCN create_scn; - if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) { - LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_)); + if (OB_FAIL(wait_can_create_new_ls_(create_scn))) { + LOG_WARN("failed to wait create new ls", KR(ret), K(tenant_id_)); + if (OB_NEED_WAIT == ret && !task_comment_.empty()) { + //Reset the error code first so that task_comment can be updated into the table, + //but skip the creation of the log stream by setting skip_next_status to true + ret = OB_SUCCESS; + skip_next_status = true; + } } else if (OB_FAIL(ls_info.init(task.get_dest_ls_id(), task.get_ls_group_id(), flag, share::OB_LS_CREATING, share::OB_LS_OP_CREATE_PRE, create_scn))) { - LOG_WARN("failed to init new operation", KR(ret), K(create_scn), K(task)); + LOG_WARN("failed to init new operation", KR(ret), K(create_scn), K(task), + K(skip_next_status), K(task_comment_)); //TODO msy164651 } else if (OB_FAIL(ls_op.insert_ls(ls_info, share::NORMAL_SWITCHOVER_STATUS, &trans))) { LOG_WARN("failed to insert new operation", KR(ret), K(ls_info)); @@ -856,6 +867,200 @@ int ObBalanceTaskExecuteService::set_ls_to_dropping_(const ObLSID &ls_id, ObMySQ return ret; } + +//When a merge_ls is executed, the log stream is guaranteed to be pushed to the wait_offline status. +//The purpose here is not to check whether existing resources are sufficient to create a log stream, but only to ensure +//that the create_scn of a log stream newly created by this round of job is greater than the offline_scn of the wait_offline log streams produced by the previous round of job. 
+ERRSIM_POINT_DEF(EN_SET_MAX_OFFLINE_SCN); +int ObBalanceTaskExecuteService::wait_can_create_new_ls_(share::SCN &create_scn) +{ + int ret = OB_SUCCESS; + create_scn.reset(); + share::SCN offline_scn; + int64_t offline_ls_count = 0; + uint64_t cluster_version = GET_MIN_CLUSTER_VERSION(); + + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (cluster_version < CLUSTER_VERSION_4_2_2_0) { + //The cluster version has not been upgraded to 4.2.2.0, so offline_scn cannot be obtained + if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) { + LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_)); + } + } else if 
(OB_FAIL(get_max_offline_scn_(offline_scn, offline_ls_count))) { + LOG_WARN("failed to get max offline scn", KR(ret)); + } else if (0 == offline_ls_count) { + if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) { + LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_)); + } + } else if (OB_UNLIKELY(!offline_scn.is_valid() || 0 > offline_ls_count)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("offline scn is invalid", KR(ret), K(offline_scn), K(offline_ls_count)); + } else { + const int64_t start_time = ObTimeUtility::fast_current_time(); + const int64_t TIMEOUT = GCONF.rpc_timeout; + //for test + if (EN_SET_MAX_OFFLINE_SCN) { + LOG_INFO("set offline scn to max", K(offline_scn)); + offline_scn.set_max(); + } + do { + if (ObTimeUtility::fast_current_time() - start_time > TIMEOUT) { + ret = OB_NEED_WAIT; + LOG_WARN("stmt is timeout", KR(ret), K(start_time), K(TIMEOUT), + K(create_scn), K(offline_scn)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(task_comment_.assign( + "Wait timed out for GTS to exceed max offline scn for all LS"))) { + LOG_WARN("failed to assign task comment", KR(tmp_ret), K(offline_scn)); + } + } else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) { + LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_)); + } else if (create_scn > offline_scn) { + ret = OB_SUCCESS; + } else { + ret = OB_EAGAIN; + LOG_WARN("create scn is smaller than offline scn, need wait", KR(ret), + K(create_scn), K(offline_scn), K(offline_ls_count)); + // waiting 100ms + ob_usleep(100L * 1000L); + } + } while (OB_EAGAIN == ret); + } + return ret; +} + +int ObBalanceTaskExecuteService::get_max_offline_scn_(share::SCN &offline_scn, int64_t &offline_ls_count) +{ + int ret = OB_SUCCESS; + offline_scn.reset(); + offline_ls_count = 0; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_) + || OB_ISNULL(GCTX.location_service_)) { + ret 
= OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), KP(sql_proxy_), KP(GCTX.srv_rpc_proxy_), + KP(GCTX.location_service_)); + } else { + ObGetLSReplayedScnProxy proxy( + *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_replayed_scn); + ObArray return_code_array; + if (OB_FAIL(get_ls_offline_scn_by_rpc_(proxy, offline_ls_count, return_code_array))) { + LOG_WARN("failed to get ls offline scn", KR(ret)); + } else if (0 == offline_ls_count) { + //nothing todo + } else if (return_code_array.count() != offline_ls_count) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("offline count not equal to return code array", KR(ret), + K(offline_ls_count), K(return_code_array)); + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), + "return_cnt", return_code_array.count()); + } else { + offline_scn.set_min(); + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { + if (OB_FAIL(return_code_array.at(i))) { + LOG_WARN("send rpc is failed", KR(ret), K(i)); + } else { + const obrpc::ObGetLSReplayedScnRes *result = proxy.get_results().at(i); + if (OB_ISNULL(result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret), K(i)); + } else if (!result->get_offline_scn().is_valid()) { + ret = OB_NEED_WAIT; + LOG_WARN("offline scn is invalid", KR(ret), KPC(result)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(task_comment_.assign_fmt("Wait for LS %ld to write offline log", + result->get_ls_id().id()))) { + LOG_WARN("failed to assign task comment", KR(tmp_ret), KPC(result)); + } + } else if (result->get_offline_scn() > offline_scn) { + offline_scn = result->get_offline_scn(); + LOG_INFO("get offline scn", K(offline_scn), KPC(result)); + } + } + }//end for + } + } + + return ret; +} +int ObBalanceTaskExecuteService::get_ls_offline_scn_by_rpc_( + ObGetLSReplayedScnProxy &proxy, + int64_t &offline_ls_count, + ObIArray &return_code_array) +{ + int ret = OB_SUCCESS; + offline_ls_count = 0; 
+ ObArray status_info_array; + ObLSStatusOperator ls_status_op; + int tmp_ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_) + || OB_ISNULL(GCTX.location_service_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), KP(sql_proxy_), KP(GCTX.srv_rpc_proxy_), + KP(GCTX.location_service_)); + } else if (OB_FAIL(ls_status_op.get_all_ls_status_by_order(tenant_id_, + status_info_array, *sql_proxy_))) { + LOG_WARN("failed to get ls status info array", KR(ret), K(tenant_id_)); + } else { + obrpc::ObGetLSReplayedScnArg arg; + ObAddr leader; + ObTimeoutCtx ctx; + if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) { + LOG_WARN("fail to set timeout ctx", KR(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) { + ObLSStatusInfo &info = status_info_array.at(i); + if (info.ls_is_dropping()) { + //A log stream in dropping status exists; it is uncertain whether it is left over from the previous round of balance tasks, + //so wait for dropping to become wait_offline first + ret = OB_NEED_WAIT; + LOG_WARN("has dropping ls, need wait", K(ret), K(info)); + if (OB_TMP_FAIL(task_comment_.assign_fmt("Wait for LS %ld in DROPPING status to become OFFLINE", + info.ls_id_.id()))) { + LOG_WARN("failed to assign task comment", KR(tmp_ret), K(info)); + } + } else if (!info.ls_is_wait_offline()) { + //The tenant_dropping status will not exist during balancing; other statuses are not considered + } else { + offline_ls_count++; + const int64_t timeout = ctx.get_timeout(); + if (OB_FAIL(arg.init(tenant_id_, info.ls_id_, false))) { + LOG_WARN("failed to init arg", KR(ret), K(arg)); + //It does not actually have to be the leader replica; going to the leader is merely convenient, so as long as + //a location is obtained there is no need to verify it + } else if (OB_FAIL(GCTX.location_service_->get_leader( + 
GCONF.cluster_id, tenant_id_, info.ls_id_, false, leader))) { + LOG_WARN("failed to get leader", KR(ret), K(tenant_id_), K(info)); + } else if (OB_FAIL(proxy.call(leader, timeout, tenant_id_, arg))) { + LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout), 
+ K(tenant_id_), K(arg)); + } + if (OB_FAIL(ret)) { + if (OB_TMP_FAIL(GCTX.location_service_->nonblock_renew( + GCONF.cluster_id, tenant_id_, info.ls_id_))) { + LOG_WARN("failed to renew location", KR(ret), KR(tmp_ret), K(tenant_id_), K(info)); + } + } + }//end else + }//end for + if (0 == offline_ls_count) { + //nothing todo + } else if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + } + return ret; +} + } } diff --git a/src/rootserver/ob_balance_task_execute_service.h b/src/rootserver/ob_balance_task_execute_service.h index 18dc9fabaf..588e8a94dd 100644 --- a/src/rootserver/ob_balance_task_execute_service.h +++ b/src/rootserver/ob_balance_task_execute_service.h @@ -20,6 +20,7 @@ #include "share/ob_thread_mgr.h" //OBTGDefIDEnum #include "share/ob_balance_define.h" // ObBalanceJobID, ObBalanceTaskID #include "share/ls/ob_ls_i_life_manager.h"//ObLSStatus +#include "rootserver/ob_rs_async_rpc_proxy.h"//get_offline_scn namespace oceanbase { @@ -86,7 +87,8 @@ private: bool &skip_next_status); int cancel_current_task_status_(const share::ObBalanceTask &task, ObMySQLTransaction &trans, bool &skip_next_status); int cancel_other_init_task_(const share::ObBalanceTask &task, ObMySQLTransaction &trans); - int process_init_task_(const share::ObBalanceTask &task, ObMySQLTransaction &trans); + int process_init_task_(const share::ObBalanceTask &task, ObMySQLTransaction &trans, + bool &skip_next_status); int wait_ls_to_target_status_(const share::ObLSID &ls_id, const share::ObLSStatus ls_status, bool &skip_next_status); int wait_alter_ls_(const share::ObBalanceTask &task, bool &skip_next_status); int set_ls_to_merge_(const share::ObBalanceTask &task, ObMySQLTransaction &trans); @@ -101,6 +103,11 @@ private: int wait_tenant_ready_(); int try_update_task_comment_(const share::ObBalanceTask &task, const common::ObSqlString &comment, ObISQLClient &sql_client); + int 
wait_can_create_new_ls_(share::SCN &create_scn); + int get_max_offline_scn_(share::SCN &offline_scn, int64_t &offline_ls_count); + int get_ls_offline_scn_by_rpc_(ObGetLSReplayedScnProxy &proxy, + int64_t &offline_ls_count, + ObIArray &return_code_array); private: bool inited_; uint64_t tenant_id_; diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index c66bc6aea1..d914b07983 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -562,6 +562,7 @@ class ObString; ACT(BEFORE_RELOAD_UNIT,)\ ACT(BEFORE_PROCESS_EVENT_TASK,)\ ACT(BEFORE_CHECK_LS_TRANSFER_SCN_FOR_STANDBY,)\ + ACT(LS_GC_BEFORE_SUBMIT_OFFLINE_LOG,)\ ACT(BEFORE_GET_CONFIG_VERSION_AND_TRANSFER_SCN,)\ ACT(LS_GC_BEFORE_OFFLINE,)\ ACT(AFTER_LOCK_SNAPSHOT_MUTEX,)\ diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 6ab1fec9d6..ff8081edc4 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -6488,19 +6488,19 @@ int ObGetLSReplayedScnArg::assign(const ObGetLSReplayedScnArg &other) return ret; } -OB_SERIALIZE_MEMBER(ObGetLSReplayedScnRes, tenant_id_, ls_id_, cur_readable_scn_, self_addr_); +OB_SERIALIZE_MEMBER(ObGetLSReplayedScnRes, tenant_id_, ls_id_, cur_readable_scn_, offline_scn_, self_addr_); bool ObGetLSReplayedScnRes::is_valid() const { return OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && cur_readable_scn_.is_valid_and_not_min(); - //no need check server valid } int ObGetLSReplayedScnRes::init( const uint64_t tenant_id, const share::ObLSID &ls_id, const share::SCN &cur_readable_scn, + const share::SCN &offline_scn, const common::ObAddr &server) { int ret = OB_SUCCESS; @@ -6508,6 +6508,7 @@ int ObGetLSReplayedScnRes::init( || !ls_id.is_valid() || !cur_readable_scn.is_valid_and_not_min() || !server.is_valid())) { + //No need to validate offline_scn, it may legitimately be invalid ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(cur_readable_scn), 
K(server)); } else { @@ -6515,6 +6516,7 
@@ int ObGetLSReplayedScnRes::init( ls_id_ = ls_id; cur_readable_scn_ = cur_readable_scn; self_addr_ = server; + offline_scn_ = offline_scn; } return ret; } @@ -6527,6 +6529,7 @@ int ObGetLSReplayedScnRes::assign(const ObGetLSReplayedScnRes &other) ls_id_ = other.ls_id_; cur_readable_scn_ = other.cur_readable_scn_; self_addr_ = other.self_addr_; + offline_scn_ = other.offline_scn_; } return ret; } diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 7d7399ed22..43cd18b2f1 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -7622,13 +7622,14 @@ public: ObGetLSReplayedScnRes(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_(), cur_readable_scn_(share::SCN::min_scn()), + offline_scn_(), self_addr_() {} ~ObGetLSReplayedScnRes() {} bool is_valid() const; int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const share::SCN &cur_readable_scn, - const common::ObAddr &server); + const share::SCN &offline_scn, const common::ObAddr &server); int assign(const ObGetLSReplayedScnRes &other); - TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(cur_readable_scn)); + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(cur_readable_scn), K_(offline_scn), K(self_addr_)); uint64_t get_tenant_id() const { return tenant_id_; @@ -7645,12 +7646,18 @@ public: { return self_addr_; } + share::SCN get_offline_scn() const + { + return offline_scn_; + } + private: DISALLOW_COPY_AND_ASSIGN(ObGetLSReplayedScnRes); private: uint64_t tenant_id_; share::ObLSID ls_id_; share::SCN cur_readable_scn_; + share::SCN offline_scn_;//add in 4.2.2.0 common::ObAddr self_addr_;//add in 4.3.0 };