diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index a637b371c..4a4fd11ad 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -670,6 +670,7 @@ PCODE_DEF(OB_DUMP_SINGLE_TX_DATA, 0x742) PCODE_DEF(OB_GET_LS_REPLAYED_SCN, 0x743) PCODE_DEF(OB_REFRESH_TENANT_INFO, 0x744) PCODE_DEF(OB_UPDATE_TENANT_INFO_CACHE, 0x745) +PCODE_DEF(OB_BROADCAST_CONSENSUS_VERSION, 0x746) // BatchRpc PCODE_DEF(OB_BATCH, 0x750) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 63a16d998..04cb3072d 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2477,5 +2477,17 @@ int ObRpcGetServerResourceInfoP::process() return ret; } +int ObBroadcastConsensusVersionP::process() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(gctx_.ob_service_)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid argument", K(gctx_.ob_service_), K(ret)); + } else { + ret = gctx_.ob_service_->broadcast_consensus_version(arg_, result_); + } + return ret; +} + } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 13528eee9..2b467db6b 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -222,6 +222,7 @@ OB_DEFINE_PROCESSOR_SM(Srv, OB_SESS_INFO_VERIFICATION, ObSessInfoVerificationP); OB_DEFINE_PROCESSOR_S(Srv, OB_SEND_HEARTBEAT, ObRpcSendHeartbeatP); OB_DEFINE_PROCESSOR_S(Srv, OB_GET_SERVER_RESOURCE_INFO, ObRpcGetServerResourceInfoP); OB_DEFINE_PROCESSOR_S(Srv, OB_UPDATE_TENANT_INFO_CACHE, ObUpdateTenantInfoCacheP); +OB_DEFINE_PROCESSOR_S(Srv, OB_BROADCAST_CONSENSUS_VERSION, ObBroadcastConsensusVersionP); } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 950139542..91191d15f 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -1376,6 +1376,35 @@ int ObService::switch_schema( return ret; } +int ObService::broadcast_consensus_version( + const obrpc::ObBroadcastConsensusVersionArg &arg, + obrpc::ObBroadcastConsensusVersionRes &result) +{ + int ret = OB_SUCCESS; + int64_t local_consensus_version = OB_INVALID_VERSION; + const uint64_t tenant_id = arg.get_tenant_id(); + const int64_t consensus_version = arg.get_consensus_version(); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_UNLIKELY(consensus_version <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(consensus_version)); + } else if (OB_ISNULL(gctx_.schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema_service is null", KR(ret)); + } else if (OB_FAIL(gctx_.schema_service_->get_tenant_broadcast_consensus_version(tenant_id, local_consensus_version))) { + LOG_WARN("fail to get local tenant consensus_version", KR(ret), K(tenant_id)); + } else if (OB_UNLIKELY(consensus_version < local_consensus_version)) { + ret = OB_EAGAIN; + LOG_WARN("consensus version is less than local consensus version", KR(ret), K(consensus_version), K(local_consensus_version)); + } else if (OB_FAIL(gctx_.schema_service_->set_tenant_broadcast_consensus_version(tenant_id, consensus_version))) { + LOG_WARN("failt to update received schema version", KR(ret), K(tenant_id), K(consensus_version)); + } + result.set_ret(ret); + return OB_SUCCESS; +} + int ObService::bootstrap(const obrpc::ObBootstrapArg &arg) { int ret = OB_SUCCESS; diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 04ef52c8b..7f87de4c3 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -131,6 +131,9 @@ public: int stop_partition_write(const obrpc::Int64 &switchover_timestamp, obrpc::Int64 &result); int check_partition_log(const obrpc::Int64 &switchover_timestamp, obrpc::Int64 &result); int get_wrs_info(const obrpc::ObGetWRSArg &arg, obrpc::ObGetWRSResult &result); + int broadcast_consensus_version( + const obrpc::ObBroadcastConsensusVersionArg &arg, + obrpc::ObBroadcastConsensusVersionRes &result); //////////////////////////////////////////////////////////////// // ObRpcFetchSysLSP @RS load balance int fetch_sys_ls(share::ObLSReplica &replica); diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index ac6bd157c..84ef4c730 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -290,4 +290,5 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) { // session info verification RPC_PROCESSOR(ObSessInfoVerificationP, gctx_); + RPC_PROCESSOR(ObBroadcastConsensusVersionP, gctx_); } diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index f9d534d28..7e2ec6898 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -140,6 +140,7 @@ public: // these functions should be called after ddl_service has been inited share::schema::ObMultiVersionSchemaService &get_schema_service() { return *schema_service_; } common::ObMySQLProxy &get_sql_proxy() { return *sql_proxy_; } + ObUnitManager &get_unit_manager() { return *unit_mgr_; } ObZoneManager &get_zone_mgr() { return *zone_mgr_; } ObSnapshotInfoManager &get_snapshot_mgr() { return *snapshot_mgr_; } share::ObLSTableOperator &get_lst_operator() { return *lst_operator_; } diff --git a/src/rootserver/ob_rs_async_rpc_proxy.h b/src/rootserver/ob_rs_async_rpc_proxy.h index fe922ccf3..da2c1d334 100644 --- a/src/rootserver/ob_rs_async_rpc_proxy.h +++ b/src/rootserver/ob_rs_async_rpc_proxy.h @@ -75,7 +75,7 @@ RPC_F(obrpc::OB_GET_SERVER_RESOURCE_INFO, obrpc::ObGetServerResourceInfoArg, obr RPC_F(obrpc::OB_NOTIFY_SWITCH_LEADER, obrpc::ObNotifySwitchLeaderArg, obrpc::ObSrvRpcProxy::ObRpc::Response, ObNotifySwitchLeaderProxy); RPC_F(obrpc::OB_UPDATE_TENANT_INFO_CACHE, obrpc::ObUpdateTenantInfoCacheArg, obrpc::ObUpdateTenantInfoCacheRes, ObUpdateTenantInfoCacheProxy); - +RPC_F(obrpc::OB_BROADCAST_CONSENSUS_VERSION, obrpc::ObBroadcastConsensusVersionArg, obrpc::ObBroadcastConsensusVersionRes, ObBroadcstConsensusVersionProxy); }//end namespace rootserver }//end namespace oceanbase diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 2f18cc1de..ebc30ebe9 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -8368,5 +8368,37 @@ void ObGetServerResourceInfoResult::reset() server_.reset(); resource_info_.reset(); } + +OB_SERIALIZE_MEMBER(ObBroadcastConsensusVersionArg, tenant_id_, consensus_version_); +bool ObBroadcastConsensusVersionArg::is_valid() const +{ + return OB_INVALID_TENANT_ID != tenant_id_ && OB_INVALID_VERSION != consensus_version_; +} + +int ObBroadcastConsensusVersionArg::init(const uint64_t tenant_id, const int64_t consensus_version) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || OB_INVALID_VERSION == consensus_version)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(consensus_version)); + } else { + tenant_id_ = tenant_id; + consensus_version_ = consensus_version; + } + return ret; +} + +int ObBroadcastConsensusVersionArg::assign(const ObBroadcastConsensusVersionArg &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + tenant_id_ = other.tenant_id_; + consensus_version_ = other.consensus_version_; + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObBroadcastConsensusVersionRes, ret_); }//end namespace obrpc }//end namepsace oceanbase diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 0659cfd88..ed6b5b2ca 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -9153,6 +9153,57 @@ private: common::ObAddr server_; share::ObServerResourceInfo resource_info_; }; + +struct ObBroadcastConsensusVersionArg +{ + OB_UNIS_VERSION(1); +public: + ObBroadcastConsensusVersionArg() + : tenant_id_(OB_INVALID_ID), + consensus_version_(OB_INVALID_VERSION) {} + ~ObBroadcastConsensusVersionArg() {} + int init(const uint64_t tenant_id, const int64_t consensus_version); + bool is_valid() const; + int assign(const ObBroadcastConsensusVersionArg &other); + uint64_t get_tenant_id() const + { + return tenant_id_; + } + int64_t get_consensus_version() const + { + return consensus_version_; + } + void set_tenant_id(const uint64_t tenant_id) + { + tenant_id_ = tenant_id; + } + void set_consensus_version(const int64_t consensus_version) + { + consensus_version_ = consensus_version; + } + TO_STRING_KV(K_(tenant_id), K_(consensus_version)); +private: + DISALLOW_COPY_AND_ASSIGN(ObBroadcastConsensusVersionArg); +private: + uint64_t tenant_id_; + int64_t consensus_version_; +}; + +struct ObBroadcastConsensusVersionRes +{ + OB_UNIS_VERSION(1); +public: + ObBroadcastConsensusVersionRes() + : ret_(OB_SUCCESS) {} + ~ObBroadcastConsensusVersionRes() {} + void set_ret(int ret) { ret_ = ret; } + TO_STRING_KV(K_(ret)); +private: + DISALLOW_COPY_AND_ASSIGN(ObBroadcastConsensusVersionRes); +private: + int ret_; +}; + }//end namespace obrpc }//end namespace oceanbase #endif diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index b14a50b8f..4d94e69d3 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -195,6 +195,7 @@ public: RPC_AP(PR5 get_server_resource_info, OB_GET_SERVER_RESOURCE_INFO, (obrpc::ObGetServerResourceInfoArg), obrpc::ObGetServerResourceInfoResult); RPC_AP(PR5 notify_switch_leader, OB_NOTIFY_SWITCH_LEADER, (obrpc::ObNotifySwitchLeaderArg)); RPC_AP(PR5 update_tenant_info_cache, OB_UPDATE_TENANT_INFO_CACHE, (obrpc::ObUpdateTenantInfoCacheArg), obrpc::ObUpdateTenantInfoCacheRes); + RPC_AP(PR5 broadcast_consensus_version, OB_BROADCAST_CONSENSUS_VERSION, (obrpc::ObBroadcastConsensusVersionArg), obrpc::ObBroadcastConsensusVersionRes); }; // end of class ObSrvRpcProxy } // end of namespace rpc diff --git a/src/share/schema/ob_ddl_trans_controller.cpp b/src/share/schema/ob_ddl_trans_controller.cpp index 13eb79c6f..f5b4c6c5e 100644 --- a/src/share/schema/ob_ddl_trans_controller.cpp +++ b/src/share/schema/ob_ddl_trans_controller.cpp @@ -12,6 +12,7 @@ #include "share/schema/ob_ddl_trans_controller.h" #include "share/schema/ob_multi_version_schema_service.h" #include "rootserver/ob_root_service.h" +#include "share/ob_srv_rpc_proxy.h" namespace oceanbase @@ -83,13 +84,15 @@ void ObDDLTransController::run1() } else { // ignore ret continue for (int64_t i = 0; i < tenant_ids.count(); i++) { - int64_t start_time= ObTimeUtility::current_time(); + int64_t start_time = ObTimeUtility::current_time(); ObCurTraceId::init(GCONF.self_addr_); if (OB_FAIL(GCTX.root_service_->get_ddl_service().publish_schema(tenant_ids.at(i)))) { LOG_WARN("refresh_schema fail", KR(ret), K(tenant_ids.at(i))); + } else if (OB_FAIL(broadcast_consensus_version(tenant_ids.at(i)))) { + LOG_WARN("fail to broadcast consensus version", KR(ret), K(tenant_ids.at(i))); } else { - int64_t end_time= ObTimeUtility::current_time(); - LOG_INFO("refresh_schema", K(tenant_ids.at(i)), K(end_time - start_time)); + int64_t end_time = ObTimeUtility::current_time(); + LOG_INFO("refresh_schema", KR(ret), K(tenant_ids.at(i)), K(end_time - start_time)); } } } @@ -100,6 +103,60 @@ void ObDDLTransController::run1() } } +int ObDDLTransController::broadcast_consensus_version(const int64_t tenant_id) +{ + int ret = OB_SUCCESS; + ObZone zone; + ObTimeoutCtx ctx; + ObArray server_list; + int64_t schema_version = OB_INVALID_VERSION; + obrpc::ObBroadcastConsensusVersionArg arg; + rootserver::ObBroadcstConsensusVersionProxy proxy(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::broadcast_consensus_version); + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("ObDDLTransController", KR(ret)); + } else if (OB_ISNULL(schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ObDDLTransController", KR(ret)); + } else if (tenant_id == OB_INVALID_TENANT_ID) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(GCTX.root_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rootservice is null", KR(ret)); + } else if (OB_FAIL(GCTX.root_service_->get_ddl_service().get_unit_manager().get_tenant_unit_servers(tenant_id, zone, server_list))) { + LOG_WARN("get alive server failed", KR(ret)); + } else if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version( + tenant_id, schema_version))) { + LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) { + LOG_WARN("fail to set default timeout ctx", KR(ret)); + } else { + arg.set_tenant_id(tenant_id); + arg.set_consensus_version(schema_version); + FOREACH_X(s, server_list, OB_SUCC(ret)) { + if (OB_ISNULL(s)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("s is null", KR(ret)); + } else { + // overwrite ret + if (OB_FAIL(proxy.call(*s, ctx.get_timeout(), arg))) { + LOG_WARN("send broadcast consensus version rpc failed", KR(ret), + K(ctx.get_timeout()), K(schema_version), K(arg), "server", *s); + ret = OB_SUCCESS; + } + } + } + int tmp_ret = OB_SUCCESS; + ObArray return_code_array; + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + LOG_WARN("wait result failed", KR(tmp_ret)); + } + } + LOG_INFO("broadcast consensus version finished", KR(ret), K(schema_version), K(arg), K(server_list)); + return ret; +} + int ObDDLTransController::create_task_and_assign_schema_version(const uint64_t tenant_id, const uint64_t schema_version_count, int64_t &task_id, diff --git a/src/share/schema/ob_ddl_trans_controller.h b/src/share/schema/ob_ddl_trans_controller.h index 196f4a643..85b827c0f 100644 --- a/src/share/schema/ob_ddl_trans_controller.h +++ b/src/share/schema/ob_ddl_trans_controller.h @@ -52,6 +52,7 @@ public: int remove_task(int64_t task_id); int check_enable_ddl_trans_new_lock(int64_t tenant_id, bool &res); int set_enable_ddl_trans_new_lock(int64_t tenant_id); + int broadcast_consensus_version(const int64_t tenant_id); private: virtual void run1() override; int check_task_ready(int64_t task_id, bool &ready); diff --git a/src/share/schema/ob_multi_version_schema_service.cpp b/src/share/schema/ob_multi_version_schema_service.cpp index d61d77158..0b1a795e2 100644 --- a/src/share/schema/ob_multi_version_schema_service.cpp +++ b/src/share/schema/ob_multi_version_schema_service.cpp @@ -3667,6 +3667,47 @@ int ObMultiVersionSchemaService::get_tenant_received_broadcast_version( return ret; } +int ObMultiVersionSchemaService::get_tenant_broadcast_consensus_version( + const uint64_t tenant_id, + int64_t &consensus_version) +{ + int ret = OB_SUCCESS; + if (OB_INVALID_TENANT_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id)); + } else { + const ObSchemaStore* schema_store = NULL; + if (NULL == (schema_store = schema_store_map_.get(tenant_id))) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("fail to get schema_store", KR(ret)); + } else { + consensus_version = schema_store->get_consensus_version(); + } + } + return ret; +} + +int ObMultiVersionSchemaService::set_tenant_broadcast_consensus_version( + const uint64_t tenant_id, + const int64_t consensus_version) +{ + int ret = OB_SUCCESS; + if (OB_INVALID_TENANT_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id), K(consensus_version)); + } else { + ObSchemaStore* schema_store = NULL; + if (NULL == (schema_store = schema_store_map_.get(tenant_id))) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("fail to get schema_store", KR(ret)); + } else { + schema_store->update_consensus_version(consensus_version); + LOG_INFO("try to set tenant broadcast consensus version", KR(ret), K(tenant_id), K(consensus_version)); + } + } + return ret; +} + int ObMultiVersionSchemaService::set_tenant_received_broadcast_version( const uint64_t tenant_id, const int64_t version) diff --git a/src/share/schema/ob_multi_version_schema_service.h b/src/share/schema/ob_multi_version_schema_service.h index 2a66e864e..6b464914c 100644 --- a/src/share/schema/ob_multi_version_schema_service.h +++ b/src/share/schema/ob_multi_version_schema_service.h @@ -226,6 +226,10 @@ public: int get_schema_store_tenants(common::ObIArray &tenant_ids); bool check_schema_store_tenant_exist(const uint64_t &tenant_id); + int get_tenant_broadcast_consensus_version(const uint64_t tenant_id, + int64_t &consensus_version); + int set_tenant_broadcast_consensus_version(const uint64_t tenant_id, + const int64_t consensus_version); virtual int set_tenant_received_broadcast_version(const uint64_t tenant_id, const int64_t version); virtual int set_last_refreshed_schema_info(const ObRefreshSchemaInfo &schema_info); int update_baseline_schema_version(const uint64_t tenant_id, const int64_t baseline_schema_version); diff --git a/src/share/schema/ob_schema_store.cpp b/src/share/schema/ob_schema_store.cpp index 6dae34883..d050e7047 100644 --- a/src/share/schema/ob_schema_store.cpp +++ b/src/share/schema/ob_schema_store.cpp @@ -88,6 +88,15 @@ void ObSchemaStore::update_baseline_schema_version(int64_t version) } } +void ObSchemaStore::update_consensus_version(int64_t version) +{ + if (version > consensus_version_) { + inc_update(&consensus_version_, version); + LOG_INFO("[SCHEMA_STORE] schema store update version", + K_(tenant_id), K(version), K_(consensus_version)); + } +} + int ObSchemaStoreMap::init(int64_t bucket_num) { UNUSED(bucket_num); diff --git a/src/share/schema/ob_schema_store.h b/src/share/schema/ob_schema_store.h index 992d9f250..30721e0e4 100644 --- a/src/share/schema/ob_schema_store.h +++ b/src/share/schema/ob_schema_store.h @@ -32,7 +32,8 @@ public: refreshed_version_(0), received_version_(0), checked_sys_version_(0), - baseline_schema_version_(common::OB_INVALID_VERSION) {} + baseline_schema_version_(common::OB_INVALID_VERSION), + consensus_version_(0) {} ~ObSchemaStore() {} int init(const uint64_t tenant_id, const int64_t init_version_count, @@ -42,16 +43,19 @@ public: void update_received_version(int64_t version); void update_checked_sys_version(int64_t version); void update_baseline_schema_version(int64_t version); + void update_consensus_version(int64_t version); int64_t get_refreshed_version() const { return ATOMIC_LOAD(&refreshed_version_); } int64_t get_received_version() const { return ATOMIC_LOAD(&received_version_); } int64_t get_checked_sys_version() const { return ATOMIC_LOAD(&checked_sys_version_); } int64_t get_baseline_schema_version() const { return ATOMIC_LOAD(&baseline_schema_version_); } + int64_t get_consensus_version() const { return ATOMIC_LOAD(&consensus_version_); } int64_t tenant_id_; int64_t refreshed_version_; int64_t received_version_; int64_t checked_sys_version_; int64_t baseline_schema_version_; + int64_t consensus_version_; ObSchemaMgrCache schema_mgr_cache_; ObSchemaMgrCache schema_mgr_cache_for_liboblog_; }; @@ -79,4 +83,3 @@ private: }; // end namespace oceanbase #endif /* OCEANBASE_SCHEMA_OB_TENANT_SCHEMA_STORE_H_ */ - diff --git a/src/sql/engine/cmd/ob_table_executor.cpp b/src/sql/engine/cmd/ob_table_executor.cpp index 0e1151cdc..903921f3c 100644 --- a/src/sql/engine/cmd/ob_table_executor.cpp +++ b/src/sql/engine/cmd/ob_table_executor.cpp @@ -2124,9 +2124,16 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st // wait schema_version refreshed on this server while (OB_SUCC(ret) && ctx.get_timeout() > 0) { int64_t refreshed_schema_version = OB_INVALID_VERSION; + int64_t consensus_schema_version = OB_INVALID_VERSION; if (OB_FAIL(GCTX.schema_service_->get_tenant_refreshed_schema_version(res.tenant_id_, refreshed_schema_version))) { - LOG_WARN("get schema_version fail", KR(ret), K(res.tenant_id_)); - } else if (refreshed_schema_version >= res.task_id_) { + LOG_WARN("get refreshed schema_version fail", KR(ret), K(res.tenant_id_)); + } else if (OB_FAIL(GCTX.schema_service_->get_tenant_broadcast_consensus_version(res.tenant_id_, consensus_schema_version))) { + LOG_WARN("get consensus schema_version fail", KR(ret), K(res.tenant_id_)); + } else if (refreshed_schema_version >= res.task_id_ + && consensus_schema_version >= res.task_id_) { + break; + } else if (refreshed_schema_version >= res.task_id_ + && ObTimeUtility::current_time() - step_time >= 10 * 1000 * 1000) { //10s break; } else { ob_usleep(10 * 1000); @@ -2134,7 +2141,7 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st } } int64_t end_time = ObTimeUtility::current_time(); - LOG_INFO("truncate_table_v2", K(ret), "cost", end_time-start_time, + LOG_INFO("truncate_table_v2", KR(ret), "cost", end_time-start_time, "trans_cost", step_time - start_time, "wait_refresh", end_time - step_time, "table_name", truncate_table_arg.table_name_,