diff --git a/src/rootserver/CMakeLists.txt b/src/rootserver/CMakeLists.txt index b8beb598e9..64cabb5ec3 100644 --- a/src/rootserver/CMakeLists.txt +++ b/src/rootserver/CMakeLists.txt @@ -128,6 +128,7 @@ ob_set_subtarget(ob_rootserver parallel_ddl parallel_ddl/ob_create_table_helper.cpp parallel_ddl/ob_create_view_helper.cpp parallel_ddl/ob_index_name_checker.cpp + parallel_ddl/ob_tablet_balance_allocator.cpp ) ob_set_subtarget(ob_rootserver freeze diff --git a/src/rootserver/balance/ob_balance_group_define.cpp b/src/rootserver/balance/ob_balance_group_define.cpp index dab5462791..6ff2052d26 100644 --- a/src/rootserver/balance/ob_balance_group_define.cpp +++ b/src/rootserver/balance/ob_balance_group_define.cpp @@ -27,6 +27,8 @@ using namespace share::schema; namespace rootserver { +const char* ObBalanceGroup::NON_PART_BG_NAME = "NON_PART_TABLE"; + int ObBalanceGroup::init_by_tablegroup(const ObSimpleTablegroupSchema &tg, const int64_t max_part_level, const int64_t part_group_index/* = 0*/) @@ -85,7 +87,7 @@ int ObBalanceGroup::init_by_table(const ObSimpleTableSchemaV2 &table_schema, LOG_WARN("table is in tablegroup, should init balance group by tablegroup", KR(ret), K(table_schema)); } else if (PARTITION_LEVEL_ZERO == part_level) { // All tenant's non-partition table is a balance group - if (OB_FAIL(bg_name_str.append_fmt("NON_PART_TABLE"))) { + if (OB_FAIL(bg_name_str.append_fmt("%s", NON_PART_BG_NAME))) { LOG_WARN("fail to append fmt", KR(ret), K(table_schema)); } else { id_ = ObBalanceGroupID(0, 0); diff --git a/src/rootserver/balance/ob_balance_group_define.h b/src/rootserver/balance/ob_balance_group_define.h index 99e331f0e7..450023a2e3 100644 --- a/src/rootserver/balance/ob_balance_group_define.h +++ b/src/rootserver/balance/ob_balance_group_define.h @@ -96,7 +96,8 @@ public: } TO_STRING_KV(K_(id), K_(name)); - +public: + const static char* NON_PART_BG_NAME; private: ObBalanceGroupID id_; ObBalanceGroupName name_; diff --git a/src/rootserver/ob_balance_group_ls_stat_operator.cpp b/src/rootserver/ob_balance_group_ls_stat_operator.cpp index f85554edf5..eea76b6864 100755 --- a/src/rootserver/ob_balance_group_ls_stat_operator.cpp +++ b/src/rootserver/ob_balance_group_ls_stat_operator.cpp @@ -35,6 +35,8 @@ #include "share/ls/ob_ls_table_operator.h" // ObLSTableOperator #include "share/location_cache/ob_location_service.h" // ObLocationService #include "share/ob_rpc_struct.h" // ObCreateDupLSArg & ObCreateDupLSResult +#include "rootserver/ob_root_service.h" +#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h" namespace oceanbase { @@ -338,6 +340,46 @@ int ObBalanceGroupLSStatOperator::insert_update_balance_group_ls_stat( return ret; } +int ObBalanceGroupLSStatOperator::inc_balance_group_ls_stat( + const int64_t timeout, + common::ObISQLClient &sql_client, + const uint64_t tenant_id, + const ObBalanceGroupLSStat &ls_stat) +{ + int ret = OB_SUCCESS; + common::ObTimeoutCtx timeout_ctx; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_UNLIKELY( + timeout <= 0 + || OB_INVALID_TENANT_ID == tenant_id + || !ls_stat.get_balance_group_id().is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_stat)); + } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx( + timeout_ctx, timeout))) { + LOG_WARN("fail to set timeout", KR(ret), K(timeout)); + } else { + const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id); + common::ObSqlString inc_sql; + int64_t affected_rows = 0; + if (OB_FAIL(generate_inc_sql_(ls_stat, inc_sql))) { + LOG_WARN("fail to generate inc sql", KR(ret), + K(tenant_id), K(ls_stat), K(inc_sql)); + } else if (OB_FAIL(sql_client.write( + sql_tenant_id, inc_sql.ptr(), affected_rows))) { + LOG_WARN("fail to insert update", KR(ret), + K(tenant_id), K(inc_sql)); + } else if (OB_UNLIKELY(affected_rows > 2)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected affected rows", KR(ret), + K(tenant_id), K(inc_sql), K(affected_rows)); + } + } + return ret; +} + int ObBalanceGroupLSStatOperator::delete_balance_group_ls_stat( const int64_t timeout, common::ObISQLClient &sql_client, @@ -357,6 +399,46 @@ int ObBalanceGroupLSStatOperator::delete_balance_group_ls_stat( return ret; } +int ObBalanceGroupLSStatOperator::generate_inc_sql_( + const ObBalanceGroupLSStat &bg_ls_stat, + common::ObSqlString &sql_string) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_UNLIKELY(!bg_ls_stat.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(bg_ls_stat)); + } else if (OB_FAIL(sql_string.append_fmt( + "INSERT INTO %s (" + "tenant_id, " + "balance_group_id_high, " + "balance_group_id_low, " + "ls_id, " + "tablet_group_count, " + "balance_group_name)" + " VALUES (" + "%ld, %ld, %ld, %ld, %ld, '%s') " + "ON DUPLICATE KEY UPDATE " + "tablet_group_count = tablet_group_count + %ld, " + "balance_group_name = '%s'", + OB_ALL_BALANCE_GROUP_LS_STAT_TNAME, + bg_ls_stat.get_tenant_id(), + bg_ls_stat.get_balance_group_id().id_high_, + bg_ls_stat.get_balance_group_id().id_low_, + bg_ls_stat.get_ls_id().id(), + bg_ls_stat.get_tablet_group_count(), + to_cstring(ObHexEscapeSqlStr(bg_ls_stat.get_balance_group_name().str())), + bg_ls_stat.get_tablet_group_count(), + to_cstring(ObHexEscapeSqlStr(bg_ls_stat.get_balance_group_name().str()))))) { + LOG_WARN("fail to append fmt", KR(ret), K(bg_ls_stat)); + } else { + LOG_INFO("balance group ls inc sql", K(sql_string)); + } + return ret; +} + int ObBalanceGroupLSStatOperator::generate_insert_update_sql( const ObBalanceGroupLSStat &bg_ls_stat, common::ObSqlString &sql_string) @@ -402,7 +484,8 @@ int ObBalanceGroupLSStatOperator::generate_insert_update_sql( ObNewTableTabletAllocator::ObNewTableTabletAllocator( const uint64_t tenant_id, share::schema::ObSchemaGetterGuard &schema_guard, - common::ObMySQLProxy *sql_proxy) + common::ObMySQLProxy *sql_proxy, + const bool use_parallel_ddl /*= false*/) : tenant_id_(tenant_id), schema_guard_(schema_guard), sql_proxy_(sql_proxy), @@ -410,7 +493,8 @@ ObNewTableTabletAllocator::ObNewTableTabletAllocator( status_(MyStatus::INVALID), ls_id_array_(), inited_(false), - is_add_partition_(false) + is_add_partition_(false), + use_parallel_ddl_(use_parallel_ddl) { } @@ -492,10 +576,10 @@ int ObNewTableTabletAllocator::prepare( // If ls status is not normal or is blocking tablet in, choose new ls for tablet creating. if (OB_FAIL(ret)) { } else if (is_related_table(table_schema.get_table_type(), table_schema.get_index_type())) { - // skip lock ls + // skip lock ls } else if (OB_FAIL(check_and_replace_ls_(trans, table_schema.get_tenant_id()))) { - LOG_WARN("lock user ls failed", KR(ret), - "tenant_id", table_schema.get_tenant_id(), K_(ls_id_array)); + LOG_WARN("lock user ls failed", KR(ret), + "tenant_id", table_schema.get_tenant_id(), K_(ls_id_array)); } } @@ -637,7 +721,7 @@ int ObNewTableTabletAllocator::get_available_ls( if (ls_attr.ls_is_normal() && SYS_LS != ls_attr.get_ls_id() && !ls_attr.get_ls_flag().is_block_tablet_in() - && !ls_attr.get_ls_flag().is_duplicate_ls()) { + && !ls_attr.get_ls_flag().is_duplicate_ls()) { if (OB_FAIL(ls_id_array.push_back(ls_attr.get_ls_id()))) { LOG_WARN("fail to push back", KR(ret), K(ls_attr), K(ls_id_array)); } @@ -987,6 +1071,39 @@ int ObNewTableTabletAllocator::alloc_tablet_for_non_partitioned_balance_group( return ret; } +int ObNewTableTabletAllocator::alloc_tablet_for_non_partitioned_balance_group_by_cache_( + const share::schema::ObTableSchema &table_schema) +{ + int ret = OB_SUCCESS; + LOG_INFO("alloc tablet for non partitioned balance group by cache", + "tenant_id", table_schema.get_tenant_id(), + "table_id", table_schema.get_table_id()); + common::ObArray ls_id_array; + share::ObLSID ls_id; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObNewTableTabletAllocator not init", KR(ret)); + } else if (OB_UNLIKELY(PARTITION_LEVEL_ZERO != table_schema.get_part_level())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), + "part_num", table_schema.get_all_part_num(), + "part_level", table_schema.get_part_level(), + K(table_schema)); + } else if (OB_FAIL(get_available_ls(ls_id_array))) { + LOG_WARN("fail to get available ls", KR(ret)); + } else if (OB_ISNULL(GCTX.root_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rootservice is null", KR(ret)); + } else if (OB_FAIL(GCTX.root_service_->get_ddl_service() + .get_non_partitioned_tablet_allocator() + .alloc_tablet(tenant_id_, ls_id_array, ls_id))) { + LOG_WARN("fail to alloc tablet by cache", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(ls_id_array_.push_back(ls_id))) { + LOG_WARN("fail to push back ls id", KR(ret), K_(tenant_id), K(ls_id)); + } + return ret; +} + int ObNewTableTabletAllocator::alloc_tablet_for_partitioned_balance_group( const share::schema::ObTableSchema &table_schema) { @@ -1036,8 +1153,14 @@ int ObNewTableTabletAllocator::alloc_tablet_by_count_balance( } } } else if (PARTITION_LEVEL_ZERO == table_schema.get_part_level()) { - if (OB_FAIL(alloc_tablet_for_non_partitioned_balance_group(table_schema))) { - LOG_WARN("fail to alloc tablet by non partitioned balance group", KR(ret)); + if (!use_parallel_ddl_) { + if (OB_FAIL(alloc_tablet_for_non_partitioned_balance_group(table_schema))) { + LOG_WARN("fail to alloc tablet by non partitioned balance group", KR(ret)); + } + } else { + if (OB_FAIL(alloc_tablet_for_non_partitioned_balance_group_by_cache_(table_schema))) { + LOG_WARN("fail to alloc tablet by non partitioned balance group by cache", KR(ret)); + } } } else { if (OB_FAIL(alloc_tablet_for_partitioned_balance_group(table_schema))) { diff --git a/src/rootserver/ob_balance_group_ls_stat_operator.h b/src/rootserver/ob_balance_group_ls_stat_operator.h index d99c66624a..456956dc84 100644 --- a/src/rootserver/ob_balance_group_ls_stat_operator.h +++ b/src/rootserver/ob_balance_group_ls_stat_operator.h @@ -130,6 +130,11 @@ public: const uint64_t tenant_id, const ObBalanceGroupID &balance_group_id, const common::ObIArray &balance_group_ls_stat_array); + int inc_balance_group_ls_stat( + const int64_t timeout_abs, + common::ObISQLClient &sql_client, + const uint64_t tenant_id, + const ObBalanceGroupLSStat &ls_stat); int delete_balance_group_ls_stat( const int64_t timeout, common::ObISQLClient &sql_client, @@ -138,6 +143,9 @@ private: int generate_insert_update_sql( const ObBalanceGroupLSStat &bg_ls_stat, common::ObSqlString &sql_string); + int generate_inc_sql_( + const ObBalanceGroupLSStat &bg_ls_stat, + common::ObSqlString &sql_string); private: bool inited_; common::ObMySQLProxy *sql_proxy_; @@ -151,7 +159,8 @@ public: ObNewTableTabletAllocator( const uint64_t tenant_id, share::schema::ObSchemaGetterGuard &schema_guard, - common::ObMySQLProxy *sql_proxy); + common::ObMySQLProxy *sql_proxy, + const bool use_parallel_ddl = false); virtual ~ObNewTableTabletAllocator(); public: int init(); @@ -197,6 +206,8 @@ private: const share::schema::ObTableSchema &table_schema); int alloc_tablet_for_non_partitioned_balance_group( const share::schema::ObTableSchema &table_schema); + int alloc_tablet_for_non_partitioned_balance_group_by_cache_( + const share::schema::ObTableSchema &table_schema); int alloc_tablet_for_partitioned_balance_group( const share::schema::ObTableSchema &table_schema); int alloc_tablet_for_one_level_partitioned_balance_group( @@ -262,6 +273,7 @@ private: bool inited_; bool is_add_partition_; static int64_t alloc_tablet_ls_offset_; + bool use_parallel_ddl_; }; }//end namespace rootserver diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index c80de2b791..10592a58a9 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -178,7 +178,8 @@ ObDDLService::ObDDLService() unit_mgr_(NULL), snapshot_mgr_(NULL), ddl_lock_(), - index_name_checker_() + index_name_checker_(), + non_partitioned_tablet_allocator_() { } @@ -197,6 +198,8 @@ int ObDDLService::init(obrpc::ObSrvRpcProxy &rpc_proxy, LOG_WARN("init twice", KR(ret)); } else if (OB_FAIL(index_name_checker_.init(sql_proxy))) { LOG_WARN("fail to init index name checker", KR(ret)); + } else if (OB_FAIL(non_partitioned_tablet_allocator_.init(sql_proxy))) { + LOG_WARN("fail to init non partitioned tablet allocator", KR(ret)); } else { rpc_proxy_ = &rpc_proxy; common_rpc_ = &common_rpc; @@ -25020,6 +25023,13 @@ int ObDDLService::drop_tenant(const ObDropTenantArg &arg) LOG_WARN("delete_recycle_object failed", KR(ret), KPC(tenant_schema)); } } + if (OB_SUCC(ret)) { + if (OB_FAIL(reset_parallel_cache(meta_tenant_id))) { + LOG_WARN("fail to reset parallel cache", KR(ret), K(meta_tenant_id)); + } else if (OB_FAIL(reset_parallel_cache(user_tenant_id))) { + LOG_WARN("fail to reset parallel cache", KR(ret), K(user_tenant_id)); + } + } } else {// put tenant into recyclebin ObTenantSchema new_tenant_schema; ObSqlString new_tenant_name; @@ -32921,6 +32931,22 @@ ObDDLSQLTransaction::~ObDDLSQLTransaction() } } +int ObDDLService::reset_parallel_cache(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(index_name_checker_.reset_cache(tenant_id))) { + ret = OB_FAIL(ret) ? ret : tmp_ret; + LOG_ERROR("reset cache failed", KR(tmp_ret), KR(ret), K(tenant_id)); + } + + if (OB_TMP_FAIL(non_partitioned_tablet_allocator_.reset_cache(tenant_id))) { + ret = OB_FAIL(ret) ? ret : tmp_ret; + LOG_ERROR("reset cache failed", KR(tmp_ret), KR(ret), K(tenant_id)); + } + return ret; +} + /* * @description: * start transaction for DDL, lock and check schema has refreshed @@ -33035,13 +33061,9 @@ int ObDDLSQLTransaction::end(const bool commit) if (OB_ISNULL(GCTX.root_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("root_service is null", KR(ret)); - } else { - ObIndexNameChecker &checker = GCTX.root_service_ - ->get_ddl_service() - .get_index_name_checker(); - if (OB_FAIL(checker.reset_cache(tenant_id_))) { - LOG_ERROR("reset cache failed", KR(ret), K(tenant_id_)); - } + } else if (OB_FAIL(GCTX.root_service_->get_ddl_service() + .reset_parallel_cache(tenant_id_))) { + LOG_WARN("fail to reset parallel cache", KR(ret), K_(tenant_id)); } } diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index d5c0602df3..8ce50a7bba 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -38,6 +38,7 @@ #include "common/ob_common_utility.h" #include "share/config/ob_config.h" // ObConfigPairs #include "rootserver/parallel_ddl/ob_index_name_checker.h" +#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h" namespace oceanbase { @@ -125,6 +126,10 @@ public: ObSnapshotInfoManager &get_snapshot_mgr() { return *snapshot_mgr_; } share::ObLSTableOperator &get_lst_operator() { return *lst_operator_; } share::schema::ObIndexNameChecker &get_index_name_checker() { return index_name_checker_; } + share::schema::ObNonPartitionedTableTabletAllocator &get_non_partitioned_tablet_allocator() + { + return non_partitioned_tablet_allocator_; + } // create_index_table will fill table_id and frozen_version to table_schema virtual int create_index_table(const obrpc::ObCreateIndexArg &arg, @@ -1121,6 +1126,7 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard share::schema::ObSchemaGetterGuard &schema_guard, share::schema::ObTableSchema &schema); + int reset_parallel_cache(const uint64_t tenant_id); private: enum PartitionBornMethod : int64_t { @@ -2571,6 +2577,7 @@ private: // for paralled ddl to cache oracle's index name map share::schema::ObIndexNameChecker index_name_checker_; + share::schema::ObNonPartitionedTableTabletAllocator non_partitioned_tablet_allocator_; private: DISALLOW_COPY_AND_ASSIGN(ObDDLService); }; diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 29e9f9bb0c..74cb469dc2 100755 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -1286,6 +1286,8 @@ void ObRootService::wait() FLOG_INFO("global ctx timer exit success"); ddl_service_.get_index_name_checker().reset_all_cache(); FLOG_INFO("reset index name checker success"); + ddl_service_.get_non_partitioned_tablet_allocator().reset_all_cache(); + FLOG_INFO("reset non partitioned tablet allocator success"); ObUpdateRsListTask::clear_lock(); THE_RS_JOB_TABLE.reset_max_job_id(); int64_t cost = ObTimeUtility::current_time() - start_time; diff --git a/src/rootserver/parallel_ddl/ob_create_table_helper.cpp b/src/rootserver/parallel_ddl/ob_create_table_helper.cpp index 247eb9174b..fb0f041030 100644 --- a/src/rootserver/parallel_ddl/ob_create_table_helper.cpp +++ b/src/rootserver/parallel_ddl/ob_create_table_helper.cpp @@ -2331,7 +2331,8 @@ int ObCreateTableHelper::create_tablets_() ObNewTableTabletAllocator new_table_tablet_allocator( tenant_id_, schema_guard, - sql_proxy_); + sql_proxy_, + true /*use parallel ddl*/); int64_t last_schema_version = OB_INVALID_VERSION; auto *tsi_generator = GET_TSI(TSISchemaVersionGenerator); if (OB_FAIL(table_creator.init(true/*need_tablet_cnt_check*/))) { diff --git a/src/rootserver/parallel_ddl/ob_index_name_checker.cpp b/src/rootserver/parallel_ddl/ob_index_name_checker.cpp index 6a121ae2ef..0349c0e952 100644 --- a/src/rootserver/parallel_ddl/ob_index_name_checker.cpp +++ b/src/rootserver/parallel_ddl/ob_index_name_checker.cpp @@ -334,7 +334,6 @@ void ObIndexNameChecker::reset_all_cache() int ObIndexNameChecker::reset_cache(const uint64_t tenant_id) { int ret = OB_SUCCESS; - bool can_skip = false; SpinRLockGuard guard(rwlock_); if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; diff --git a/src/rootserver/parallel_ddl/ob_tablet_balance_allocator.cpp b/src/rootserver/parallel_ddl/ob_tablet_balance_allocator.cpp new file mode 100644 index 0000000000..3c49d307c6 --- /dev/null +++ b/src/rootserver/parallel_ddl/ob_tablet_balance_allocator.cpp @@ -0,0 +1,409 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SHARE_SCHEMA + +#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h" +#include "rootserver/ob_balance_group_ls_stat_operator.h" +#include "share/ob_share_util.h" + +using namespace oceanbase::lib; +using namespace oceanbase::common; +using namespace oceanbase::share; +using namespace oceanbase::share::schema; +using namespace oceanbase::rootserver; + + +ObNonPartitionedTableTabletCache::ObNonPartitionedTableTabletCache( + const uint64_t tenant_id, + common::ObMySQLProxy &sql_proxy) + : mutex_(), + tenant_id_(tenant_id), + sql_proxy_(sql_proxy), + allocator_(ObMemAttr(OB_SYS_TENANT_ID, "NonPartTabtCac", ObCtxIds::SCHEMA_SERVICE), + PAGE_SIZE), + cache_(ARRAY_BLOCK_SIZE, ModulePageAllocator(allocator_)), + loaded_timestamp_(OB_INVALID_TIMESTAMP), + dump_timestamp_(OB_INVALID_TIMESTAMP) +{ +} + +void ObNonPartitionedTableTabletCache::reset_cache() +{ + lib::ObMutexGuard guard(mutex_); + (void) inner_reset_cache_(); +} + +void ObNonPartitionedTableTabletCache::inner_reset_cache_() +{ + cache_.reset(); + allocator_.reset(); + loaded_timestamp_ = OB_INVALID_TIMESTAMP; + LOG_INFO("[NON PARTITIONED TABLET CACHE] reset cache", K_(tenant_id)); +} + +// In the following cases, cache will be reload first: +// 1. cache_ is empty +// 2. cache_ is expire (consider transfer may change the placement of related tablets) +// 3. cache_ and avaliable_ls_ids are not matched (ls cnt or status changed) +bool ObNonPartitionedTableTabletCache::should_reload_cache_( + const common::ObIArray &avaliable_ls_ids) +{ + bool bret = false; + int64_t interval = INT64_MAX; + { + omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id_)); + if (tenant_config.is_valid()) { + interval = tenant_config->partition_balance_schedule_interval; + } + } + if (loaded_timestamp_ < 0) { + bret = true; // case 1 + LOG_INFO("[NON PARTITIONED TABLET CACHE] failure/non parallel ddl occur or cache is empty, should be reloaded", K_(tenant_id)); + } else if (ObTimeUtility::current_time() - loaded_timestamp_ >= interval) { + bret = true; // case 2 + LOG_INFO("[NON PARTITIONED TABLET CACHE] cache is expire, should be reloaded", K_(tenant_id)); + } else { + // case 3 + if (avaliable_ls_ids.count() != cache_.count()) { + bret = true; + } else { + for (int64_t i = 0; !bret && i < cache_.count(); i++) { + ObLSID ls_id = cache_.at(i).get_ls_id(); + if (!has_exist_in_array(avaliable_ls_ids, ls_id)) { + bret = true; + } + } // end for + } + if (bret) { + LOG_INFO("[NON PARTITIONED TABLET CACHE] ls is changed, should be reloaded", K_(tenant_id)); + } + } + return bret; +} + +int ObNonPartitionedTableTabletCache::reload_cache_( + const common::ObIArray &avaliable_ls_ids) +{ + int ret = OB_SUCCESS; + (void) inner_reset_cache_(); + + ObBalanceGroupLSStatOperator op; + common::ObArray bg_ls_stat_array; + ObBalanceGroupID bg_id(0, 0); // for non-partitioned table + ObString bg_name(rootserver::ObBalanceGroup::NON_PART_BG_NAME); + const int64_t default_timeout = GCONF.internal_sql_execute_timeout; + int64_t start_time = ObTimeUtility::current_time(); + if (OB_FAIL(op.init(&sql_proxy_))) { + LOG_WARN("fail to init ObBalanceGroupLSStatOperator", KR(ret)); + } else if (OB_FAIL(op.get_balance_group_ls_stat( + default_timeout, + sql_proxy_, + tenant_id_, + bg_id, + false, /*for update*/ + bg_ls_stat_array))) { + LOG_WARN("fail to get balance ls stat array", KR(ret), K_(tenant_id)); + } else { + // 1. get existed ls stat + common::ObArray new_ls_stat_array; + for (int64_t i = 0; OB_SUCC(ret) && i < bg_ls_stat_array.count(); i++) { + const ObBalanceGroupLSStat &ls_stat = bg_ls_stat_array.at(i); + ObLSID ls_id = ls_stat.get_ls_id(); + if (has_exist_in_array(avaliable_ls_ids, ls_id)) { + if (OB_FAIL(new_ls_stat_array.push_back(ls_stat))) { + LOG_WARN("fail to push back ObBalanceGroupLSStat", KR(ret), K(ls_stat)); + } + } + } // end for + + // 2. insert missing ls stat + common::ObArray miss_ls_stat_array; + if (OB_SUCC(ret)) { + for (int64_t i = 0; OB_SUCC(ret) && i < avaliable_ls_ids.count(); i++) { + const ObLSID &ls_id = avaliable_ls_ids.at(i); + bool finded = false; + for (int64_t j = 0; !finded && OB_SUCC(ret) && j < bg_ls_stat_array.count(); j++) { + const ObBalanceGroupLSStat &ls_stat = bg_ls_stat_array.at(j); + if (ls_id == ls_stat.get_ls_id()) { + finded = true; + } + } // end for + if (OB_SUCC(ret) && !finded) { + ObBalanceGroupLSStat ls_stat; + if (OB_FAIL(ls_stat.build(tenant_id_, bg_id, ls_id, 0 /*bg cnt*/, bg_name))) { + LOG_WARN("fail to build ls_stat", KR(ret), K_(tenant_id), K(ls_id)); + } else if (OB_FAIL(miss_ls_stat_array.push_back(ls_stat))) { + LOG_WARN("fail to push back miss ls stat", KR(ret), K(ls_stat)); + } + } + } // end for + + if (OB_SUCC(ret) && miss_ls_stat_array.count() > 0) { + if (OB_FAIL(op.insert_update_balance_group_ls_stat( + default_timeout, tenant_id_, bg_id, miss_ls_stat_array))) { + LOG_WARN("fail to insert miss ls stat", KR(ret), K_(tenant_id), K(miss_ls_stat_array)); + } + } + } + + // 3. store in cache + if (FAILEDx(append(new_ls_stat_array, miss_ls_stat_array))) { + LOG_WARN("fail to append ls stat array", KR(ret), K_(tenant_id), K(miss_ls_stat_array)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < new_ls_stat_array.count(); i++) { + const ObBalanceGroupLSStat &ls_stat = new_ls_stat_array.at(i); + Pair pair(ls_stat.get_ls_id(), ls_stat.get_tablet_group_count()); + if (OB_FAIL(cache_.push_back(pair))) { + LOG_WARN("fail to push back pair", KR(ret), K_(tenant_id), K(ls_stat)); + } + } // end for + if (OB_FAIL(ret)) { + (void) inner_reset_cache_(); + } + } + + if (OB_SUCC(ret)) { + loaded_timestamp_ = ObTimeUtility::current_time(); + } + } + LOG_INFO("[NON PARTITIONED TABLET CACHE] reload cache", + KR(ret), K_(tenant_id), "cost", ObTimeUtility::current_time() - start_time); + return ret; +} + +int ObNonPartitionedTableTabletCache::alloc_tablet( + const common::ObIArray &avaliable_ls_ids, + share::ObLSID &ls_id) +{ + int ret = OB_SUCCESS; + ls_id.reset(); + lib::ObMutexGuard guard(mutex_); + if (OB_UNLIKELY(avaliable_ls_ids.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", KR(ret), K(avaliable_ls_ids.count())); + } else if (should_reload_cache_(avaliable_ls_ids)) { + if (OB_FAIL(reload_cache_(avaliable_ls_ids))) { + LOG_WARN("fail to reload cache", KR(ret), K_(tenant_id), K(avaliable_ls_ids)); + } + } + // find ls which has min tablet cnt + if (OB_SUCC(ret)) { + int64_t min_tablet_cnt = INT64_MAX; + int64_t pos = OB_INVALID_INDEX; + for (int64_t i = 0; OB_SUCC(ret) && i < cache_.count(); i++) { + if (min_tablet_cnt > cache_.at(i).get_tablet_cnt()) { + min_tablet_cnt = cache_.at(i).get_tablet_cnt(); + pos = i; + } + } // end for + if (OB_UNLIKELY(OB_INVALID_INDEX == pos + || pos >= cache_.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to find ls has min tablet cnt", + KR(ret), K_(tenant_id), K(pos), K_(cache)); + } else { + Pair &target_pair = cache_.at(pos); + const int64_t tablet_cnt = target_pair.get_tablet_cnt() + 1; + ls_id = target_pair.get_ls_id(); + target_pair.set_tablet_cnt(tablet_cnt); + } + } + (void) dump_cache_(); + return ret; +} + +void ObNonPartitionedTableTabletCache::dump_cache_() +{ + const int64_t DUMP_INTERVAL = 10 * 60 * 1000 * 1000L; // 10min + const int64_t current_time = ObTimeUtility::current_time(); + if (current_time - dump_timestamp_ >= DUMP_INTERVAL) { + LOG_INFO("[NON PARTITIONED TABLET CACHE] dump cache", K_(tenant_id), + K_(loaded_timestamp), K_(dump_timestamp), K_(cache)); + dump_timestamp_ = current_time; + } +} + +ObNonPartitionedTableTabletAllocator::ObNonPartitionedTableTabletAllocator() + : rwlock_(), + allocator_(ObMemAttr(OB_SYS_TENANT_ID, "NonPartTenCac", ObCtxIds::SCHEMA_SERVICE)), + tenant_cache_(), + sql_proxy_(NULL), + inited_(false) +{ +} + +ObNonPartitionedTableTabletAllocator::~ObNonPartitionedTableTabletAllocator() +{ + destroy(); +} + +int ObNonPartitionedTableTabletAllocator::init(common::ObMySQLProxy &sql_proxy) +{ + int ret = OB_SUCCESS; + SpinWLockGuard guard(rwlock_); + if (inited_) { + ret = OB_INIT_TWICE; + LOG_WARN("init twice", KR(ret)); + } else { + const int64_t BUCKET_NUM = 1024; + if (OB_FAIL(tenant_cache_.create(BUCKET_NUM, "NonPartTenMap", "NonPartTenMap"))) { + LOG_WARN("fail to create hash map", KR(ret)); + } else { + sql_proxy_ = &sql_proxy; + inited_ = true; + } + } + return ret; +} + +void ObNonPartitionedTableTabletAllocator::destroy() +{ + SpinWLockGuard guard(rwlock_); + if (inited_) { + FOREACH(it, tenant_cache_) { + if (OB_NOT_NULL(it->second)) { + (it->second)->~ObNonPartitionedTableTabletCache(); + it->second = NULL; + } + } + tenant_cache_.destroy(); + allocator_.reset(); + sql_proxy_ = NULL; + inited_ = false; + } +} + +void ObNonPartitionedTableTabletAllocator::reset_all_cache() +{ + int ret = OB_SUCCESS; + SpinRLockGuard guard(rwlock_); + if (inited_) { + FOREACH(it, tenant_cache_) { + if (OB_NOT_NULL(it->second)) { + (void) (it->second)->reset_cache(); + } + } + } +} + +int ObNonPartitionedTableTabletAllocator::reset_cache( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + SpinRLockGuard guard(rwlock_); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else { + ObNonPartitionedTableTabletCache *cache = NULL; + if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) { + if (OB_HASH_NOT_EXIST != ret) { + LOG_WARN("fail to get refactored", KR(ret), K(tenant_id)); + } else { + // tenant not in cache, just skip + ret = OB_SUCCESS; + } + } else if (OB_ISNULL(cache)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cache is null", KR(ret), K(tenant_id)); + } else { + (void) cache->reset_cache(); + } + } + return ret; +} + +int ObNonPartitionedTableTabletAllocator::try_init_cache_( + const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + SpinWLockGuard guard(rwlock_); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_ISNULL(sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql_proxy is null", KR(ret)); + } else { + ObNonPartitionedTableTabletCache *cache = NULL; + if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) { + if (OB_HASH_NOT_EXIST != ret) { + LOG_WARN("fail to get cache", KR(ret), K(tenant_id)); + } else { + ret = OB_SUCCESS; + cache = NULL; + void *buf = NULL; + if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObNonPartitionedTableTabletCache)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc memory", KR(ret)); + } else if (FALSE_IT(cache = new (buf) ObNonPartitionedTableTabletCache(tenant_id, *sql_proxy_))) { + } else if (OB_FAIL(tenant_cache_.set_refactored(tenant_id, cache))) { + LOG_WARN("fail to set cache", KR(ret), K(tenant_id)); + } + } + } else { + // cache exist, just skip + } + } + return ret; +} + +int ObNonPartitionedTableTabletAllocator::alloc_tablet( + const uint64_t tenant_id, + const common::ObIArray &avaliable_ls_ids, + share::ObLSID &ls_id) +{ + int ret = OB_SUCCESS; + ls_id.reset(); + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || avaliable_ls_ids.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(avaliable_ls_ids.count())); + } else if (OB_FAIL(try_init_cache_(tenant_id))) { + LOG_WARN("try to init cache failed", KR(ret), K(tenant_id)); + } else { + { + SpinRLockGuard guard(rwlock_); + ObNonPartitionedTableTabletCache *cache = NULL; + if (OB_FAIL(tenant_cache_.get_refactored(tenant_id, cache))) { + LOG_WARN("fail to get refactored", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(cache)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cache is null", KR(ret)); + } else if (OB_FAIL(cache->alloc_tablet(avaliable_ls_ids, ls_id))) { + LOG_WARN("fail to alloc tablet", KR(ret), K(tenant_id)); + } + } + if (OB_SUCC(ret)) { + // try update ls stat + ObBalanceGroupLSStat ls_stat; + const ObBalanceGroupID bg_id(0, 0); // for non-partitioned table + const ObString bg_name(rootserver::ObBalanceGroup::NON_PART_BG_NAME); + const int64_t inc_tablet_cnt = 1; + const int64_t default_timeout = GCONF.internal_sql_execute_timeout; + ObBalanceGroupLSStatOperator op; + if (OB_FAIL(op.init(sql_proxy_))) { + LOG_WARN("fail to init ObBalanceGroupLSStatOperator", KR(ret)); + } else if (OB_FAIL(ls_stat.build(tenant_id, bg_id, ls_id, inc_tablet_cnt, bg_name))) { + LOG_WARN("fail to build ls_stat", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(op.inc_balance_group_ls_stat( + default_timeout, *sql_proxy_, tenant_id, ls_stat))) { + LOG_WARN("fail to inc ls stat", KR(ret), K(tenant_id), K(ls_stat)); + } + } + } + return ret; +} diff --git a/src/rootserver/parallel_ddl/ob_tablet_balance_allocator.h b/src/rootserver/parallel_ddl/ob_tablet_balance_allocator.h new file mode 100644 index 0000000000..1589aa25d4 --- /dev/null +++ b/src/rootserver/parallel_ddl/ob_tablet_balance_allocator.h @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#ifndef OCEANBASE_ROOTSERVER_OB_TABLET_BALANCE_ALLOCATOR_H_ +#define OCEANBASE_ROOTSERVER_OB_TABLET_BALANCE_ALLOCATOR_H_ + +#include "lib/container/ob_array.h" +#include "lib/hash/ob_pointer_hashmap.h" +#include "lib/mysqlclient/ob_mysql_proxy.h" +#include "share/ob_ls_id.h" +namespace oceanbase +{ +namespace share +{ +namespace schema +{ + +class ObNonPartitionedTableTabletCache +{ +public: + class Pair { + public: + Pair() : ls_id_(), tablet_cnt_(0) {} + Pair(const share::ObLSID &ls_id, const int64_t tablet_cnt) + : ls_id_(ls_id), tablet_cnt_(tablet_cnt) {} + ~Pair() {} + ObLSID get_ls_id() const { return ls_id_; } + int64_t get_tablet_cnt() const { return tablet_cnt_; } + void set_tablet_cnt(const int64_t tablet_cnt) { tablet_cnt_ = tablet_cnt; } + TO_STRING_KV(K_(ls_id), K_(tablet_cnt)); + private: + share::ObLSID ls_id_; + int64_t tablet_cnt_; + }; +public: + ObNonPartitionedTableTabletCache() = delete; + ObNonPartitionedTableTabletCache(const uint64_t tenant_id, + common::ObMySQLProxy &sql_proxy); + ~ObNonPartitionedTableTabletCache() {} + + void reset_cache(); + int alloc_tablet(const common::ObIArray &avaliable_ls_ids, + share::ObLSID &ls_id); +private: + void inner_reset_cache_(); + bool should_reload_cache_( + const common::ObIArray &avaliable_ls_ids); + int reload_cache_( + const common::ObIArray &avaliable_ls_ids); + void dump_cache_(); +private: + const static int64_t PAGE_SIZE = 1024; + const static int64_t ARRAY_BLOCK_SIZE = 256; +private: + lib::ObMutex mutex_; + uint64_t tenant_id_; + common::ObMySQLProxy &sql_proxy_; + common::ObArenaAllocator allocator_; + common::ObArray cache_; + int64_t loaded_timestamp_; + int64_t dump_timestamp_; + DISALLOW_COPY_AND_ASSIGN(ObNonPartitionedTableTabletCache); +}; + +/* + * alloc ls for non-partition table's tablet in parallel ddl + * + * when ls changed or non parallel ddl execute, cache will be automatically updated or reset. + * + * Related cache may be inaccurate in the following cases: + * 1. transfer tablet occur + * 2. backgroup try_statistic_balance_group_status_() occur + * 3. ls changed + * 4. alloc ls success but parallel ddl execute failed + * + */ +class ObNonPartitionedTableTabletAllocator +{ +public: + ObNonPartitionedTableTabletAllocator(); + ~ObNonPartitionedTableTabletAllocator(); + + int init(common::ObMySQLProxy &sql_proxy); + void destroy(); + + void reset_all_cache(); + int reset_cache(const uint64_t tenant_id); + + int alloc_tablet(const uint64_t tenant_id, + const common::ObIArray &avaliable_ls_ids, + share::ObLSID &ls_id); +private: + int try_init_cache_(const uint64_t tenant_id); +private: + common::SpinRWLock rwlock_; + common::ObArenaAllocator allocator_; + // tenant_cache_ won't be erased() + common::hash::ObHashMap tenant_cache_; + common::ObMySQLProxy *sql_proxy_; + bool inited_; +}; + +} // end schema +} // end share +} // end oceanbase + +#endif//OCEANBASE_ROOTSERVER_OB_TABLET_BALANCE_ALLOCATOR_H_