From 08add2abda666785df9528d34b8fe7914509fb60 Mon Sep 17 00:00:00 2001 From: Hongqin-Li Date: Fri, 28 Jun 2024 09:59:10 +0000 Subject: [PATCH] Avoid get binding info and autoinc seq mds on replay --- deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h | 1 + src/observer/ob_rpc_processor_simple.cpp | 13 + src/observer/ob_rpc_processor_simple.h | 1 + src/observer/ob_srv_xlator_storage.cpp | 1 + src/rootserver/ob_ddl_service.cpp | 105 +-- src/rootserver/ob_ddl_service.h | 2 + src/rootserver/ob_partition_exchange.cpp | 48 +- src/rootserver/ob_partition_exchange.h | 3 +- src/rootserver/ob_root_service.cpp | 8 +- src/rootserver/ob_tablet_creator.cpp | 10 + src/share/ob_rpc_struct.cpp | 50 ++ src/share/ob_rpc_struct.h | 36 + src/share/ob_srv_rpc_proxy.h | 1 + .../compile_utility/mds_register.h | 6 +- .../ob_tablet_autoinc_seq_rpc_handler.cpp | 11 +- .../tablet/ob_tablet_binding_helper.cpp | 625 ++++++++++++++++-- src/storage/tablet/ob_tablet_binding_helper.h | 183 ++++- .../ob_tablet_binding_replay_executor.cpp | 2 +- .../ob_tablet_binding_replay_executor.h | 4 +- .../tablet/ob_tablet_create_mds_helper.cpp | 4 +- .../tx/ob_multi_data_source_printer.cpp | 3 +- src/storage/tx/ob_tx_log.cpp | 3 +- 22 files changed, 1003 insertions(+), 117 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 55414ca400..0a56c06bc4 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -861,6 +861,7 @@ PCODE_DEF(OB_REMOTE_WRITE_DDL_INC_COMMIT_LOG, 0x967) //PCODE_DEF(OB_FETCH_SPLIT_TABLET_INFO, 0x96A) //PCODE_DEF(OB_SPLIT_GLOBAL_INDEX_TABLET, 0x96B) //PCODE_DEF(OB_PREPARE_TABLET_SPLIT_TASK_RANGES, 0x96C) +PCODE_DEF(OB_BATCH_GET_TABLET_BINDING, 0x96D) // Depedency Detector PCODE_DEF(OB_DETECTOR_LCL_MESSAGE, 0x9F0) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index a53023fe94..a5bf38e16a 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2248,6 +2248,19 @@ int ObRpcClearTabletAutoincSeqCacheP::process() return ret; } +int ObRpcBatchGetTabletBindingP::process() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(rpc_pkt_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid rpc pkt", K(ret)); + } else { + const int64_t abs_timeout_us = get_send_timestamp() + rpc_pkt_->get_timeout(); + ret = ObTabletBindingMdsHelper::batch_get_tablet_binding(abs_timeout_us, arg_, result_); + } + return ret; +} + #ifdef OB_BUILD_TDE_SECURITY int ObDumpTenantCacheMasterKeyP::process() { diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 78f24b32b8..41b29272c3 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -123,6 +123,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_FETCH_TABLET_AUTOINC_SEQ_CACHE, ObRpcFetchTabletAu OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_TABLET_AUTOINC_SEQ, ObRpcBatchGetTabletAutoincSeqP); OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_SET_TABLET_AUTOINC_SEQ, ObRpcBatchSetTabletAutoincSeqP); OB_DEFINE_PROCESSOR_S(Srv, OB_CLEAR_TABLET_AUTOINC_SEQ_CACHE, ObRpcClearTabletAutoincSeqCacheP); +OB_DEFINE_PROCESSOR_S(Srv, OB_BATCH_GET_TABLET_BINDING, ObRpcBatchGetTabletBindingP); OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_MODIFY_TIME_ELAPSED, ObRpcCheckCtxCreateTimestampElapsedP); OB_DEFINE_PROCESSOR_S(Srv, OB_UPDATE_BASELINE_SCHEMA_VERSION, ObRpcUpdateBaselineSchemaVersionP); OB_DEFINE_PROCESSOR_S(Srv, OB_SWITCH_LEADER, ObRpcSwitchLeaderP); diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index aaa6fe04f6..a3fb03fb5b 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -102,6 +102,7 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObRpcBatchGetTabletAutoincSeqP, gctx_); RPC_PROCESSOR(ObRpcBatchSetTabletAutoincSeqP, gctx_); RPC_PROCESSOR(ObRpcClearTabletAutoincSeqCacheP, gctx_); + RPC_PROCESSOR(ObRpcBatchGetTabletBindingP, gctx_); RPC_PROCESSOR(ObRpcRemoteWriteDDLRedoLogP, gctx_); RPC_PROCESSOR(ObRpcRemoteWriteDDLCommitLogP, gctx_); RPC_PROCESSOR(ObRpcRemoteWriteDDLIncCommitLogP, gctx_); diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index c7cd29c41d..14d8231fa8 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -12775,6 +12775,7 @@ int ObDDLService::alter_table_in_trans(obrpc::ObAlterTableArg &alter_table_arg, LOG_WARN("failed to alter table options,", K(origin_table_name), K(new_table_schema), K(ret)); } else if (is_rename_and_need_table_lock && OB_FAIL(build_rw_defense_for_table_( + tenant_data_version, new_table_schema, new_table_schema.get_schema_version(), idx_schema_versions, @@ -16445,7 +16446,9 @@ int ObDDLService::rename_table(const obrpc::ObRenameTableArg &rename_table_arg) idx_schema_versions))) { LOG_WARN("failed to rename table!", K(rename_item), K(table_id), K(ret)); } else if (need_table_lock_and_defense && + !in_new_table_set && // to avoid defense twice on the same table OB_FAIL(build_rw_defense_for_table_( + tenant_data_version, *from_table_schema, new_table_schema_version, idx_schema_versions, @@ -16498,6 +16501,7 @@ int ObDDLService::rename_table(const obrpc::ObRenameTableArg &rename_table_arg) LOG_WARN("failed to rename table!", K(ret), K(rename_item), K(table_id)); } else if (need_table_lock_and_defense && OB_FAIL(build_rw_defense_for_table_( + tenant_data_version, *from_table_schema, new_table_schema_version, idx_schema_versions, @@ -16635,32 +16639,41 @@ int ObDDLService::rename_table(const obrpc::ObRenameTableArg &rename_table_arg) int ObDDLService::build_single_table_rw_defensive_( const uint64_t tenant_id, + const uint64_t tenant_data_version, const ObArray &tablet_ids, const int64_t schema_version, ObDDLSQLTransaction &trans) { int ret = OB_SUCCESS; - ObArray args; if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || tablet_ids.empty() || schema_version <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tenant_id), K(tablet_ids), K(schema_version)); - } else if (OB_FAIL(build_modify_tablet_binding_args( - tenant_id, tablet_ids, true/*is_hidden_tablets*/, schema_version, args, trans))) { - LOG_WARN("failed to build reuse index args", K(ret)); - } - ObArenaAllocator allocator("DDLRWDefens"); - for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { - int64_t pos = 0; - int64_t size = args[i].get_serialize_size(); - char *buf = nullptr; - allocator.reuse(); - if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate", K(ret)); - } else if (OB_FAIL(args[i].serialize(buf, size, pos))) { - LOG_WARN("failed to serialize arg", K(ret)); - } else if (OB_FAIL(trans.register_tx_data(args[i].tenant_id_, args[i].ls_id_, transaction::ObTxDataSourceType::UNBIND_TABLET_NEW_MDS, buf, pos))) { - LOG_WARN("failed to register tx data", K(ret)); + } else if (OB_LIKELY(tenant_data_version >= DATA_VERSION_4_3_2_0)) { + const int64_t abs_timeout_us = THIS_WORKER.is_timeout_ts_valid() ? THIS_WORKER.get_timeout_ts() + : ObTimeUtility::current_time() + GCONF.rpc_timeout; + if (OB_FAIL(ObTabletBindingMdsHelper::modify_tablet_binding_for_rw_defensive(tenant_id, tablet_ids, schema_version, abs_timeout_us, trans))) { + LOG_WARN("failed to modify tablet binding", K(ret), K(abs_timeout_us)); + } + } else { + ObArray args; + if (OB_FAIL(build_modify_tablet_binding_args( + tenant_id, tablet_ids, true/*is_hidden_tablets*/, schema_version, args, trans))) { + LOG_WARN("failed to build reuse index args", K(ret)); + } + ObArenaAllocator allocator("DDLRWDefens"); + for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { + int64_t pos = 0; + int64_t size = args[i].get_serialize_size(); + char *buf = nullptr; + allocator.reuse(); + if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate", K(ret)); + } else if (OB_FAIL(args[i].serialize(buf, size, pos))) { + LOG_WARN("failed to serialize arg", K(ret)); + } else if (OB_FAIL(trans.register_tx_data(args[i].tenant_id_, args[i].ls_id_, transaction::ObTxDataSourceType::UNBIND_TABLET_NEW_MDS, buf, pos))) { + LOG_WARN("failed to register tx data", K(ret)); + } } } return ret; @@ -16673,6 +16686,7 @@ int ObDDLService::build_single_table_rw_defensive_( * (An origin_table_schema has no updated tablet info compared to a new table schema, e.g. tablet split) */ int ObDDLService::build_rw_defense_for_table_( + const uint64_t tenant_data_version, const ObTableSchema &table_schema, const int64_t new_data_table_schema_version, const ObIArray> &aux_schema_versions, // pair: table_id, schema_version @@ -16711,7 +16725,7 @@ int ObDDLService::build_rw_defense_for_table_( LOG_WARN("get schema guard failed", K(ret)); } else if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) { LOG_WARN("failed to get tablet_ids", K(ret), K(table_schema)); - } else if (OB_FAIL(build_single_table_rw_defensive_(tenant_id, tablet_ids, new_data_table_schema_version, trans))) { + } else if (OB_FAIL(build_single_table_rw_defensive_(tenant_id, tenant_data_version, tablet_ids, new_data_table_schema_version, trans))) { LOG_WARN("failed to build defense ", K(ret), K(table_schema), K(tablet_ids)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < aux_schema_versions.count(); ++i) { @@ -16726,7 +16740,7 @@ int ObDDLService::build_rw_defense_for_table_( LOG_WARN("table schema should not be null", K(ret)); } else if (OB_FAIL(aux_table_schema->get_tablet_ids(tablet_ids))) { LOG_WARN("failed to get tablet_ids", K(ret), KPC(aux_table_schema)); - } else if (OB_FAIL(build_single_table_rw_defensive_(tenant_id, tablet_ids, schema_version, trans))) { + } else if (OB_FAIL(build_single_table_rw_defensive_(tenant_id, tenant_data_version, tablet_ids, schema_version, trans))) { LOG_WARN("failed to build defense ", K(ret), KPC(aux_table_schema), K(i), K(schema_version)); } } @@ -19548,32 +19562,43 @@ int ObDDLService::unbind_hidden_tablets( { int ret = OB_SUCCESS; const uint64_t tenant_id = orig_table_schema.get_tenant_id(); + uint64_t tenant_data_version = 0; ObArray orig_tablet_ids; ObArray hidden_tablet_ids; - ObArray args; if (OB_FAIL(orig_table_schema.get_tablet_ids(orig_tablet_ids))) { LOG_WARN("get tablet ids failed", K(ret)); } else if (OB_FAIL(hidden_table_schema.get_tablet_ids(hidden_tablet_ids))) { LOG_WARN("get tablet ids failed", K(ret)); - } else if (OB_FAIL(build_modify_tablet_binding_args( - tenant_id, orig_tablet_ids, false/*is_hidden_tablets*/, schema_version, args, trans))) { - LOG_WARN("failed to build reuse index args", K(ret)); - } else if (OB_FAIL(build_modify_tablet_binding_args( - tenant_id, hidden_tablet_ids, true/*is_hidden_tablets*/, schema_version, args, trans))) { - LOG_WARN("failed to build reuse index args", K(ret)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { - int64_t pos = 0; - int64_t size = args[i].get_serialize_size(); - ObArenaAllocator allocator; - char *buf = nullptr; - if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate", K(ret)); - } else if (OB_FAIL(args[i].serialize(buf, size, pos))) { - LOG_WARN("failed to serialize arg", K(ret)); - } else if (OB_FAIL(trans.register_tx_data(args[i].tenant_id_, args[i].ls_id_, transaction::ObTxDataSourceType::UNBIND_TABLET_NEW_MDS, buf, pos))) { - LOG_WARN("failed to register tx data", K(ret)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) { + LOG_WARN("get min data version failed", K(ret), K(tenant_id)); + } else if (OB_LIKELY(tenant_data_version >= DATA_VERSION_4_3_2_0)) { + const int64_t abs_timeout_us = THIS_WORKER.is_timeout_ts_valid() ? THIS_WORKER.get_timeout_ts() + : ObTimeUtility::current_time() + GCONF.rpc_timeout; + if (OB_FAIL(ObTabletBindingMdsHelper::modify_tablet_binding_for_unbind(tenant_id, orig_tablet_ids, hidden_tablet_ids, schema_version, abs_timeout_us, trans))) { + LOG_WARN("failed to modify tablet binding", K(ret), K(abs_timeout_us)); + } + } else { + ObArray args; + if (OB_FAIL(build_modify_tablet_binding_args( + tenant_id, orig_tablet_ids, false/*is_hidden_tablets*/, schema_version, args, trans))) { + LOG_WARN("failed to build reuse index args", K(ret)); + } else if (OB_FAIL(build_modify_tablet_binding_args( + tenant_id, hidden_tablet_ids, true/*is_hidden_tablets*/, schema_version, args, trans))) { + LOG_WARN("failed to build reuse index args", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { + int64_t pos = 0; + int64_t size = args[i].get_serialize_size(); + ObArenaAllocator allocator; + char *buf = nullptr; + if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate", K(ret)); + } else if (OB_FAIL(args[i].serialize(buf, size, pos))) { + LOG_WARN("failed to serialize arg", K(ret)); + } else if (OB_FAIL(trans.register_tx_data(args[i].tenant_id_, args[i].ls_id_, transaction::ObTxDataSourceType::UNBIND_TABLET_NEW_MDS, buf, pos))) { + LOG_WARN("failed to register tx data", K(ret)); + } } } return ret; diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index 2b4478a25c..70bbd882e0 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -2120,10 +2120,12 @@ private: common::ObIArray &new_aux_schemas); int build_single_table_rw_defensive_( const uint64_t tenant_id, + const uint64_t tenant_data_version, const ObArray &tablet_ids, const int64_t schema_version, ObDDLSQLTransaction &trans); int build_rw_defense_for_table_( + const uint64_t tenant_data_version, const ObTableSchema &table_schema, const int64_t new_data_table_schema_version, const ObIArray> &aux_schema_versions, diff --git a/src/rootserver/ob_partition_exchange.cpp b/src/rootserver/ob_partition_exchange.cpp index 3255cccacf..ca89f186d6 100644 --- a/src/rootserver/ob_partition_exchange.cpp +++ b/src/rootserver/ob_partition_exchange.cpp @@ -36,8 +36,8 @@ using namespace share; using namespace share::schema; namespace rootserver { -ObPartitionExchange::ObPartitionExchange(ObDDLService &ddl_service) - : ddl_service_(ddl_service) +ObPartitionExchange::ObPartitionExchange(ObDDLService &ddl_service, const uint64_t data_version) + : ddl_service_(ddl_service), data_version_(data_version) { } @@ -2199,30 +2199,38 @@ int ObPartitionExchange::build_single_table_rw_defensive_(const uint64_t tenant_ ObDDLSQLTransaction &trans) { int ret = OB_SUCCESS; - ObArray args; if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || tablet_ids.empty() || schema_version <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tenant_id), K(tablet_ids), K(schema_version)); } else if (OB_UNLIKELY(!ddl_service_.is_inited())) { ret = OB_INNER_STAT_ERROR; LOG_WARN("ddl_service not init", K(ret)); - } else if (OB_FAIL(build_modify_tablet_binding_args_v1_( - tenant_id, tablet_ids, schema_version, args, trans))) { - LOG_WARN("failed to build reuse index args", K(ret)); - } - ObArenaAllocator allocator("DDLRWDefens"); - for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { - int64_t pos = 0; - int64_t size = args[i].get_serialize_size(); - char *buf = nullptr; - allocator.reuse(); - if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate", K(ret)); - } else if (OB_FAIL(args[i].serialize(buf, size, pos))) { - LOG_WARN("failed to serialize arg", K(ret)); - } else if (OB_FAIL(trans.register_tx_data(args[i].tenant_id_, args[i].ls_id_, transaction::ObTxDataSourceType::UNBIND_TABLET_NEW_MDS, buf, pos))) { - LOG_WARN("failed to register tx data", K(ret)); + } else if (OB_LIKELY(data_version_ >= DATA_VERSION_4_3_2_0)) { + const int64_t abs_timeout_us = THIS_WORKER.is_timeout_ts_valid() ? THIS_WORKER.get_timeout_ts() + : ObTimeUtility::current_time() + GCONF.rpc_timeout; + if (OB_FAIL(ObTabletBindingMdsHelper::modify_tablet_binding_for_rw_defensive(tenant_id, tablet_ids, schema_version, abs_timeout_us, trans))) { + LOG_WARN("failed to modify tablet binding", K(ret), K(abs_timeout_us)); + } + } else { + ObArray args; + if (OB_FAIL(build_modify_tablet_binding_args_v1_( + tenant_id, tablet_ids, schema_version, args, trans))) { + LOG_WARN("failed to build reuse index args", K(ret)); + } + ObArenaAllocator allocator("DDLRWDefens"); + for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { + int64_t pos = 0; + int64_t size = args[i].get_serialize_size(); + char *buf = nullptr; + allocator.reuse(); + if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate", K(ret)); + } else if (OB_FAIL(args[i].serialize(buf, size, pos))) { + LOG_WARN("failed to serialize arg", K(ret)); + } else if (OB_FAIL(trans.register_tx_data(args[i].tenant_id_, args[i].ls_id_, transaction::ObTxDataSourceType::UNBIND_TABLET_NEW_MDS, buf, pos))) { + LOG_WARN("failed to register tx data", K(ret)); + } } } return ret; diff --git a/src/rootserver/ob_partition_exchange.h b/src/rootserver/ob_partition_exchange.h index 6e04376d41..46655529bb 100644 --- a/src/rootserver/ob_partition_exchange.h +++ b/src/rootserver/ob_partition_exchange.h @@ -37,7 +37,7 @@ class ObPartitionExchange final { public: typedef std::pair LSTabletID; - explicit ObPartitionExchange(ObDDLService &ddl_service); + explicit ObPartitionExchange(ObDDLService &ddl_service, const uint64_t data_version); ~ObPartitionExchange(); int check_and_exchange_partition(const obrpc::ObExchangePartitionArg &arg, obrpc::ObAlterTableRes &res, ObSchemaGetterGuard &schema_guard); private: @@ -263,6 +263,7 @@ private: int get_object_id_from_partition_schema_(ObPartitionSchema &partition_schema, const bool get_subpart_only, int64_t &object_id); private: ObDDLService &ddl_service_; + uint64_t data_version_; common::hash::ObHashMap used_pt_nt_id_map_; common::hash::ObHashMap used_table_to_tablet_id_map_; common::ObSArray unused_pt_index_id_; diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 68c1c1ae7a..cfd6a361ef 100755 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -4614,7 +4614,6 @@ int ObRootService::exchange_partition(const obrpc::ObExchangePartitionArg &arg, int ret = OB_SUCCESS; uint64_t compat_version = 0; ObSchemaGetterGuard schema_guard; - ObPartitionExchange partition_exchange(ddl_service_); schema_guard.set_session_id(arg.session_id_); LOG_DEBUG("receive exchange partition arg", K(ret), K(arg)); if (!inited_) { @@ -4632,8 +4631,11 @@ int ObRootService::exchange_partition(const obrpc::ObExchangePartitionArg &arg, LOG_WARN("get schema guard in inner table failed", K(ret)); } else if (OB_FAIL(check_parallel_ddl_conflict(schema_guard, arg))) { LOG_WARN("check parallel ddl conflict failed", K(ret)); - } else if (OB_FAIL(partition_exchange.check_and_exchange_partition(arg, res, schema_guard))) { - LOG_WARN("fail to check and exchange partition", K(ret), K(arg), K(res)); + } else { + ObPartitionExchange partition_exchange(ddl_service_, compat_version); + if (OB_FAIL(partition_exchange.check_and_exchange_partition(arg, res, schema_guard))) { + LOG_WARN("fail to check and exchange partition", K(ret), K(arg), K(res)); + } } char table_id_buffer[256]; snprintf(table_id_buffer, sizeof(table_id_buffer), "table_id:%ld, exchange_table_id:%ld", diff --git a/src/rootserver/ob_tablet_creator.cpp b/src/rootserver/ob_tablet_creator.cpp index f7a7c76481..40a41d6f3f 100644 --- a/src/rootserver/ob_tablet_creator.cpp +++ b/src/rootserver/ob_tablet_creator.cpp @@ -414,6 +414,16 @@ int ObTabletCreator::execute() LOG_INFO("generate create arg", KR(ret), K(buf_len), K(batch_arg->batch_arg_.tablets_.count()), K(batch_arg->batch_arg_), "cost_ts", end_time - start_time); } while (need_retry(ret)); + if (OB_SUCC(ret) && batch_arg->batch_arg_.set_binding_info_outside_create()) { + const int64_t start_time = ObTimeUtility::current_time(); + if (OB_FAIL(ObTabletBindingMdsHelper::modify_tablet_binding_for_create(tenant_id_, + batch_arg->batch_arg_, ctx.get_abs_timeout(), trans_))) { + LOG_WARN("failed to modify tablet binding for create", K(ret)); + } + const int64_t end_time = ObTimeUtility::current_time(); + LOG_INFO("modify binding for create", KR(ret), K(buf_len), K(batch_arg->batch_arg_.tablets_.count()), + "cost_ts", end_time - start_time); + } } batch_arg = batch_arg->next_; } // end while diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 43e5fff830..ab2f27f570 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -9224,6 +9224,19 @@ int ObBatchCreateTabletArg::assign(const ObBatchCreateTabletArg &arg) return ret; } +bool ObBatchCreateTabletArg::set_binding_info_outside_create() const +{ + int bool_ret = false; + uint64_t min_data_version = UINT64_MAX; + for (int64_t i = 0; i < tablet_extra_infos_.count(); i++) { + min_data_version = std::min(min_data_version, tablet_extra_infos_.at(i).tenant_data_version_); + } + if (UINT64_MAX != min_data_version && DATA_VERSION_4_3_2_0 <= min_data_version) { + bool_ret = true; + } + return bool_ret; +} + OB_SERIALIZE_MEMBER((ObContextDDLArg, ObDDLArg), stmt_type_, ctx_schema_, @@ -10589,6 +10602,43 @@ int ObBatchSetTabletAutoincSeqRes::assign(const ObBatchSetTabletAutoincSeqRes &o return autoinc_params_.assign(other.autoinc_params_); } +OB_SERIALIZE_MEMBER(ObBatchGetTabletBindingArg, tenant_id_, ls_id_, tablet_ids_, check_committed_); + +int ObBatchGetTabletBindingArg::assign(const ObBatchGetTabletBindingArg &other) +{ + int ret = OB_SUCCESS; + tenant_id_ = other.tenant_id_; + ls_id_ = other.ls_id_; + check_committed_ = other.check_committed_; + if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) { + LOG_WARN("failed to assign", K(ret)); + } + return ret; +} + +int ObBatchGetTabletBindingArg::init(const uint64_t tenant_id, const share::ObLSID &ls_id, const ObIArray &tablet_ids, const bool check_committed) +{ + int ret = OB_SUCCESS; + tenant_id_ = tenant_id; + ls_id_ = ls_id; + check_committed_ = check_committed; + if (OB_FAIL(tablet_ids_.assign(tablet_ids))) { + LOG_WARN("failed to assign", K(ret)); + } + if (OB_SUCC(ret) && OB_UNLIKELY(!is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(*this)); + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObBatchGetTabletBindingRes, binding_datas_); + +int ObBatchGetTabletBindingRes::assign(const ObBatchGetTabletBindingRes &other) +{ + return binding_datas_.assign(other.binding_datas_); +} + OB_SERIALIZE_MEMBER(ObFetchLocationResult, servers_); int ObFetchLocationResult::assign(const ObFetchLocationResult &other) diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 6553e4aa5c..96abe6c490 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -57,6 +57,7 @@ #include "share/ob_alive_server_tracer.h"//ServerAddr #include "storage/blocksstable/ob_block_sstable_struct.h" #include "storage/ddl/ob_ddl_struct.h" +#include "storage/tablet/ob_tablet_binding_mds_user_data.h" #include "storage/tx/ob_trans_define.h" #include "storage/tx/ob_multi_data_source.h" #include "share/unit/ob_unit_info.h" //ObUnit* @@ -3966,6 +3967,7 @@ public: static int skip_unis_array_len(const char *buf, int64_t data_len, int64_t &pos); + bool set_binding_info_outside_create() const; DECLARE_TO_STRING; public: @@ -10773,6 +10775,40 @@ public: common::ObSEArray autoinc_params_; }; +struct ObBatchGetTabletBindingArg final +{ + OB_UNIS_VERSION(1); +public: + ObBatchGetTabletBindingArg() + : tenant_id_(OB_INVALID_ID), ls_id_(), tablet_ids_(), check_committed_(false) + {} + ~ObBatchGetTabletBindingArg() {} +public: + int assign(const ObBatchGetTabletBindingArg &other); + bool is_valid() const { return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && tablet_ids_.count() > 0; } + int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const common::ObIArray &tablet_ids, const bool check_committed); + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_ids), K_(check_committed)); +public: + uint64_t tenant_id_; + share::ObLSID ls_id_; + common::ObSArray tablet_ids_; + bool check_committed_; +}; + +struct ObBatchGetTabletBindingRes final +{ + OB_UNIS_VERSION(1); +public: + ObBatchGetTabletBindingRes() : binding_datas_() {} + ~ObBatchGetTabletBindingRes() {} +public: + int assign(const ObBatchGetTabletBindingRes &other); + bool is_valid() const { return binding_datas_.count() > 0; } + TO_STRING_KV(K_(binding_datas)); +public: + common::ObSArray binding_datas_; +}; + struct ObFetchLocationResult { public: diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index fab5e916a3..7dd15de385 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -150,6 +150,7 @@ public: RPC_AP(PR5 batch_get_tablet_autoinc_seq, OB_BATCH_GET_TABLET_AUTOINC_SEQ, (obrpc::ObBatchGetTabletAutoincSeqArg), obrpc::ObBatchGetTabletAutoincSeqRes); RPC_AP(PR5 batch_set_tablet_autoinc_seq, OB_BATCH_SET_TABLET_AUTOINC_SEQ, (obrpc::ObBatchSetTabletAutoincSeqArg), obrpc::ObBatchSetTabletAutoincSeqRes); RPC_AP(PR5 clear_tablet_autoinc_seq_cache, OB_CLEAR_TABLET_AUTOINC_SEQ_CACHE, (obrpc::ObClearTabletAutoincSeqCacheArg), obrpc::Int64); + RPC_S(PR5 batch_get_tablet_binding, OB_BATCH_GET_TABLET_BINDING, (obrpc::ObBatchGetTabletBindingArg), obrpc::ObBatchGetTabletBindingRes); RPC_S(PRD force_create_sys_table, OB_FORCE_CREATE_SYS_TABLE, (ObForceCreateSysTableArg)); RPC_S(PRD schema_revise, OB_SCHEMA_REVISE, (ObSchemaReviseArg)); RPC_S(PRD force_set_locality, OB_FORCE_SET_LOCALITY, (ObForceSetLocalityArg)); diff --git a/src/storage/multi_data_source/compile_utility/mds_register.h b/src/storage/multi_data_source/compile_utility/mds_register.h index 8485df6a4d..9e684f56af 100644 --- a/src/storage/multi_data_source/compile_utility/mds_register.h +++ b/src/storage/multi_data_source/compile_utility/mds_register.h @@ -176,6 +176,10 @@ _GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION_(HELPER_CLASS, BUFFER_CTX_TYPE, ID, ENU ::oceanbase::storage::mds::MdsCtx, \ 33,\ STANDBY_UPGRADE_DATA_VERSION) + GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION(::oceanbase::storage::ObTabletBindingMdsHelper,\ + ::oceanbase::storage::mds::MdsCtx,\ + 34,\ + TABLET_BINDING) // # 余留位置(此行之前占位) #undef GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION #endif @@ -253,4 +257,4 @@ _GENERATE_MDS_UNIT_(KEY_TYPE, VALUE_TYPE, NEED_MULTI_VERSION) true) // replace this line if you are the first user to register LS INNER TABLET // # 余留位置(此行之前占位) #endif -/**************************************************************************************************/ \ No newline at end of file +/**************************************************************************************************/ diff --git a/src/storage/ob_tablet_autoinc_seq_rpc_handler.cpp b/src/storage/ob_tablet_autoinc_seq_rpc_handler.cpp index aa7e3daebe..c539ab8c58 100644 --- a/src/storage/ob_tablet_autoinc_seq_rpc_handler.cpp +++ b/src/storage/ob_tablet_autoinc_seq_rpc_handler.cpp @@ -67,11 +67,18 @@ int ObSyncTabletSeqReplayExecutor::do_replay_(ObTabletHandle &handle) ObArenaAllocator allocator; ObTabletAutoincSeq curr_autoinc_seq; uint64_t curr_autoinc_seq_value; - if (OB_FAIL(tablet->get_autoinc_seq(allocator, share::SCN::max_scn(), curr_autoinc_seq))) { + bool need_replay = true; + if (CLUSTER_CURRENT_VERSION >= CLUSTER_VERSION_4_3_2_0) { + // just replay for multi-vesion mds + } else if (OB_FAIL(tablet->get_autoinc_seq(allocator, share::SCN::max_scn(), curr_autoinc_seq))) { LOG_WARN("fail to get latest autoinc seq", K(ret), KPC(tablet)); } else if (OB_FAIL(curr_autoinc_seq.get_autoinc_seq_value(curr_autoinc_seq_value))) { LOG_WARN("failed to get autoinc seq value", K(ret), KPC(tablet), K(curr_autoinc_seq)); - } else if (seq_ > curr_autoinc_seq_value) { + } else if (seq_ <= curr_autoinc_seq_value) { + need_replay = false; + } + + if (OB_SUCC(ret) && need_replay) { if (OB_FAIL(curr_autoinc_seq.set_autoinc_seq_value(allocator, seq_))) { LOG_WARN("failed to set autoinc seq value", K(ret), K(seq_), K(curr_autoinc_seq)); } else { diff --git a/src/storage/tablet/ob_tablet_binding_helper.cpp b/src/storage/tablet/ob_tablet_binding_helper.cpp index 7344d36296..895500a4f4 100644 --- a/src/storage/tablet/ob_tablet_binding_helper.cpp +++ b/src/storage/tablet/ob_tablet_binding_helper.cpp @@ -18,6 +18,7 @@ #include "lib/lock/ob_tc_rwlock.h" #include "lib/utility/ob_unify_serialize.h" #include "share/ob_rpc_struct.h" +#include "share/tablet/ob_tablet_to_ls_operator.h" #include "storage/ls/ob_ls.h" #include "storage/memtable/ob_memtable.h" #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" @@ -239,6 +240,31 @@ int ObTabletBindingHelper::modify_tablet_binding_for_new_mds_create(const ObBatc return ret; } +int ObBindHiddenTabletToOrigTabletOp::assign(const ObBindHiddenTabletToOrigTabletOp &other) +{ + int ret = OB_SUCCESS; + info_ = other.info_; + return ret; +} + +int ObBindHiddenTabletToOrigTabletOp::operator()(ObTabletBindingMdsUserData &data) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(info_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret)); + } else { + const ObTabletID &orig_tablet_id = info_->data_tablet_id_; + for (int64_t i = 0; OB_SUCC(ret) && i < info_->tablet_ids_.count(); i++) { + const ObTabletID &tablet_id = info_->tablet_ids_.at(i); + if (tablet_id != orig_tablet_id && tablet_id != data.hidden_tablet_id_) { + data.hidden_tablet_id_ = tablet_id; + } + } + } + return ret; +} + int ObTabletBindingHelper::bind_hidden_tablet_to_orig_tablet( ObLS &ls, const ObCreateTabletInfo &info, @@ -246,36 +272,33 @@ int ObTabletBindingHelper::bind_hidden_tablet_to_orig_tablet( mds::BufferCtx &ctx, const bool for_old_mds) { - return modify_tablet_binding_new_mds(ls, info.data_tablet_id_, replay_scn, ctx, for_old_mds, [&info](ObTabletBindingMdsUserData &data) -> int { - int ret = OB_SUCCESS; - const ObTabletID &orig_tablet_id = info.data_tablet_id_; - for (int64_t i = 0; OB_SUCC(ret) && i < info.tablet_ids_.count(); i++) { - const ObTabletID &tablet_id = info.tablet_ids_.at(i); - if (tablet_id != orig_tablet_id && tablet_id != data.hidden_tablet_id_) { - data.hidden_tablet_id_ = tablet_id; - } - } - return ret; - }); + ObBindHiddenTabletToOrigTabletOp op(info); + return modify_tablet_binding_new_mds(ls, info.data_tablet_id_, replay_scn, ctx, for_old_mds, op); } -int ObTabletBindingHelper::bind_lob_tablet_to_data_tablet( - ObLS &ls, - const ObBatchCreateTabletArg &arg, - const ObCreateTabletInfo &info, - const share::SCN &replay_scn, - mds::BufferCtx &ctx) +int ObBindLobTabletToDataTabletOp::assign(const ObBindLobTabletToDataTabletOp &other) { - return modify_tablet_binding_new_mds(ls, info.data_tablet_id_, replay_scn, ctx, arg.is_old_mds_, [&arg, &info](ObTabletBindingMdsUserData &data) -> int { - int ret = OB_SUCCESS; - const ObTabletID &data_tablet_id = info.data_tablet_id_; - for (int64_t i = 0; OB_SUCC(ret) && i < info.tablet_ids_.count(); i++) { - const ObTabletID &tablet_id = info.tablet_ids_.at(i); + int ret = OB_SUCCESS; + arg_ = other.arg_; + info_ = other.info_; + return ret; +} + +int ObBindLobTabletToDataTabletOp::operator()(ObTabletBindingMdsUserData &data) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(arg_) || OB_ISNULL(info_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), KPC(arg_), KPC(info_)); + } else { + const ObTabletID &data_tablet_id = info_->data_tablet_id_; + for (int64_t i = 0; OB_SUCC(ret) && i < info_->tablet_ids_.count(); i++) { + const ObTabletID &tablet_id = info_->tablet_ids_.at(i); if (tablet_id != data_tablet_id) { - const ObCreateTabletSchema *create_tablet_schema = arg.create_tablet_schemas_.at(info.table_schema_index_.at(i)); + const ObCreateTabletSchema *create_tablet_schema = arg_->create_tablet_schemas_.at(info_->table_schema_index_.at(i)); if (OB_ISNULL(create_tablet_schema)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("storage is NULL", KR(ret), K(arg)); + LOG_WARN("storage is NULL", KR(ret), K(arg_)); } else if (create_tablet_schema->is_aux_lob_meta_table()) { data.lob_meta_tablet_id_ = tablet_id; } else if (create_tablet_schema->is_aux_lob_piece_table()) { @@ -285,8 +308,19 @@ int ObTabletBindingHelper::bind_lob_tablet_to_data_tablet( } } } - return ret; - }); + } + return ret; +} + +int ObTabletBindingHelper::bind_lob_tablet_to_data_tablet( + ObLS &ls, + const ObBatchCreateTabletArg &arg, + const ObCreateTabletInfo &info, + const share::SCN &replay_scn, + mds::BufferCtx &ctx) +{ + ObBindLobTabletToDataTabletOp op(arg, info); + return modify_tablet_binding_new_mds(ls, info.data_tablet_id_, replay_scn, ctx, arg.is_old_mds_, op); } // TODO (lihongqin.lhq) Separate the code of replay @@ -297,7 +331,7 @@ int ObTabletBindingHelper::modify_tablet_binding_new_mds( const share::SCN &replay_scn, mds::BufferCtx &ctx, bool for_old_mds, - F op) + F &&op) { MDS_TG(100_ms); int ret = OB_SUCCESS; @@ -417,6 +451,18 @@ int ObTabletUnbindMdsHelper::on_replay(const char* buf, const int64_t len, const return ret; } +int ObUnbindHiddenTabletFromOrigTabletOp::operator()(ObTabletBindingMdsUserData &data) +{ + int ret = OB_SUCCESS; + data.hidden_tablet_id_.reset(); + if (OB_INVALID_VERSION != schema_version_) { + data.redefined_ = true; + data.snapshot_version_ = OB_INVALID_VERSION; // will be fill back on commit + data.schema_version_ = schema_version_; + } + return ret; +}; + int ObTabletUnbindMdsHelper::unbind_hidden_tablets_from_orig_tablets( ObLS &ls, const ObBatchUnbindTabletArg &arg, @@ -426,21 +472,23 @@ int ObTabletUnbindMdsHelper::unbind_hidden_tablets_from_orig_tablets( int ret = OB_SUCCESS; for (int64_t i = 0; OB_SUCC(ret) && i < arg.orig_tablet_ids_.count(); i++) { const ObTabletID &orig_tablet = arg.orig_tablet_ids_.at(i); - if (OB_FAIL(ObTabletBindingHelper::modify_tablet_binding_new_mds(ls, orig_tablet, replay_scn, ctx, arg.is_old_mds_, [&arg](ObTabletBindingMdsUserData &data) -> int { - data.hidden_tablet_id_.reset(); - if (arg.is_redefined()) { - data.redefined_ = true; - data.snapshot_version_ = OB_INVALID_VERSION; // will be fill back on commit - data.schema_version_ = arg.schema_version_; - } - return OB_SUCCESS; - }))) { + ObUnbindHiddenTabletFromOrigTabletOp op(arg.schema_version_); + if (OB_FAIL(ObTabletBindingHelper::modify_tablet_binding_new_mds(ls, orig_tablet, replay_scn, ctx, arg.is_old_mds_, op))) { LOG_WARN("failed to modify tablet binding", K(ret)); } } return ret; } +int ObSetRwDefensiveOp::operator()(ObTabletBindingMdsUserData &data) +{ + int ret = OB_SUCCESS; + data.redefined_ = false; + data.snapshot_version_ = OB_INVALID_VERSION; // will be fill back on commit + data.schema_version_ = schema_version_; + return ret; +}; + int ObTabletUnbindMdsHelper::set_redefined_versions_for_hidden_tablets( ObLS &ls, const ObBatchUnbindTabletArg &arg, @@ -450,12 +498,8 @@ int ObTabletUnbindMdsHelper::set_redefined_versions_for_hidden_tablets( int ret = OB_SUCCESS; for (int64_t i = 0; OB_SUCC(ret) && i < arg.hidden_tablet_ids_.count(); i++) { const ObTabletID &hidden_tablet = arg.hidden_tablet_ids_.at(i); - if (OB_FAIL(ObTabletBindingHelper::modify_tablet_binding_new_mds(ls, hidden_tablet, replay_scn, ctx, arg.is_old_mds_, [&arg](ObTabletBindingMdsUserData &data) -> int { - data.redefined_ = false; - data.snapshot_version_ = OB_INVALID_VERSION; // will be fill back on commit - data.schema_version_ = arg.schema_version_; - return OB_SUCCESS; - }))) { + ObSetRwDefensiveOp op(arg.schema_version_); + if (OB_FAIL(ObTabletBindingHelper::modify_tablet_binding_new_mds(ls, hidden_tablet, replay_scn, ctx, arg.is_old_mds_, op))) { LOG_WARN("failed to modify tablet binding", K(ret)); } } @@ -481,5 +525,502 @@ int ObTabletUnbindMdsHelper::modify_tablet_binding_for_unbind(const ObBatchUnbin return ret; } +ObTabletBindingMdsArg::ObTabletBindingMdsArg() : tenant_id_(OB_INVALID_TENANT_ID), ls_id_(), tablet_ids_(), binding_datas_() +{ + reset(); +} + +bool ObTabletBindingMdsArg::is_valid() const +{ + return OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && tablet_ids_.count() == binding_datas_.count(); +} + +void ObTabletBindingMdsArg::reset() +{ + tenant_id_ = OB_INVALID_TENANT_ID; + ls_id_.reset(); + tablet_ids_.reset(); + binding_datas_.reset(); +} + +int ObTabletBindingMdsArg::assign(const ObTabletBindingMdsArg &other) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!other.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(other)); + } else if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) { + LOG_WARN("failed to assign", K(ret)); + } else if (OB_FAIL(binding_datas_.assign(other.binding_datas_))) { + LOG_WARN("failed to assign", K(ret)); + } else { + tenant_id_ = other.tenant_id_; + ls_id_ = other.ls_id_; + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObTabletBindingMdsArg, tenant_id_, ls_id_, tablet_ids_, binding_datas_); + +int ObTabletBindingMdsHelper::get_sorted_ls_tablets( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + ObArray> &ls_tablets, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + ls_tablets.reset(); + ObArray ls_ids; + if (OB_FAIL(ObTabletToLSTableOperator::batch_get_ls(trans, tenant_id, tablet_ids, ls_ids))) { + LOG_WARN("failed to batch get ls", K(ret)); + } else if (OB_UNLIKELY(tablet_ids.count() != ls_ids.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid tablet ids ls ids", K(ret), K(tablet_ids), K(ls_ids)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) { + if (OB_FAIL(ls_tablets.push_back(std::make_pair(ls_ids[i], tablet_ids.at(i))))) { + LOG_WARN("failed to push back tablet id and ls id", K(ret)); + } + } + if (OB_SUCC(ret)) { + lib::ob_sort(ls_tablets.begin(), ls_tablets.end(), LSTabletCmp()); + } + return ret; +} + +int ObTabletBindingMdsHelper::batch_get_tablet_binding( + const int64_t abs_timeout_us, + const ObBatchGetTabletBindingArg &arg, + ObBatchGetTabletBindingRes &res) +{ + int ret = OB_SUCCESS; + uint64_t tenant_id = arg.tenant_id_; + if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(arg)); + } else { + MTL_SWITCH(tenant_id) { + ObLSService *ls_service = MTL(ObLSService *); + logservice::ObLogService *log_service = MTL(logservice::ObLogService*); + ObLSHandle ls_handle; + ObLS *ls = nullptr; + ObRole role = INVALID_ROLE; + if (OB_ISNULL(ls_service) || OB_ISNULL(log_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ls_service or log_service", K(ret)); + } else if (OB_FAIL(ls_service->get_ls(arg.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("get ls failed", K(ret), K(arg)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid ls", K(ret), K(arg.ls_id_)); + } else if (OB_FAIL(ls->get_ls_role(role))) { + LOG_WARN("get role failed", K(ret), K(MTL_ID()), K(arg.ls_id_)); + } else if (OB_UNLIKELY(ObRole::LEADER != role)) { + ret = OB_NOT_MASTER; + LOG_WARN("ls not leader", K(ret), K(MTL_ID()), K(arg.ls_id_)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablet_ids_.count(); i++) { + const ObTabletID &tablet_id = arg.tablet_ids_.at(i); + ObTabletHandle tablet_handle; + ObTabletBindingMdsUserData data; + if (OB_FAIL(ObTabletBindingHelper::get_tablet_for_new_mds(*ls, tablet_id, share::SCN::invalid_scn(), tablet_handle))) { + LOG_WARN("failed to get tablet", K(ret), K(tablet_id), K(abs_timeout_us)); + } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_data(share::SCN::max_scn(), data, abs_timeout_us - ObTimeUtility::current_time()))) { + LOG_WARN("failed to update tablet autoinc seq", K(ret), K(abs_timeout_us)); + } else if (OB_FAIL(res.binding_datas_.push_back(data))) { + LOG_WARN("failed to push back", K(ret)); + } + + // currently not support to read uncommitted mds set by this transaction, so check and avoid such usage + if (OB_SUCC(ret) && arg.check_committed_) { + ObTabletBindingMdsUserData tmp_data; + bool is_committed = true; + if (OB_FAIL(tablet_handle.get_obj()->get_latest_ddl_data(tmp_data, is_committed))) { + if (OB_EMPTY_RESULT == ret) { + is_committed = true; + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get latest ddl data", K(ret)); + } + } else if (OB_UNLIKELY(!is_committed)) { + ret = OB_EAGAIN; + LOG_WARN("check committed failed", K(ret), K(tenant_id), K(arg.ls_id_), K(tablet_id), K(tmp_data)); + } + } + } + } + } + } + return ret; +} + +int ObTabletBindingMdsHelper::get_tablet_binding_mds_by_rpc( + const uint64_t tenant_id, + const ObLSID &ls_id, + const ObIArray &tablet_ids, + const int64_t abs_timeout_us, + ObIArray &datas) +{ + int ret = OB_SUCCESS; + const int64_t cluster_id = GCONF.cluster_id; + obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr; + share::ObLocationService *location_service = nullptr; + ObAddr leader_addr; + obrpc::ObBatchGetTabletBindingArg arg; + obrpc::ObBatchGetTabletBindingRes res; + if (OB_ISNULL(srv_rpc_proxy = GCTX.srv_rpc_proxy_) + || OB_ISNULL(location_service = GCTX.location_service_)) { + ret = OB_ERR_SYS; + LOG_WARN("root service or location_cache is null", K(ret), KP(srv_rpc_proxy), KP(location_service)); + } else if (OB_FAIL(arg.init(tenant_id, ls_id, tablet_ids, true/*check_committed*/))) { + LOG_WARN("failed to init arg", K(ret), K(tenant_id), K(ls_id), K(ls_id)); + } else { + bool force_renew = false; + bool finish = false; + for (int64_t retry_times = 0; OB_SUCC(ret) && !finish; retry_times++) { + if (OB_FAIL(location_service->get_leader(cluster_id, tenant_id, ls_id, force_renew, leader_addr))) { + LOG_WARN("fail to get ls locaiton leader", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).timeout(abs_timeout_us - ObTimeUtility::current_time()).batch_get_tablet_binding(arg, res))) { + LOG_WARN("fail to batch get tablet binding", K(ret), K(retry_times), K(abs_timeout_us)); + } else { + finish = true; + } + if (OB_FAIL(ret)) { + force_renew = true; + if (OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_GET_LOCATION_TIME_OUT == ret || OB_NOT_MASTER == ret + || OB_ERR_SHARED_LOCK_CONFLICT == ret || OB_LS_OFFLINE == ret + || OB_NOT_INIT == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret || OB_LS_LOCATION_NOT_EXIST == ret) { + // overwrite ret + if (OB_UNLIKELY(ObTimeUtility::current_time() > abs_timeout_us)) { + ret = OB_TIMEOUT; + LOG_WARN("timeout", K(ret), K(abs_timeout_us)); + } else if (OB_FAIL(THIS_WORKER.check_status())) { + LOG_WARN("failed to check status", K(ret), K(abs_timeout_us)); + } else if (retry_times >= 3) { + ob_usleep(100 * 1000L); // 100ms + } + } + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(datas.assign(res.binding_datas_))) { + LOG_WARN("failed to assign", K(ret)); + } + return ret; +} + +int ObTabletBindingMdsHelper::modify_tablet_binding_for_create( + const uint64_t tenant_id, + const obrpc::ObBatchCreateTabletArg &arg, + const int64_t abs_timeout_us, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + const ObLSID &ls_id = arg.id_; + + if (OB_SUCC(ret)) { + ObArray batch_tablet_ids; + ObArray batch_ops; + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); i++) { + const ObCreateTabletInfo &info = arg.tablets_[i]; + const bool is_last = i == arg.tablets_.count() - 1; + if (ObTabletCreateDeleteHelper::is_pure_hidden_tablets(info)) { + ObBindHiddenTabletToOrigTabletOp op(info); + if (OB_FAIL(batch_tablet_ids.push_back(info.data_tablet_id_))) { + LOG_WARN("failed to push back", K(ret)); + } else if (OB_FAIL(batch_ops.push_back(op))) { + LOG_WARN("failed to push back", K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (is_last || batch_tablet_ids.count() >= ObTabletBindingMdsArg::BATCH_TABLET_CNT) { + if (OB_FAIL(modify_tablet_binding_(tenant_id, ls_id, batch_tablet_ids, abs_timeout_us, + ModifyBindingByOps(batch_ops), trans))) { + LOG_WARN("failed to modify tablet binding", K(ret)); + } else { + batch_tablet_ids.reuse(); + batch_ops.reuse(); + } + } + } + if (OB_SUCC(ret) && OB_UNLIKELY(!batch_tablet_ids.empty() || !batch_ops.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("batch not consumed out", K(ret), K(batch_tablet_ids), K(batch_ops)); + } + } + + if (OB_SUCC(ret)) { + ObArray batch_tablet_ids; + ObArray batch_ops; + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); i++) { + const ObCreateTabletInfo &info = arg.tablets_[i]; + const bool is_last = i == arg.tablets_.count() - 1; + bool has_lob = false; + if (OB_FAIL(ObTabletBindingHelper::has_lob_tablets(arg, info, has_lob))) { + LOG_WARN("failed to has_lob_tablets", KR(ret)); + } else if (has_lob) { + ObBindLobTabletToDataTabletOp op(arg, info); + if (OB_FAIL(batch_tablet_ids.push_back(info.data_tablet_id_))) { + LOG_WARN("failed to push back", K(ret)); + } else if (OB_FAIL(batch_ops.push_back(op))) { + LOG_WARN("failed to push back", K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (is_last || batch_tablet_ids.count() >= ObTabletBindingMdsArg::BATCH_TABLET_CNT) { + if (OB_FAIL(modify_tablet_binding_(tenant_id, ls_id, batch_tablet_ids, abs_timeout_us, + ModifyBindingByOps(batch_ops), trans))) { + LOG_WARN("failed to modify tablet binding", K(ret)); + } else { + batch_tablet_ids.reuse(); + batch_ops.reuse(); + } + } + } + if (OB_SUCC(ret) && OB_UNLIKELY(!batch_tablet_ids.empty() || !batch_ops.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("batch not consumed out", K(ret), K(batch_tablet_ids), K(batch_ops)); + } + } + return ret; +} + +// redefined_schema_version is not OB_INVALID_VERSION iff for ddl succ +int ObTabletBindingMdsHelper::modify_tablet_binding_for_unbind( + const uint64_t tenant_id, + const ObIArray &orig_tablet_ids, + const ObIArray &hidden_tablet_ids, + const int64_t redefined_schema_version, + const int64_t abs_timeout_us, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + if (OB_SUCC(ret)) { + ObUnbindHiddenTabletFromOrigTabletOp op(redefined_schema_version); + if (OB_FAIL(modify_tablet_binding_(tenant_id, orig_tablet_ids, abs_timeout_us, op, trans))) { + LOG_WARN("failed to modify tablet binding", K(ret)); + } + } + + if (OB_SUCC(ret) && OB_INVALID_VERSION != redefined_schema_version) { + ObSetRwDefensiveOp op(redefined_schema_version); + if (OB_FAIL(modify_tablet_binding_(tenant_id, hidden_tablet_ids, abs_timeout_us, op, trans))) { + LOG_WARN("failed to modify tablet binding", K(ret)); + } + } + return ret; +} + +int ObTabletBindingMdsHelper::modify_tablet_binding_for_rw_defensive( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + const int64_t schema_version, + const int64_t abs_timeout_us, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + ObSetRwDefensiveOp op(schema_version); + if (OB_UNLIKELY(OB_INVALID_VERSION == schema_version)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(tenant_id), K(schema_version), K(tablet_ids)); + } else if (OB_FAIL(modify_tablet_binding_(tenant_id, tablet_ids, abs_timeout_us, op, trans))) { + LOG_WARN("failed to modify tablet binding", K(ret)); + } + return ret; +} + +template +int ObTabletBindingMdsHelper::modify_tablet_binding_( + const uint64_t tenant_id, + const share::ObLSID &ls_id, + const ObIArray &tablet_ids, + const int64_t abs_timeout_us, + F &&op, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + if (!tablet_ids.empty()) { + ObArray old_datas; + if (OB_FAIL(get_tablet_binding_mds_by_rpc(tenant_id, ls_id, tablet_ids, abs_timeout_us, old_datas))) { + LOG_WARN("failed to get tablet binding mds", K(ret)); + } else { + ObTabletBindingMdsArg arg; + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + if (OB_FAIL(arg.tablet_ids_.assign(tablet_ids))) { + LOG_WARN("failed to assign", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < old_datas.count(); i++) { + if (OB_FAIL(arg.binding_datas_.push_back(old_datas.at(i)))) { + LOG_WARN("failed to push back", K(ret)); + } else { + ObTabletBindingMdsUserData &data = arg.binding_datas_.at(arg.binding_datas_.count() - 1); + ret = op(i, data); + } + } + if (OB_FAIL(ret)) { + } else if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(arg)); + } else if (OB_FAIL(register_mds_(arg, trans))) { + LOG_WARN("failed to register mds", K(ret)); + } + } + } + return ret; +} + +template +int ObTabletBindingMdsHelper::modify_tablet_binding_( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + const int64_t abs_timeout_us, + F &&op, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + if (!tablet_ids.empty()) { + ObArray> ls_tablets; + ObArray this_batch_tablet_ids; + if (OB_FAIL(get_sorted_ls_tablets(tenant_id, tablet_ids, ls_tablets, trans))) { + LOG_WARN("failed to get sorted ls tablets", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < ls_tablets.count(); i++) { + const ObLSID &ls_id = ls_tablets.at(i).first; + const ObTabletID &tablet_id = ls_tablets.at(i).second; + const bool is_last_or_next_ls_id_changed = i == ls_tablets.count() - 1 || ls_id != ls_tablets.at(i+1).first; + if (OB_FAIL(this_batch_tablet_ids.push_back(tablet_id))) { + LOG_WARN("failed to push back", K(ret)); + } else if (is_last_or_next_ls_id_changed || this_batch_tablet_ids.count() >= ObTabletBindingMdsArg::BATCH_TABLET_CNT) { + if (OB_FAIL(modify_tablet_binding_(tenant_id, ls_id, this_batch_tablet_ids, abs_timeout_us, + ModifyBindingByOp(op), trans))) { + LOG_WARN("failed to modify tablet binding", K(ret)); + } else { + this_batch_tablet_ids.reuse(); + } + } + } + if (OB_SUCC(ret) && OB_UNLIKELY(!this_batch_tablet_ids.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("batch not consumed out", K(ret), K(this_batch_tablet_ids)); + } + } + return ret; +} + +int ObTabletBindingMdsHelper::register_mds_( + const ObTabletBindingMdsArg &arg, + ObMySQLTransaction &trans) +{ + int ret = OB_SUCCESS; + sqlclient::ObISQLConnection *isql_conn = nullptr; + if (OB_ISNULL(isql_conn = trans.get_connection())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid connection", K(ret)); + } else { + const int64_t size = arg.get_serialize_size(); + ObArenaAllocator allocator("TblBind"); + char *buf = nullptr; + int64_t pos = 0; + if (OB_ISNULL(buf = static_cast(allocator.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate", K(ret)); + } else if (OB_FAIL(arg.serialize(buf, size, pos))) { + LOG_WARN("failed to serialize arg", K(ret)); + } else if (OB_FAIL(static_cast(isql_conn)->register_multi_data_source( + arg.tenant_id_, arg.ls_id_, ObTxDataSourceType::TABLET_BINDING, buf, pos))) { + LOG_WARN("failed to register mds", K(ret)); + } + } + return ret; +} + +int ObTabletBindingMdsHelper::on_register(const char* buf, const int64_t len, mds::BufferCtx &ctx) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + ObTabletBindingMdsArg arg; + if (OB_FAIL(arg.deserialize(buf, len, pos))) { + LOG_WARN("failed to deserialize arg", K(ret)); + } else if (OB_FAIL(modify_(arg, SCN::invalid_scn(), ctx))) { + LOG_WARN("failed to register_process", K(ret)); + } + return ret; +} + +int ObTabletBindingMdsHelper::on_replay(const char* buf, const int64_t len, const share::SCN &scn, mds::BufferCtx &ctx) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + ObTabletBindingMdsArg arg; + if (OB_FAIL(arg.deserialize(buf, len, pos))) { + LOG_WARN("failed to deserialize arg", K(ret)); + } else if (OB_FAIL(modify_(arg, scn, ctx))) { + LOG_WARN("failed to register_process", K(ret)); + } + return ret; +} + +int ObTabletBindingMdsHelper::modify_( + const ObTabletBindingMdsArg &arg, + const share::SCN &scn, + mds::BufferCtx &ctx) +{ + int ret = OB_SUCCESS; + const share::ObLSID &ls_id = arg.ls_id_; + ObLSHandle ls_handle; + ObLS *ls = nullptr; + if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(arg)); + } else if (OB_FAIL(MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("fail to get ls", KR(ret), K(arg)); + } else if (OB_UNLIKELY(nullptr == (ls = ls_handle.get_ls()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls should not be NULL", KR(ret), K(arg), KP(ls)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablet_ids_.count(); i++) { + const ObTabletID &tablet_id = arg.tablet_ids_[i]; + if (OB_FAIL(set_tablet_binding_mds_(*ls, tablet_id, scn, arg.binding_datas_[i], ctx))) { + LOG_WARN("failed to set tablet binding mds", K(ret), K(ls_id), K(tablet_id), K(scn)); + } + } + LOG_INFO("modify tablet binding data", K(ret), K(scn), K(ctx.get_writer()), K(arg)); + return ret; +} + +int ObTabletBindingMdsHelper::set_tablet_binding_mds_( + ObLS &ls, + const ObTabletID &tablet_id, + const share::SCN &replay_scn, + const ObTabletBindingMdsUserData &data, + mds::BufferCtx &ctx) +{ + MDS_TG(100_ms); + int ret = OB_SUCCESS; + if (!replay_scn.is_valid()) { + mds::MdsCtx &user_ctx = static_cast(ctx); + ObLSHandle ls_handle; + if (CLICK_FAIL(ls.get_tablet_svr()->set_ddl_info(tablet_id, data, user_ctx, 0/*lock_timeout_us*/))) { + if (OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret) { + ret = OB_EAGAIN; + } else { + LOG_WARN("failed to save tablet binding info", K(ret)); + } + } + } else { + ObTabletBindingReplayExecutor replay_executor; + if (CLICK_FAIL(replay_executor.init(ctx, data, replay_scn, false/*for_old_mds*/))) { + LOG_WARN("failed to init replay executor", K(ret)); + } else if (CLICK_FAIL(replay_executor.execute(replay_scn, ls.get_ls_id(), tablet_id))) { + if (OB_EAGAIN != ret) { + LOG_WARN("failed to replay tablet binding log", K(ret)); + } + } + } + return ret; +} + } // namespace storage } // namespace oceanbase diff --git a/src/storage/tablet/ob_tablet_binding_helper.h b/src/storage/tablet/ob_tablet_binding_helper.h index e258327482..551949951a 100644 --- a/src/storage/tablet/ob_tablet_binding_helper.h +++ b/src/storage/tablet/ob_tablet_binding_helper.h @@ -16,8 +16,10 @@ #include "common/ob_tablet_id.h" #include "lib/container/ob_array.h" #include "lib/container/ob_array_serialization.h" +#include "lib/mysqlclient/ob_mysql_transaction.h" #include "lib/ob_define.h" #include "share/ob_ls_id.h" +#include "storage/tablet/ob_tablet_binding_mds_user_data.h" namespace oceanbase { @@ -26,6 +28,8 @@ namespace obrpc struct ObBatchCreateTabletArg; struct ObBatchRemoveTabletArg; struct ObCreateTabletInfo; +struct ObBatchGetTabletBindingArg; +struct ObBatchGetTabletBindingRes; } namespace share @@ -53,6 +57,7 @@ class ObTabletHandle; class ObTabletTxMultiSourceDataUnit; class ObTabletMapKey; +// deprecated class ObBatchUnbindTabletArg final { public: @@ -77,6 +82,7 @@ private: DISALLOW_COPY_AND_ASSIGN(ObBatchUnbindTabletArg); }; +// deprecated class ObTabletBindingHelper final { public: @@ -93,7 +99,7 @@ public: // common template - static int modify_tablet_binding_new_mds(ObLS &ls, const ObTabletID &tablet_id, const share::SCN &replay_scn, mds::BufferCtx &ctx, const bool for_old_mds, F op); + static int modify_tablet_binding_new_mds(ObLS &ls, const ObTabletID &tablet_id, const share::SCN &replay_scn, mds::BufferCtx &ctx, const bool for_old_mds, F &&op); static int has_lob_tablets(const obrpc::ObBatchCreateTabletArg &arg, const obrpc::ObCreateTabletInfo &info, bool &has_lob); static int get_ls(const share::ObLSID &ls_id, ObLSHandle &ls_handle); private: @@ -103,6 +109,7 @@ private: DISALLOW_COPY_AND_ASSIGN(ObTabletBindingHelper); }; +// deprecated class ObTabletUnbindMdsHelper { public: @@ -117,6 +124,180 @@ private: static int modify_tablet_binding_for_unbind(const ObBatchUnbindTabletArg &arg, const share::SCN &replay_scn, mds::BufferCtx &ctx); }; +class ObTabletBindingMdsArg final +{ +public: + OB_UNIS_VERSION(1); +public: + // arg with such tablet cnt cannot be more than mds buffer limit (1.5M) + const static int64_t BATCH_TABLET_CNT = 8192; + ObTabletBindingMdsArg(); + ~ObTabletBindingMdsArg() {} + bool is_valid() const; + void reset(); + int assign(const ObTabletBindingMdsArg &other); + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_ids), K_(binding_datas)); +public: + uint64_t tenant_id_; + uint64_t tenant_data_version_; + share::ObLSID ls_id_; + ObSArray tablet_ids_; + ObSArray binding_datas_; +private: + DISALLOW_COPY_AND_ASSIGN(ObTabletBindingMdsArg); +}; + +class ObBindHiddenTabletToOrigTabletOp final +{ +public: + ObBindHiddenTabletToOrigTabletOp() : info_(nullptr) {} + ObBindHiddenTabletToOrigTabletOp(const obrpc::ObCreateTabletInfo &info) : info_(&info) {} + ~ObBindHiddenTabletToOrigTabletOp() = default; + int assign(const ObBindHiddenTabletToOrigTabletOp &other); + int operator()(ObTabletBindingMdsUserData &data); + TO_STRING_KV(KPC_(info)); +private: + const obrpc::ObCreateTabletInfo *info_; +private: + DISALLOW_COPY_AND_ASSIGN(ObBindHiddenTabletToOrigTabletOp); +}; + +class ObBindLobTabletToDataTabletOp final +{ +public: + ObBindLobTabletToDataTabletOp() : arg_(nullptr), info_(nullptr) {} + ObBindLobTabletToDataTabletOp(const obrpc::ObBatchCreateTabletArg &arg, const obrpc::ObCreateTabletInfo &info) + : arg_(&arg), info_(&info) {} + int assign(const ObBindLobTabletToDataTabletOp &other); + ~ObBindLobTabletToDataTabletOp() = default; + int operator()(ObTabletBindingMdsUserData &data); + TO_STRING_KV(KPC_(arg), KPC_(info)); +private: + const obrpc::ObBatchCreateTabletArg *arg_; + const obrpc::ObCreateTabletInfo *info_; +}; + +class ObUnbindHiddenTabletFromOrigTabletOp final +{ +public: + ObUnbindHiddenTabletFromOrigTabletOp(const int64_t schema_version) + : schema_version_(schema_version) {} + ~ObUnbindHiddenTabletFromOrigTabletOp() = default; + int operator()(ObTabletBindingMdsUserData &data); +private: + int64_t schema_version_; +private: + DISALLOW_COPY_AND_ASSIGN(ObUnbindHiddenTabletFromOrigTabletOp); +}; + +class ObSetRwDefensiveOp final +{ +public: + ObSetRwDefensiveOp(const int64_t schema_version) + : schema_version_(schema_version) {} + ~ObSetRwDefensiveOp() = default; + int operator()(ObTabletBindingMdsUserData &data); +private: + int64_t schema_version_; +private: + DISALLOW_COPY_AND_ASSIGN(ObSetRwDefensiveOp); +}; + +template +struct ModifyBindingByOp final +{ + ModifyBindingByOp(F &&op) : op_(op) {} + ~ModifyBindingByOp() = default; + int operator()(const int64_t i, ObTabletBindingMdsUserData &data) { return op_(data); } + F &&op_; +}; + +template +struct ModifyBindingByOps final +{ + ModifyBindingByOps(ObIArray &ops) : ops_(ops) {} + ~ModifyBindingByOps() = default; + int operator()(const int64_t i, ObTabletBindingMdsUserData &data) { return ops_.at(i)(data); } + ObIArray &ops_; +}; + +struct LSTabletCmp final +{ + bool operator()(const std::pair &lhs, const std::pair &rhs) { + return lhs.first != rhs.first ? lhs.first < rhs.first : lhs.second < rhs.second; + } +}; + +class ObTabletBindingMdsHelper +{ +public: + static int on_register(const char* buf, const int64_t len, mds::BufferCtx &ctx); + static int on_replay(const char* buf, const int64_t len, const share::SCN &scn, mds::BufferCtx &ctx); + +public: + static int get_sorted_ls_tablets( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + ObArray> &ls_tablets, + ObMySQLTransaction &trans); + static int batch_get_tablet_binding( + const int64_t abs_timeout_us, + const obrpc::ObBatchGetTabletBindingArg &arg, + obrpc::ObBatchGetTabletBindingRes &res); + static int get_tablet_binding_mds_by_rpc( + const uint64_t tenant_id, + const share::ObLSID &ls_id, + const ObIArray &tablet_ids, + const int64_t abs_timeout_us, + ObIArray &datas); + static int modify_tablet_binding_for_create( + const uint64_t tenant_id, + const obrpc::ObBatchCreateTabletArg &arg, + const int64_t abs_timeout_us, + ObMySQLTransaction &trans); + static int modify_tablet_binding_for_unbind( + const uint64_t tenant_id, + const ObIArray &orig_tablet_ids, + const ObIArray &hidden_tablet_ids, + const int64_t redefined_schema_version, + const int64_t abs_timeout_us, + ObMySQLTransaction &trans); + static int modify_tablet_binding_for_rw_defensive( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + const int64_t schema_version, + const int64_t abs_timeout_us, + ObMySQLTransaction &trans); + +private: + template + static int modify_tablet_binding_( + const uint64_t tenant_id, + const share::ObLSID &ls_id, + const ObIArray &tablet_ids, + const int64_t abs_timeout_us, + F &&op, + ObMySQLTransaction &trans); + template + static int modify_tablet_binding_( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + const int64_t abs_timeout_us, + F &&op, + ObMySQLTransaction &trans); + static int register_mds_( + const ObTabletBindingMdsArg &arg, + ObMySQLTransaction &trans); + + static int modify_(const ObTabletBindingMdsArg &arg, const share::SCN &scn, mds::BufferCtx &ctx); + static int set_tablet_binding_mds_( + ObLS &ls, + const ObTabletID &tablet_id, + const share::SCN &replay_scn, + const ObTabletBindingMdsUserData &data, + mds::BufferCtx &ctx); +}; + } // namespace storage } // namespace oceanbase diff --git a/src/storage/tablet/ob_tablet_binding_replay_executor.cpp b/src/storage/tablet/ob_tablet_binding_replay_executor.cpp index dd7abc53b0..32818420d7 100644 --- a/src/storage/tablet/ob_tablet_binding_replay_executor.cpp +++ b/src/storage/tablet/ob_tablet_binding_replay_executor.cpp @@ -26,7 +26,7 @@ ObTabletBindingReplayExecutor::ObTabletBindingReplayExecutor() int ObTabletBindingReplayExecutor::init( mds::BufferCtx &user_ctx, - ObTabletBindingMdsUserData &user_data, + const ObTabletBindingMdsUserData &user_data, const share::SCN &scn, const bool for_old_mds) { diff --git a/src/storage/tablet/ob_tablet_binding_replay_executor.h b/src/storage/tablet/ob_tablet_binding_replay_executor.h index 576c6d2fb3..27bf5ea43e 100644 --- a/src/storage/tablet/ob_tablet_binding_replay_executor.h +++ b/src/storage/tablet/ob_tablet_binding_replay_executor.h @@ -30,7 +30,7 @@ public: int init( mds::BufferCtx &user_ctx, - ObTabletBindingMdsUserData &user_data, + const ObTabletBindingMdsUserData &user_data, const share::SCN &scn, const bool for_old_mds); @@ -51,7 +51,7 @@ protected: private: mds::BufferCtx *user_ctx_; - ObTabletBindingMdsUserData *user_data_; + const ObTabletBindingMdsUserData *user_data_; share::SCN scn_; bool for_old_mds_; }; diff --git a/src/storage/tablet/ob_tablet_create_mds_helper.cpp b/src/storage/tablet/ob_tablet_create_mds_helper.cpp index 19d0d07ed6..ecdb7892bc 100644 --- a/src/storage/tablet/ob_tablet_create_mds_helper.cpp +++ b/src/storage/tablet/ob_tablet_create_mds_helper.cpp @@ -69,7 +69,7 @@ int ObTabletCreateMdsHelper::register_process( LOG_WARN("unexpected error, arg is not valid", K(ret), K(arg)); } else if (CLICK_FAIL(create_tablets(arg, false/*for_replay*/, share::SCN::invalid_scn(), ctx, tablet_id_array))) { LOG_WARN("failed to create tablets", K(ret), K(arg)); - } else if (CLICK_FAIL(ObTabletBindingHelper::modify_tablet_binding_for_new_mds_create(arg, SCN::invalid_scn(), ctx))) { + } else if (!arg.set_binding_info_outside_create() && CLICK_FAIL(ObTabletBindingHelper::modify_tablet_binding_for_new_mds_create(arg, SCN::invalid_scn(), ctx))) { LOG_WARN("failed to modify tablet binding", K(ret)); } @@ -144,7 +144,7 @@ int ObTabletCreateMdsHelper::replay_process( K(ret), K(scn), K(tablet_change_checkpoint_scn)); } else if (CLICK_FAIL(create_tablets(arg, true/*for_replay*/, scn, ctx, tablet_id_array))) { LOG_WARN("failed to create tablets", K(ret), K(arg), K(scn)); - } else if (CLICK_FAIL(ObTabletBindingHelper::modify_tablet_binding_for_new_mds_create(arg, scn, ctx))) { + } else if (!arg.set_binding_info_outside_create() && CLICK_FAIL(ObTabletBindingHelper::modify_tablet_binding_for_new_mds_create(arg, scn, ctx))) { LOG_WARN("failed to modify tablet binding", K(ret)); } else if (CLICK_FAIL(ObTabletCreateDeleteMdsUserData::set_tablet_gc_trigger(ls_id))) { LOG_WARN("failed to trigger tablet gc task", K(ret)); diff --git a/src/storage/tx/ob_multi_data_source_printer.cpp b/src/storage/tx/ob_multi_data_source_printer.cpp index aa13dc9d03..edd13848c9 100644 --- a/src/storage/tx/ob_multi_data_source_printer.cpp +++ b/src/storage/tx/ob_multi_data_source_printer.cpp @@ -49,6 +49,7 @@ const char *ObMultiDataSourcePrinter::to_str_mds_type(const ObTxDataSourceType & TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_MOVE_TX_CTX); TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_DEST_PREPARE); TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, CHANGE_TABLET_TO_TABLE_MDS); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TABLET_BINDING); TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, MAX_TYPE); } @@ -71,4 +72,4 @@ const char *ObMultiDataSourcePrinter::to_str_notify_type(const NotifyType ¬if return str; } } -} \ No newline at end of file +} diff --git a/src/storage/tx/ob_tx_log.cpp b/src/storage/tx/ob_tx_log.cpp index be8b90c023..2954f3eb8c 100644 --- a/src/storage/tx/ob_tx_log.cpp +++ b/src/storage/tx/ob_tx_log.cpp @@ -41,7 +41,8 @@ ObTxLogTypeChecker::need_replay_barrier(const ObTxLogType log_type, || data_source_type == ObTxDataSourceType::START_TRANSFER_OUT || data_source_type == ObTxDataSourceType::START_TRANSFER_OUT_PREPARE || data_source_type == ObTxDataSourceType::FINISH_TRANSFER_OUT - || data_source_type == ObTxDataSourceType::START_TRANSFER_IN) { + || data_source_type == ObTxDataSourceType::START_TRANSFER_IN + || data_source_type == ObTxDataSourceType::TABLET_BINDING) { barrier_flag = logservice::ObReplayBarrierType::PRE_BARRIER;