From f96bc6257324ca329bf55907d36782c4a15aa7b3 Mon Sep 17 00:00:00 2001
From: yinzhijian <373141588@qq.com>
Date: Mon, 28 Mar 2022 10:03:21 +0800
Subject: [PATCH] [feature](balance) Support balance between disks on a single BE (#8553)

The current situation of Doris is that the cluster is balanced, but the disks of
a backend may be unbalanced. For example, backend A has two disks, disk1 and
disk2: disk1's usage is 98%, but disk2's usage is only 40%. disk1 is unable to
take more data, so only one disk of backend A can take new data, the available
write throughput of backend A is only half of its capacity, and we cannot
resolve this through load or partition rebalance today.

So we introduce the disk rebalancer. It is different from the other rebalancers
(load and partition), which take care of cluster-wide data balancing: the disk
rebalancer takes care of backend-wide data balancing.

[For more details see #8550](https://github.com/apache/incubator-doris/issues/8550)
---
 be/src/agent/task_worker_pool.cpp             |  13 +-
 be/src/agent/task_worker_pool.h               |   2 +-
 be/src/common/config.h                        |   5 +
 .../task/engine_storage_migration_task.cpp    | 334 +++++++++++++-----
 .../olap/task/engine_storage_migration_task.h |  20 +-
 be/test/olap/CMakeLists.txt                   |   1 +
 .../engine_storage_migration_task_test.cpp    | 302 ++++++++++++++++
 docs/.vuepress/sidebar/en.js                  |   2 +
 docs/.vuepress/sidebar/zh-CN.js               |   2 +
 .../ADMIN CANCEL REBALANCE DISK.md            |  51 +++
 .../Administration/ADMIN REBALANCE DISK.md    |  52 +++
 .../ADMIN CANCEL REBALANCE DISK.md            |  52 +++
 .../Administration/ADMIN REBALANCE DISK.md    |  54 +++
 fe/fe-core/src/main/cup/sql_parser.cup        |  20 +-
 .../AdminCancelRebalanceDiskStmt.java         |  73 ++++
 .../analysis/AdminRebalanceDiskStmt.java      |  79 +++++
 .../doris/clone/BackendLoadStatistic.java     |  58 ++-
 .../apache/doris/clone/DiskRebalancer.java    | 334 ++++++++++++++++++
 .../org/apache/doris/clone/Rebalancer.java    |  33 +-
 .../apache/doris/clone/TabletSchedCtx.java    |  65 ++++
 .../apache/doris/clone/TabletScheduler.java   | 123 ++++++-
 .../java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/master/MasterImpl.java   |  17 +-
 .../apache/doris/master/ReportHandler.java    |   4 +-
 .../java/org/apache/doris/qe/DdlExecutor.java |   6 +
 .../doris/task/StorageMediaMigrationTask.java |  15 +
 fe/fe-core/src/main/jflex/sql_scanner.flex    |   2 +
 .../AdminCancelRebalanceDiskStmtTest.java     |  82 +++++
 .../analysis/AdminRebalanceDiskStmtTest.java  |  83 +++++
 .../apache/doris/clone/DiskRebalanceTest.java | 262 ++++++++++++++
 .../org/apache/doris/clone/RebalanceTest.java |  29 +-
 .../doris/clone/RebalancerTestUtil.java       |  32 +-
 .../org/apache/doris/task/AgentTaskTest.java  |  15 +
 33 files changed, 2106 insertions(+), 122 deletions(-)
 create mode 100644 be/test/olap/engine_storage_migration_task_test.cpp
 create mode 100644 docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
 create mode 100644 docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
 create mode 100644 docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
 create mode 100644 docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
create mode 100644 fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 29219fc61d..a10d2c4a78 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -941,8 +941,8 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() { TStatusCode::type status_code = TStatusCode::OK; // check request and get info TabletSharedPtr tablet; - DataDir* dest_store; - if (_check_migrate_requset(storage_medium_migrate_req, tablet, &dest_store) != + DataDir* dest_store = nullptr; + if (_check_migrate_request(storage_medium_migrate_req, tablet, &dest_store) != OLAP_SUCCESS) { status_code = TStatusCode::RUNTIME_ERROR; } else { @@ -953,7 +953,7 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() { << ", signature: " << agent_task_req.signature; status_code = TStatusCode::RUNTIME_ERROR; } else { - LOG(INFO) << "storage media migrate success. status:" << res << "," + LOG(INFO) << "storage media migrate success. status:" << res << ", signature:" << agent_task_req.signature; } } @@ -974,7 +974,7 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() { } } -OLAPStatus TaskWorkerPool::_check_migrate_requset(const TStorageMediumMigrateReq& req, +OLAPStatus TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet, DataDir** dest_store) { int64_t tablet_id = req.tablet_id; int32_t schema_hash = req.schema_hash; @@ -1020,6 +1020,11 @@ OLAPStatus TaskWorkerPool::_check_migrate_requset(const TStorageMediumMigrateReq *dest_store = stores[0]; } + if (tablet->data_dir()->path() == (*dest_store)->path()) { + LOG(INFO) << "tablet is already on specified path. " + << "path=" << tablet->data_dir()->path(); + return OLAP_REQUEST_FAILED; + } // check disk capacity int64_t tablet_size = tablet->tablet_footprint(); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 3cffbc7af0..4181d0ce50 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -208,7 +208,7 @@ private: Status _move_dir(const TTabletId tablet_id, const TSchemaHash schema_hash, const std::string& src, int64_t job_id, bool overwrite); - OLAPStatus _check_migrate_requset(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet, + OLAPStatus _check_migrate_request(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet, DataDir** dest_store); // random sleep 1~second seconds diff --git a/be/src/common/config.h b/be/src/common/config.h index 438a4a345f..fe92b6b3ea 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -332,6 +332,11 @@ CONF_Int32(min_tablet_migration_threads, "1"); CONF_Int32(max_tablet_migration_threads, "1"); CONF_mInt32(finished_migration_tasks_size, "10000"); +// If size less than this, the remaining rowsets will be force to complete +CONF_mInt32(migration_remaining_size_threshold_mb, "10"); +// If the task runs longer than this time, the task will be terminated, in seconds. 
+// tablet max size / migration min speed * factor = 10GB / 1MBps * 2 = 20480 seconds
+CONF_mInt32(migration_task_timeout_secs, "20480");
 // Port to start debug webserver on
 CONF_Int32(webserver_port, "8040");
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 486730affa..a3724a2415 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -17,6 +17,8 @@
 #include "olap/task/engine_storage_migration_task.h"
 
+#include
+
 #include "olap/snapshot_manager.h"
 #include "olap/tablet_meta_manager.h"
 
@@ -24,146 +26,298 @@ namespace doris {
 
 using std::stringstream;
 
+const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
+
 EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& tablet,
                                                        DataDir* dest_store)
-        : _tablet(tablet), _dest_store(dest_store) {}
+        : _tablet(tablet), _dest_store(dest_store) {
+    _task_start_time = time(nullptr);
+}
 
 OLAPStatus EngineStorageMigrationTask::execute() {
     return _migrate();
 }
 
-OLAPStatus EngineStorageMigrationTask::_migrate() {
+OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version, int32_t* end_version,
+        std::vector<RowsetSharedPtr>* consistent_rowsets) {
+    ReadLock rdlock(_tablet->get_header_lock());
+    const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+    if (last_version == nullptr) {
+        LOG(WARNING) << "failed to get rowset with max version, tablet="
+                     << _tablet->full_name();
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+
+    *end_version = last_version->end_version();
+    if (*end_version < start_version) {
+        // rowsets are empty
+        VLOG_DEBUG << "consistent rowsets empty. tablet=" << _tablet->full_name()
+                   << ", start_version=" << start_version << ", end_version=" << *end_version;
+        return OLAP_SUCCESS;
+    }
+    _tablet->capture_consistent_rowsets(Version(start_version, *end_version), consistent_rowsets);
+    if (consistent_rowsets->empty()) {
+        LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
+                     << ", version=" << *end_version;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+    return OLAP_SUCCESS;
+}
+
+bool EngineStorageMigrationTask::_is_timeout() {
+    int64_t time_elapsed = time(nullptr) - _task_start_time;
+    if (time_elapsed > config::migration_task_timeout_secs) {
+        LOG(WARNING) << "migration failed due to timeout, time_elapsed=" << time_elapsed
+                     << ", tablet=" << _tablet->full_name();
+        return true;
+    }
+    return false;
+}
+
+OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
+    // need to hold the migration lock outside
+    int64_t partition_id;
+    std::set<int64_t> transaction_ids;
+    // check if this tablet has related running txns. if yes, we cannot do the migration.
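+    // (a pending txn may still publish new rowsets into this tablet at its current
+    // location, so migrating while such a txn is running could lose data)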
+ StorageEngine::instance()->txn_manager()->get_tablet_related_txns( + _tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid(), &partition_id, &transaction_ids); + if (transaction_ids.size() > 0) { + return OLAP_ERR_HEADER_HAS_PENDING_DATA; + } + return OLAP_SUCCESS; +} + +OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueWriteLock* migration_wlock) { + // caller should not hold migration lock, and 'migration_wlock' should not be nullptr + // ownership of the migration_wlock is transferred to the caller if check succ + DCHECK_NE(migration_wlock, nullptr); + OLAPStatus res = OLAP_SUCCESS; + int try_times = 1; + do { + // to avoid invalid loops, the lock is guaranteed to be acquired here + UniqueWriteLock wlock(_tablet->get_migration_lock()); + res = _check_running_txns(); + if (res == OLAP_SUCCESS) { + // transfer the lock to the caller + *migration_wlock = std::move(wlock); + return res; + } + LOG(INFO) << "check running txns fail, try again until timeout." + << " tablet=" << _tablet->full_name() + << ", try times=" << try_times + << ", res=" << res; + // unlock and sleep for a while, try again + wlock.unlock(); + sleep(std::min(config::sleep_one_second * try_times, CHECK_TXNS_MAX_WAIT_TIME_SECS)); + ++try_times; + } while (!_is_timeout()); + return res; +} + +OLAPStatus EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file( + uint64_t shard, + const std::string& full_path, + const std::vector& consistent_rowsets) { + // need hold migration lock and push lock outside + OLAPStatus res = OLAP_SUCCESS; int64_t tablet_id = _tablet->tablet_id(); int32_t schema_hash = _tablet->schema_hash(); + TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta()); + { + ReadLock rdlock(_tablet->get_header_lock()); + _generate_new_header(shard, consistent_rowsets, new_tablet_meta); + } + std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr"; + res = new_tablet_meta->save(new_meta_file); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to save meta to path: " << new_meta_file; + return res; + } + + // reset tablet id and rowset id + res = TabletMeta::reset_tablet_uid(new_meta_file); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "errors while set tablet uid: '" << new_meta_file; + return res; + } + // it will change rowset id and its create time + // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load + res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, schema_hash); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to convert rowset id when do storage migration" + << " path = " << full_path; + return res; + } + return res; +} + +OLAPStatus EngineStorageMigrationTask::_reload_tablet( + const std::string& full_path) { + // need hold migration lock and push lock outside + OLAPStatus res = OLAP_SUCCESS; + int64_t tablet_id = _tablet->tablet_id(); + int32_t schema_hash = _tablet->schema_hash(); + res = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir( + _dest_store, tablet_id, schema_hash, full_path, false); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to load tablet from new path. 
tablet_id=" << tablet_id + << " schema_hash=" << schema_hash << " path = " << full_path; + return res; + } + + // if old tablet finished schema change, then the schema change status of the new tablet is DONE + // else the schema change status of the new tablet is FAILED + TabletSharedPtr new_tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash); + if (new_tablet == nullptr) { + LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id + << " schema_hash=" << schema_hash; + return OLAP_ERR_TABLE_NOT_FOUND; + } + return res; +} + +// if the size less than threshold, return true +bool EngineStorageMigrationTask::_is_rowsets_size_less_than_threshold( + const std::vector& consistent_rowsets) { + size_t total_size = 0; + for (const auto& rs : consistent_rowsets) { + total_size += rs->index_disk_size() + rs->data_disk_size(); + } + if (total_size < config::migration_remaining_size_threshold_mb) { + return true; + } + return false; +} + +OLAPStatus EngineStorageMigrationTask::_migrate() { + int64_t tablet_id = _tablet->tablet_id(); LOG(INFO) << "begin to process tablet migrate. " << "tablet_id=" << tablet_id << ", dest_store=" << _dest_store->path(); DorisMetrics::instance()->storage_migrate_requests_total->increment(1); + int32_t start_version = 0; + int32_t end_version = 0; + std::vector consistent_rowsets; // try hold migration lock first OLAPStatus res = OLAP_SUCCESS; - UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), std::try_to_lock); - if (!migration_wlock.owns_lock()) { - return OLAP_ERR_RWLOCK_ERROR; - } - - // check if this tablet has related running txns. if yes, can not do migration. - int64_t partition_id; - std::set transaction_ids; - StorageEngine::instance()->txn_manager()->get_tablet_related_txns( - tablet_id, schema_hash, _tablet->tablet_uid(), &partition_id, &transaction_ids); - if (transaction_ids.size() > 0) { - LOG(WARNING) << "could not migration because has unfinished txns, " - << " tablet=" << _tablet->full_name(); - return OLAP_ERR_HEADER_HAS_PENDING_DATA; - } - - std::lock_guard lock(_tablet->get_push_lock()); - // TODO(ygl): the tablet should not under schema change or rollup or load - do { - std::vector consistent_rowsets; - { - ReadLock rdlock(_tablet->get_header_lock()); - // get all versions to be migrate - const RowsetSharedPtr last_version = _tablet->rowset_with_max_version(); - if (last_version == nullptr) { - res = OLAP_ERR_VERSION_NOT_EXIST; - LOG(WARNING) << "failed to get rowset with max version, tablet=" - << _tablet->full_name(); - break; - } - int32_t end_version = last_version->end_version(); - res = _tablet->capture_consistent_rowsets(Version(0, end_version), &consistent_rowsets); - if (consistent_rowsets.empty()) { - res = OLAP_ERR_VERSION_NOT_EXIST; - LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name() - << ", version=" << end_version; - break; - } + uint64_t shard = 0; + string full_path; + { + UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), std::try_to_lock); + if (!migration_wlock.owns_lock()) { + return OLAP_ERR_RWLOCK_ERROR; } - uint64_t shard = 0; + // check if this tablet has related running txns. if yes, can not do migration. 
+        res = _check_running_txns();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "could not migrate because of unfinished txns, "
+                         << " tablet=" << _tablet->full_name();
+            return res;
+        }
+
+        std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+        // get the versions to be migrated
+        res = _get_versions(start_version, &end_version, &consistent_rowsets);
+        if (res != OLAP_SUCCESS) {
+            return res;
+        }
+
+        // TODO(ygl): the tablet should not be under schema change, rollup or load
         res = _dest_store->get_shard(&shard);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to get shard from store: " << _dest_store->path();
-            break;
+            return res;
         }
         FilePathDescStream root_path_desc_s;
         root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << shard;
         FilePathDesc full_path_desc = SnapshotManager::instance()->get_schema_hash_full_path(
                 _tablet, root_path_desc_s.path_desc());
-        string full_path = full_path_desc.filepath;
+        full_path = full_path_desc.filepath;
         // if dir already exist then return err, it should not happen.
         // should not remove the dir directly, for safety reason.
         if (FileUtils::check_exist(full_path)) {
             LOG(INFO) << "schema hash path already exist, skip this path. "
-                      << "full_path=" << full_path;
-            res = OLAP_ERR_FILE_ALREADY_EXIST;
-            break;
+                      << "full_path=" << full_path;
+            return OLAP_ERR_FILE_ALREADY_EXIST;
         }
         Status st = FileUtils::create_dir(full_path);
         if (!st.ok()) {
             res = OLAP_ERR_CANNOT_CREATE_DIR;
             LOG(WARNING) << "fail to create path. path=" << full_path
-                         << ", error:" << st.to_string();
-            break;
+                         << ", error:" << st.to_string();
+            return res;
         }
+    }
+    std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+    do {
         // migrate all index and data files but header file
-        res = _copy_index_and_data_files(full_path, consistent_rowsets);
+        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to copy index and data files when migrate. res=" << res;
             break;
         }
+        UniqueWriteLock migration_wlock;
+        res = _check_running_txns_until_timeout(&migration_wlock);
+        if (res != OLAP_SUCCESS) {
+            break;
+        }
+        std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+        start_version = end_version;
+        // clear temp rowsets before getting the remaining versions
+        temp_consistent_rowsets.clear();
+        // get the remaining versions
+        res = _get_versions(end_version + 1, &end_version, &temp_consistent_rowsets);
+        if (res != OLAP_SUCCESS) {
+            break;
+        }
+        if (start_version < end_version) {
+            // we have remaining versions to be migrated
+            consistent_rowsets.insert(consistent_rowsets.end(),
+                    temp_consistent_rowsets.begin(), temp_consistent_rowsets.end());
+            LOG(INFO) << "we have remaining versions to be migrated. start_version="
+                      << start_version << " end_version=" << end_version;
+            // if the remaining size is less than config::migration_remaining_size_threshold_mb
+            // (default 10MB), we hold the lock and copy it to completion, to avoid
+            // long-term competition with other tasks
+            if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) {
+                // force copying the remaining data and index
+                res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+                if (res != OLAP_SUCCESS) {
+                    LOG(WARNING) << "fail to copy the remaining index and data files when migrate. res=" << res;
+                    break;
+                }
+            } else {
+                if (_is_timeout()) {
+                    res = OLAP_ERR_HEADER_HAS_PENDING_DATA;
+                    break;
+                }
+                // there is too much remaining data here.
+ // in order not to affect other tasks, release the lock and then copy it + continue; + } + } // generate new tablet meta and write to hdr file - TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta()); - { - ReadLock rdlock(_tablet->get_header_lock()); - _generate_new_header(shard, consistent_rowsets, new_tablet_meta); - } - std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr"; - res = new_tablet_meta->save(new_meta_file); + res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets); + if (res != OLAP_SUCCESS) { + break; + } + res = _reload_tablet(full_path); if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to save meta to path: " << new_meta_file; break; } - // reset tablet id and rowset id - res = TabletMeta::reset_tablet_uid(new_meta_file); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "errors while set tablet uid: '" << new_meta_file; - break; - } - // it will change rowset id and its create time - // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load - res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, schema_hash); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to convert rowset id when do storage migration" - << " path = " << full_path; - break; - } + break; + } while (true); - res = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir( - _dest_store, tablet_id, schema_hash, full_path, false); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to load tablet from new path. tablet_id=" << tablet_id - << " schema_hash=" << schema_hash << " path = " << full_path; - break; - } - - // if old tablet finished schema change, then the schema change status of the new tablet is DONE - // else the schema change status of the new tablet is FAILED - TabletSharedPtr new_tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash); - if (new_tablet == nullptr) { - LOG(WARNING) << "tablet not found. 
tablet_id=" << tablet_id - << " schema_hash=" << schema_hash; - res = OLAP_ERR_TABLE_NOT_FOUND; - break; - } - } while (0); + if (res != OLAP_SUCCESS) { + // we should remove the dir directly for avoid disk full of junk data, and it's safe to remove + FileUtils::remove_all(full_path); + } return res; } diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h index 3009f4c127..ffd42a459c 100644 --- a/be/src/olap/task/engine_storage_migration_task.h +++ b/be/src/olap/task/engine_storage_migration_task.h @@ -36,6 +36,24 @@ public: private: OLAPStatus _migrate(); + // check if task is timeout + bool _is_timeout(); + OLAPStatus _get_versions(int32_t start_version, + int32_t* end_version, + std::vector *consistent_rowsets); + OLAPStatus _check_running_txns(); + // caller should not hold migration lock, and 'migration_wlock' should not be nullptr + // ownership of the migration lock is transferred to the caller if check succ + OLAPStatus _check_running_txns_until_timeout(UniqueWriteLock* migration_wlock); + + // if the size less than threshold, return true + bool _is_rowsets_size_less_than_threshold(const std::vector& consistent_rowsets); + + OLAPStatus _gen_and_write_header_to_hdr_file( + uint64_t shard, + const std::string& full_path, + const std::vector& consistent_rowsets); + OLAPStatus _reload_tablet(const std::string& full_path); void _generate_new_header(uint64_t new_shard, const std::vector& consistent_rowsets, @@ -52,7 +70,7 @@ private: TabletSharedPtr _tablet; // destination data dir DataDir* _dest_store; - + int64_t _task_start_time; }; // EngineTask } // namespace doris diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 5e4d15f777..81a551f921 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -26,6 +26,7 @@ ADD_BE_TEST(row_block_test) ADD_BE_TEST(row_block_v2_test) ADD_BE_TEST(bit_field_test) ADD_BE_TEST(byte_buffer_test) +ADD_BE_TEST(engine_storage_migration_task_test) ADD_BE_TEST(run_length_byte_test) ADD_BE_TEST(run_length_integer_test) ADD_BE_TEST(stream_index_test) diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp new file mode 100644 index 0000000000..68c88f853f --- /dev/null +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
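+
+// This test exercises the full migration flow: create a UNIQUE_KEYS tablet with a
+// sequence column, write and publish a single row, migrate the tablet from one data
+// dir to the other with EngineStorageMigrationTask, then migrate it back, checking
+// the tablet's data dir path and row count after each move.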
+ +#include "olap/task/engine_storage_migration_task.h" + +#include +#include + +#include + +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/Types_types.h" +#include "olap/delta_writer.h" +#include "olap/field.h" +#include "olap/options.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_meta_manager.h" +#include "olap/utils.h" +#include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" +#include "runtime/tuple.h" +#include "util/file_utils.h" +#include "util/logging.h" + +namespace doris { + +static const uint32_t MAX_PATH_LEN = 1024; + +StorageEngine* k_engine = nullptr; +std::string path1; +std::string path2; + +void set_up() { + char buffer[MAX_PATH_LEN]; + ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + path1 = std::string(buffer) + "/data_test_1"; + path2 = std::string(buffer) + "/data_test_2"; + config::storage_root_path = path1 + ";" + path2; + FileUtils::remove_all(path1); + FileUtils::create_dir(path1); + + FileUtils::remove_all(path2); + FileUtils::create_dir(path2); + std::vector paths; + paths.emplace_back(path1, -1); + paths.emplace_back(path2, -1); + + doris::EngineOptions options; + options.store_paths = paths; + Status s = doris::StorageEngine::open(options, &k_engine); + ASSERT_TRUE(s.ok()) << s.to_string(); + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(k_engine); + k_engine->start_bg_threads(); +} + +void tear_down() { + if (k_engine != nullptr) { + k_engine->stop(); + delete k_engine; + k_engine = nullptr; + } + ASSERT_EQ(system("rm -rf ./data_test_1"), 0); + ASSERT_EQ(system("rm -rf ./data_test_2"), 0); + FileUtils::remove_all(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX); +} + +void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t schema_hash, + TCreateTabletReq* request) { + request->tablet_id = tablet_id; + request->__set_version(1); + request->tablet_schema.schema_hash = schema_hash; + request->tablet_schema.short_key_column_count = 2; + request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS; + request->tablet_schema.storage_type = TStorageType::COLUMN; + request->tablet_schema.__set_sequence_col_idx(2); + + TColumn k1; + k1.column_name = "k1"; + k1.__set_is_key(true); + k1.column_type.type = TPrimitiveType::TINYINT; + request->tablet_schema.columns.push_back(k1); + + TColumn k2; + k2.column_name = "k2"; + k2.__set_is_key(true); + k2.column_type.type = TPrimitiveType::SMALLINT; + request->tablet_schema.columns.push_back(k2); + + TColumn sequence_col; + sequence_col.column_name = SEQUENCE_COL; + sequence_col.__set_is_key(false); + sequence_col.column_type.type = TPrimitiveType::INT; + sequence_col.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(sequence_col); + + TColumn v1; + v1.column_name = "v1"; + v1.__set_is_key(false); + v1.column_type.type = TPrimitiveType::DATETIME; + v1.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v1); +} + +TDescriptorTable create_descriptor_tablet_with_sequence_col() { + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build()); + 
tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name(SEQUENCE_COL) + .column_pos(2) + .build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(3).build()); + tuple_builder.build(&dtb); + + return dtb.desc_tbl(); +} + +class TestEngineStorageMigrationTask : public ::testing::Test { +public: + TestEngineStorageMigrationTask() {} + ~TestEngineStorageMigrationTask() {} + + void SetUp() { + // Create local data dir for StorageEngine. + std::cout << "setup" << std::endl; + } + + void TearDown() { + // Remove all dir. + std::cout << "tear down" << std::endl; + //doris::tear_down(); + //ASSERT_EQ(OLAP_SUCCESS, remove_all_dir(config::storage_root_path)); + } +}; + +TEST_F(TestEngineStorageMigrationTask, write_and_migration) { + TCreateTabletReq request; + create_tablet_request_with_sequence_col(10005, 270068377, &request); + OLAPStatus res = k_engine->create_tablet(request); + ASSERT_EQ(OLAP_SUCCESS, res); + + TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + const std::vector& slots = tuple_desc->slots(); + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, + 30003, load_id, tuple_desc, &(tuple_desc->slots())}; + DeltaWriter* delta_writer = nullptr; + DeltaWriter::open(&write_req, &delta_writer); + ASSERT_NE(delta_writer, nullptr); + + MemTracker tracker; + MemPool pool(&tracker); + // Tuple 1 + { + Tuple* tuple = reinterpret_cast(pool.allocate(tuple_desc->byte_size())); + memset(tuple, 0, tuple_desc->byte_size()); + *(int8_t*)(tuple->get_slot(slots[0]->tuple_offset())) = 123; + *(int16_t*)(tuple->get_slot(slots[1]->tuple_offset())) = 456; + *(int32_t*)(tuple->get_slot(slots[2]->tuple_offset())) = 1; + ((DateTimeValue*)(tuple->get_slot(slots[3]->tuple_offset()))) + ->from_date_str("2020-07-16 19:39:43", 19); + + res = delta_writer->write(tuple); + ASSERT_EQ(OLAP_SUCCESS, res); + } + + res = delta_writer->close(); + ASSERT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(nullptr, false); + ASSERT_EQ(OLAP_SUCCESS, res); + + // publish version success + TabletSharedPtr tablet = + k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; + OlapMeta* meta = tablet->data_dir()->get_meta(); + Version version; + version.first = tablet->rowset_with_max_version()->end_version() + 1; + version.second = tablet->rowset_with_max_version()->end_version() + 1; + std::cout << "start to add rowset version:" << version.first << "-" << version.second + << std::endl; + std::map tablet_related_rs; + StorageEngine::instance()->txn_manager()->get_txn_related_tablets( + write_req.txn_id, write_req.partition_id, &tablet_related_rs); + for (auto& tablet_rs : tablet_related_rs) { + std::cout << "start to publish txn" << std::endl; + RowsetSharedPtr rowset = tablet_rs.second; + res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, write_req.schema_hash, + tablet_rs.first.tablet_uid, version); + ASSERT_EQ(OLAP_SUCCESS, res); + std::cout << "start to add inc rowset:" << rowset->rowset_id() + << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first + << "-" 
<< rowset->version().second << std::endl; + res = tablet->add_inc_rowset(rowset); + ASSERT_EQ(OLAP_SUCCESS, res); + } + ASSERT_EQ(1, tablet->num_rows()); + // we should sleep 1 second for the migrated tablet has different time with the current tablet + sleep(1); + + // test case 1 + // prepare + DataDir* dest_store = nullptr; + if (tablet->data_dir()->path() == path1) { + dest_store = StorageEngine::instance()->get_store(path2); + } else if (tablet->data_dir()->path() == path2) { + dest_store = StorageEngine::instance()->get_store(path1); + } + ASSERT_NE(dest_store, nullptr); + std::cout << "dest store:" << dest_store->path() << std::endl; + // migrating + EngineStorageMigrationTask engine_task(tablet, dest_store); + res = engine_task.execute(); + ASSERT_EQ(OLAP_SUCCESS, res); + // reget the tablet from manager after migration + auto tablet_id = 10005; + auto schema_hash = 270068377; + TabletSharedPtr tablet2 = k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash); + // check path + ASSERT_EQ(tablet2->data_dir()->path(), dest_store->path()); + // check rows + ASSERT_EQ(1, tablet2->num_rows()); + // tablet2 should not equal to tablet + ASSERT_NE(tablet2, tablet); + + // test case 2 + // migrate tablet2 back to the tablet's path + // sleep 1 second for update time + sleep(1); + dest_store = StorageEngine::instance()->get_store(tablet->data_dir()->path()); + ASSERT_NE(dest_store, nullptr); + ASSERT_NE(dest_store->path(), tablet2->data_dir()->path()); + std::cout << "dest store:" << dest_store->path() << std::endl; + EngineStorageMigrationTask engine_task2(tablet2, dest_store); + res = engine_task2.execute(); + ASSERT_EQ(OLAP_SUCCESS, res); + TabletSharedPtr tablet3 = k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash); + // check path + ASSERT_EQ(tablet3->data_dir()->path(), tablet->data_dir()->path()); + // check rows + ASSERT_EQ(1, tablet3->num_rows()); + // orgi_tablet should not equal to new_tablet and tablet + ASSERT_NE(tablet3, tablet2); + ASSERT_NE(tablet3, tablet); + // test case 2 end + + res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + ASSERT_EQ(OLAP_SUCCESS, res); + delete delta_writer; +} + +} // namespace doris + +int main(int argc, char** argv) { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. 
\n");
+        return -1;
+    }
+    int ret = doris::OLAP_SUCCESS;
+    testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    doris::set_up();
+    ret = RUN_ALL_TESTS();
+    doris::tear_down();
+    google::protobuf::ShutdownProtobufLibrary();
+    return ret;
+}
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 896805f6c2..b96c51e7ac 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -578,10 +578,12 @@ module.exports = [
           directoryPath: "Administration/",
           initialOpenGroupIndex: -1,
           children: [
+            "ADMIN CANCEL REBALANCE DISK",
             "ADMIN CANCEL REPAIR",
             "ADMIN CLEAN TRASH",
             "ADMIN CHECK TABLET",
             "ADMIN COMPACT",
+            "ADMIN REBALANCE DISK",
             "ADMIN REPAIR",
             "ADMIN SET CONFIG",
             "ADMIN SET REPLICA STATUS",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 30c5fc29c3..fc7c6bd9d4 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -591,10 +591,12 @@ module.exports = [
           directoryPath: "Administration/",
           initialOpenGroupIndex: -1,
           children: [
+            "ADMIN CANCEL REBALANCE DISK",
             "ADMIN CANCEL REPAIR",
             "ADMIN CLEAN TRASH",
             "ADMIN CHECK TABLET",
             "ADMIN COMPACT",
+            "ADMIN REBALANCE DISK",
             "ADMIN REPAIR",
             "ADMIN SET CONFIG",
             "ADMIN SET REPLICA STATUS",
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
new file mode 100644
index 0000000000..475e266306
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
@@ -0,0 +1,51 @@
+---
+{
+    "title": "ADMIN CANCEL REBALANCE DISK",
+    "language": "en"
+}
+---
+
+
+
+# ADMIN CANCEL REBALANCE DISK
+## Description
+
+This statement is used to cancel high-priority disk rebalancing for the specified backends
+
+Grammar:
+
+ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only indicates that the system no longer rebalances the disks of the specified backends with high priority. The system will still rebalance their disks through default scheduling.
+
+## example
+
+1. Cancel high-priority disk rebalancing for all backends in the cluster
+
+ADMIN CANCEL REBALANCE DISK;
+
+2. Cancel high-priority disk rebalancing for the specified backends
+
+ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,CANCEL,REBALANCE,DISK
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
new file mode 100644
index 0000000000..6e1c1aaa34
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
@@ -0,0 +1,52 @@
+---
+{
+    "title": "ADMIN REBALANCE DISK",
+    "language": "en"
+}
+---
+
+
+
+# ADMIN REBALANCE DISK
+## Description
+
+This statement is used to attempt to rebalance the disks of the specified backends first, regardless of whether the cluster is balanced
+
+Grammar:
+
+ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only means that the system attempts to rebalance the disks of the specified backends with high priority, regardless of whether the cluster is balanced.
+2. The default timeout is 24 hours. After the timeout, the system will no longer rebalance the disks of the specified backends with high priority, and the command needs to be issued again.
+3. After the disks of a specified backend are balanced, its high priority expires.
+
+## example
+
+1.
Attempt to rebalance the disks of all backends
+
+ADMIN REBALANCE DISK;
+
+2. Attempt to rebalance the disks of the specified backends
+
+ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,REBALANCE,DISK
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
new file mode 100644
index 0000000000..e6978107c9
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
@@ -0,0 +1,52 @@
+---
+{
+    "title": "ADMIN CANCEL REBALANCE DISK",
+    "language": "zh-CN"
+}
+---
+
+
+
+# ADMIN CANCEL REBALANCE DISK
+## description
+
+    该语句用于取消优先均衡BE的磁盘
+
+    语法:
+
+    ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+    说明:
+
+    1. 该语句仅表示系统不再优先均衡指定BE的磁盘数据。系统仍会以默认调度方式均衡BE的磁盘数据。
+
+## example
+
+    1. 取消集群所有BE的优先磁盘均衡
+
+    ADMIN CANCEL REBALANCE DISK;
+
+    2. 取消指定BE的优先磁盘均衡
+
+    ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+    ADMIN,CANCEL,REBALANCE,DISK
+
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
new file mode 100644
index 0000000000..0bb78f5379
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
@@ -0,0 +1,54 @@
+---
+{
+    "title": "ADMIN REBALANCE DISK",
+    "language": "zh-CN"
+}
+---
+
+
+
+# ADMIN REBALANCE DISK
+## description
+
+    该语句用于尝试优先均衡指定的BE磁盘数据
+
+    语法:
+
+    ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+    说明:
+
+    1. 该语句表示让系统尝试优先均衡指定BE的磁盘数据,不受限于集群是否均衡。
+    2. 默认的 timeout 是 24小时。超时意味着系统将不再优先均衡指定的BE磁盘数据。需要重新使用该命令设置。
+    3. 指定BE的磁盘数据均衡后,该BE的优先级将会失效。
+
+## example
+
+    1. 尝试优先均衡集群内的所有BE
+
+    ADMIN REBALANCE DISK;
+
+    2.
尝试优先均衡指定BE + + ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234"); + +## keyword + ADMIN,REBALANCE,DISK + diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index da303888fb..2bf9da4afc 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -241,7 +241,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_COMPACT, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE, - KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, + KW_DELETE, KW_UPDATE, KW_DISK, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE, KW_EXISTS, KW_EXPORT, KW_EXTENDED, KW_EXTERNAL, KW_EXTRACT, KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIELDS, KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, @@ -260,7 +260,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A KW_PLUGIN, KW_PLUGINS, KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY, KW_QUERY, KW_QUOTA, - KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME, + KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME, KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, KW_RETURNS, KW_RESUME, KW_REVOKE, KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, KW_ROWS, KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED, @@ -5297,6 +5297,22 @@ admin_stmt ::= {: RESULT = new AdminCheckTabletsStmt(tabletIds, properties); :} + | KW_ADMIN KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN + {: + RESULT = new AdminRebalanceDiskStmt(backends); + :} + | KW_ADMIN KW_REBALANCE KW_DISK + {: + RESULT = new AdminRebalanceDiskStmt(null); + :} + | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN + {: + RESULT = new AdminCancelRebalanceDiskStmt(backends); + :} + | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK + {: + RESULT = new AdminCancelRebalanceDiskStmt(null); + :} | KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN {: RESULT = new AdminCleanTrashStmt(backends); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java new file mode 100644 index 0000000000..3e9bab3f21 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.mysql.privilege.PrivPredicate; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AdminCancelRebalanceDiskStmt extends DdlStmt { + private List backends = Lists.newArrayList(); + + public AdminCancelRebalanceDiskStmt(List backends) { + ImmutableMap backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend(); + Map backendsID = new HashMap(); + for (Backend backend : backendsInfo.values()) { + backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId()); + } + if (backends == null) { + for (Backend backend : backendsInfo.values()) { + this.backends.add(backend); + } + } else { + for (String backend : backends) { + if (backendsID.get(backend) != null) { + this.backends.add(backendsInfo.get(backendsID.get(backend))); + backendsID.remove(backend); // avoid repetition + } + } + } + } + + public List getBackends() { + return backends; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java new file mode 100644 index 0000000000..c8f0aa6b76 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.mysql.privilege.PrivPredicate; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AdminRebalanceDiskStmt extends DdlStmt { + private List backends = Lists.newArrayList(); + private long timeoutS = 0; + + public AdminRebalanceDiskStmt(List backends) { + ImmutableMap backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend(); + Map backendsID = new HashMap(); + for (Backend backend : backendsInfo.values()) { + backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId()); + } + if (backends == null) { + for (Backend backend : backendsInfo.values()) { + this.backends.add(backend); + } + } else { + for (String backend : backends) { + if (backendsID.get(backend) != null) { + this.backends.add(backendsInfo.get(backendsID.get(backend))); + backendsID.remove(backend); // avoid repetition + } + } + } + timeoutS = 24 * 3600; // default 24 hours + } + + public List getBackends() { + return backends; + } + + public long getTimeoutS() { + return timeoutS; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index 8c575d8cb9..f01c0175c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -246,8 +246,8 @@ public class BackendLoadStatistic { } } - LOG.debug("classify path by load. storage: {} avg used percent: {}. low/mid/high: {}/{}/{}", - avgUsedPercent, medium, lowCounter, midCounter, highCounter); + LOG.debug("classify path by load. be id: {} storage: {} avg used percent: {}. low/mid/high: {}/{}/{}", + beId, medium, avgUsedPercent, lowCounter, midCounter, highCounter); } public void calcScore(Map avgClusterUsedCapacityPercentMap, @@ -315,6 +315,60 @@ public class BackendLoadStatistic { return status; } + /* + * Check whether the backend can be more balance if we migrate a tablet with size 'tabletSize' from + * `srcPath` to 'destPath' + * 1. recalculate the load score of src and dest path after migrate the tablet. + * 2. if the summary of the diff between the new score and average score becomes smaller, we consider it + * as more balance. 
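+     * Example with hypothetical numbers: let the src and dest paths have equal capacity,
+     * with src 80% used, dest 40% used, and an average of 60% for this medium. Migrating
+     * a tablet worth 10% of one path's capacity gives:
+     *   current diff = |0.80 - 0.60| + |0.40 - 0.60| = 0.40
+     *   new diff     = |0.70 - 0.60| + |0.50 - 0.60| = 0.20
+     * Since 0.20 < 0.40, the migration makes the disks more balanced.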
+ */ + public boolean isMoreBalanced(long srcPath, long destPath, long tabletId, long tabletSize, + TStorageMedium medium) { + long totalCapacity = 0; + long totalUsedCapacity = 0; + RootPathLoadStatistic srcPathStat = null; + RootPathLoadStatistic destPathStat = null; + for (RootPathLoadStatistic pathStat : pathStatistics) { + if (pathStat.getStorageMedium() == medium) { + totalCapacity += pathStat.getCapacityB(); + totalUsedCapacity += pathStat.getUsedCapacityB(); + if (pathStat.getPathHash() == srcPath) { + srcPathStat = pathStat; + } else if (pathStat.getPathHash() == destPath) { + destPathStat = pathStat; + } + } + } + if (srcPathStat == null || destPathStat == null) { + LOG.info("migrate {}(size: {}) from {} to {} failed, medium: {}, src or dest path stat does not exist.", + tabletId, tabletSize, srcPath, destPath, medium); + return false; + } + double avgUsedPercent = totalCapacity == 0 ? 0.0 : totalUsedCapacity / (double) totalCapacity; + double currentSrcPathScore = srcPathStat.getCapacityB() == 0 + ? 0.0 : srcPathStat.getUsedCapacityB() / (double) srcPathStat.getCapacityB(); + double currentDestPathScore = destPathStat.getCapacityB() == 0 + ? 0.0 : destPathStat.getUsedCapacityB() / (double) destPathStat.getCapacityB(); + + double newSrcPathScore = srcPathStat.getCapacityB() == 0 + ? 0.0 : (srcPathStat.getUsedCapacityB() - tabletSize) / (double) srcPathStat.getCapacityB(); + double newDestPathScore = destPathStat.getCapacityB() == 0 + ? 0.0 : (destPathStat.getUsedCapacityB() + tabletSize) / (double) destPathStat.getCapacityB(); + + double currentDiff = Math.abs(currentSrcPathScore - avgUsedPercent) + + Math.abs(currentDestPathScore - avgUsedPercent); + double newDiff = Math.abs(newSrcPathScore - avgUsedPercent) + Math.abs(newDestPathScore - avgUsedPercent); + + LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the load score changed." + + " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {}," + + " more balanced: {}", + tabletId, tabletSize, srcPath, destPath, medium, currentSrcPathScore, newSrcPathScore, + currentDestPathScore, newDestPathScore, avgUsedPercent, currentDiff, newDiff, + (newDiff < currentDiff)); + + return newDiff < currentDiff; + } + public boolean hasAvailDisk() { for (RootPathLoadStatistic rootPathLoadStatistic : pathStatistics) { if (rootPathLoadStatistic.getDiskState() == DiskState.ONLINE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java new file mode 100644 index 0000000000..f32e31a7ef --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletSchedCtx.BalanceType;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ * DiskRebalancer is different from the other rebalancers, which take care of cluster-wide
+ * data balancing: it chooses one backend and moves tablets from one of its disks to another.
+ * DiskRebalancer strategy:
+ * 1. it only works while the cluster is balanced (which means the cluster has no high or low
+ *    load backends);
+ *    1.1 but if the user has given prio backends, tablets are selected from those backends
+ *        no matter whether the cluster is balanced or not.
+ * 2. select alternative tablets from mid load backends and return them to the tablet scheduler.
+ * 3. given a tablet with a src path (disk), find a path (disk) to migrate it to.
+ */
+public class DiskRebalancer extends Rebalancer {
+    private static final Logger LOG = LogManager.getLogger(DiskRebalancer.class);
+
+    public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
+        super(infoService, invertedIndex);
+    }
+
+    public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
+        List<BackendLoadStatistic> stats = Lists.newArrayList();
+        for (BackendLoadStatistic backend : bes) {
+            long backendId = backend.getBeId();
+            Long timeoutS = prioBackends.getOrDefault(backendId, 0L);
+            if (timeoutS != 0) {
+                if (timeoutS < System.currentTimeMillis()) {
+                    // remove backends from prio if timeout
+                    prioBackends.remove(backendId);
+                    continue;
+                }
+                stats.add(backend);
+            }
+        }
+        return stats;
+    }
+
+    // true means the BE has low and high paths for balance after reclassification
+    private boolean checkAndReclassifyPaths(Set<Long> pathLow, Set<Long> pathMid, Set<Long> pathHigh) {
+        if (pathLow.isEmpty() && pathHigh.isEmpty()) {
+            // balanced
+            return false;
+        }
+        if (pathLow.isEmpty()) {
+            // mid => low
+            pathLow.addAll(pathMid);
+        } else if (pathHigh.isEmpty()) {
+            // mid => high
+            pathHigh.addAll(pathMid);
+        }
+        if (pathLow.isEmpty() || pathHigh.isEmpty()) {
+            // check again
+            return false;
+        }
+        return true;
+    }
+
+    /*
+     * Try to select alternative tablets to balance the disks.
+     * 1. Classify the backends into low, mid and high classes by load score.
+     * 2. Try to select tablets from mid load backends:
+     *    2.1 we only select alternative tablets here, without considering the selected tablets'
+     *        status or whether they benefit the balance (all of this is checked in the tablet
+     *        scheduler);
+     *    2.2 only select tablets from 'mid' backends;
+     *    2.3 only select tablets from 'high' paths.
+     * 3. Try to select tablets from prio backends.
+     *
+     * Here we only select tablets from mid load nodes and do not set their dest; all of that is
+     * set when the tablet is scheduled by the tablet scheduler.
+     *
+     * NOTICE that we may select any available tablet here, ignoring its state.
+     * The state will be checked when it is being scheduled in the tablet scheduler.
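+     *
+     * Example with hypothetical numbers: a mid load BE has three paths at 90%, 60% and
+     * 30% used, classified as high, mid and low. Only tablets resident on the 90% path
+     * are candidates, at most BALANCE_SLOT_NUM_FOR_PATH tablets are picked from it, and
+     * empty replicas are skipped since moving them cannot change disk usage.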
+ */ + @Override + protected List selectAlternativeTabletsForCluster( + ClusterLoadStatistic clusterStat, TStorageMedium medium) { + String clusterName = clusterStat.getClusterName(); + List alternativeTablets = Lists.newArrayList(); + + // get classification of backends + List lowBEs = Lists.newArrayList(); + List midBEs = Lists.newArrayList(); + List highBEs = Lists.newArrayList(); + clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); + + if (!(lowBEs.isEmpty() && highBEs.isEmpty())) { + // the cluster is not balanced + if (prioBackends.isEmpty()) { + LOG.info("cluster is not balanced: {} with medium: {}. skip", clusterName, medium); + return alternativeTablets; + } else { + // prioBEs are not empty, we only schedule prioBEs' disk balance task + midBEs.addAll(lowBEs); + midBEs.addAll(highBEs); + midBEs = filterByPrioBackends(midBEs); + } + } + + // first we should check if mid backends is available. + // if all mid backends is not available, we should not start balance + if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) { + LOG.info("all mid load backends is dead: {} with medium: {}. skip", + lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); + return alternativeTablets; + } + + if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) { + LOG.info("all mid load backends {} have no available disk with medium: {}. skip", + lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); + return alternativeTablets; + } + + Set unbalancedBEs = Sets.newHashSet(); + // choose tablets from backends randomly. + Collections.shuffle(midBEs); + for (int i = midBEs.size() - 1; i >= 0; i--) { + BackendLoadStatistic beStat = midBEs.get(i); + + // classify the paths. + Set pathLow = Sets.newHashSet(); + Set pathMid = Sets.newHashSet(); + Set pathHigh = Sets.newHashSet(); + // we only select tablets from available high load path + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); + // check if BE has low and high paths for balance after reclassification + if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) { + continue; + } + + // get all tablets on this backend, and shuffle them for random selection + List tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium); + Collections.shuffle(tabletIds); + + // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets + Map remainingPaths = Maps.newHashMap(); + for (Long pathHash : pathHigh) { + remainingPaths.put(pathHash, TabletScheduler.BALANCE_SLOT_NUM_FOR_PATH); + } + + if (remainingPaths.isEmpty()) { + return alternativeTablets; + } + + // select tablet from shuffled tablets + for (Long tabletId : tabletIds) { + Replica replica = invertedIndex.getReplica(tabletId, beStat.getBeId()); + if (replica == null) { + continue; + } + // ignore empty replicas as they do not make disk more balance. (disk usage) + if (replica.getDataSize() == 0) { + continue; + } + + // check if replica's is on 'high' path. + // and only select it if the selected tablets num of this path + // does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH). 
+                long replicaPathHash = replica.getPathHash();
+                if (remainingPaths.containsKey(replicaPathHash)) {
+                    TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+                    if (tabletMeta == null) {
+                        continue;
+                    }
+
+                    TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
+                            tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
+                            tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance */,
+                            System.currentTimeMillis());
+                    // we set the temp src here to simplify the completeSchedCtx method, and to avoid taking a slot here
+                    tabletCtx.setTempSrc(replica);
+                    tabletCtx.setTag(clusterStat.getTag());
+                    if (prioBackends.containsKey(beStat.getBeId())) {
+                        // the priority of a balance task for a prio BE is NORMAL
+                        tabletCtx.setOrigPriority(Priority.NORMAL);
+                    } else {
+                        // a balance task's default priority is LOW
+                        tabletCtx.setOrigPriority(Priority.LOW);
+                    }
+                    // we must set balanceType to DISK_BALANCE so that a migration task is created
+                    tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
+
+                    alternativeTablets.add(tabletCtx);
+                    unbalancedBEs.add(beStat.getBeId());
+                    // update the remaining paths
+                    int remaining = remainingPaths.get(replicaPathHash) - 1;
+                    if (remaining <= 0) {
+                        remainingPaths.remove(replicaPathHash);
+                    } else {
+                        remainingPaths.put(replicaPathHash, remaining);
+                    }
+                }
+            }
+        } // end for mid backends
+
+        // remove balanced BEs from the prio backends
+        prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id));
+        LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
+                clusterName, medium, alternativeTablets.size(),
+                alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        return alternativeTablets;
+    }
+
+    /*
+     * Create a StorageMediaMigrationTask for the selected tablet to balance the disks.
+     * 1. Check if the cluster is balanced. If not, the balance will be cancelled.
+     * 2. Check if the src replica is still on a high load path. If not, the balance will be cancelled.
+     * 3. Select a low load path from this backend as the destination.
+     */
+    @Override
+    public void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
+        ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster(), tabletCtx.getTag());
+        if (clusterStat == null) {
+            throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist");
+        }
+        if (tabletCtx.getTempSrcBackendId() == -1 || tabletCtx.getTempSrcPathHash() == -1) {
+            throw new SchedException(Status.UNRECOVERABLE,
+                    "src does not appear to be set correctly, something went wrong");
+        }
+        Replica replica = invertedIndex.getReplica(tabletCtx.getTabletId(), tabletCtx.getTempSrcBackendId());
+        // check that the src replica is still there
+        if (replica == null || replica.getPathHash() != tabletCtx.getTempSrcPathHash()) {
+            throw new SchedException(Status.UNRECOVERABLE, "src replica may have been rebalanced");
+        }
+        // ignore empty replicas, as they do not make the disks more balanced
+        if (replica.getDataSize() == 0) {
+            throw new SchedException(Status.UNRECOVERABLE, "size of src replica is zero");
+        }
+        // check the src slot
+        PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
+        if (slot == null) {
+            LOG.debug("BE does not have slot: {}", replica.getBackendId());
+            throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot");
+        }
+        long pathHash = slot.takeBalanceSlot(replica.getPathHash());
+        if (pathHash == -1) {
+            throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot");
+        }
+        // after taking the src slot, we can set the src replica now
+        tabletCtx.setSrc(replica);
+
+        BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendId());
+        if (!beStat.isAvailable()) {
+            throw new SchedException(Status.UNRECOVERABLE, "the backend is not available");
+        }
+        // classify the paths.
+        // If the src path is 'high', then we can select a path from 'low' and 'mid'
+        // If the src path is 'mid', then we can only select a path from 'low'
+        // If the src path is 'low', then we have nothing to do
+        Set<Long> pathLow = Sets.newHashSet();
+        Set<Long> pathMid = Sets.newHashSet();
+        Set<Long> pathHigh = Sets.newHashSet();
+        beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+        if (pathHigh.contains(replica.getPathHash())) {
+            pathLow.addAll(pathMid);
+        } else if (!pathMid.contains(replica.getPathHash())) {
+            throw new SchedException(Status.UNRECOVERABLE, "src path is low load");
+        }
+        // check if this migration task can make the BE's disks more balanced.
+        List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
+        BalanceStatus bs;
+        if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths,
+                false /* not supplement */)) != BalanceStatus.OK) {
+            LOG.debug("tablet does not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs());
+            throw new SchedException(Status.UNRECOVERABLE, "tablet does not fit in BE");
+        }
+        // Select a low load path as the destination.
+        boolean setDest = false;
+        for (RootPathLoadStatistic stat : availPaths) {
+            // check if the avail path is the src path
+            if (stat.getPathHash() == replica.getPathHash()) {
+                continue;
+            }
+            // check if the avail path is a low path
+            if (!pathLow.contains(stat.getPathHash())) {
+                LOG.debug("path {} is not low load", stat.getPathHash());
+                continue;
+            }
+            if (!beStat.isMoreBalanced(tabletCtx.getSrcPathHash(), stat.getPathHash(),
+                    tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) {
+                LOG.debug("path {} does not make the disks more balanced", stat.getPathHash());
+                continue;
+            }
+            long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
+            if (destPathHash == -1) {
+                throw new SchedException(Status.UNRECOVERABLE, "unable to take dest slot");
+            }
+            tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
+            setDest = true;
+            break;
+        }
+
+        if (!setDest) {
+            throw new SchedException(Status.UNRECOVERABLE, "unable to find a low load path");
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 1fca40652c..a7177c2f54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -20,11 +20,13 @@ package org.apache.doris.clone;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
 
@@ -50,6 +52,8 @@ public abstract class Rebalancer {
     protected Table<String, Tag, ClusterLoadStatistic> statisticMap = HashBasedTable.create();
     protected TabletInvertedIndex invertedIndex;
     protected SystemInfoService infoService;
+    // be id -> end time (in ms) of prio
+    protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
 
     public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
         this.infoService = infoService;
@@ -71,10 +75,14 @@ public abstract class Rebalancer {
     protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
            ClusterLoadStatistic clusterStat, TStorageMedium medium);
 
-    public void createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots,
-            AgentBatchTask batchTask) throws SchedException {
+    public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots)
+            throws SchedException {
         completeSchedCtx(tabletCtx, backendsWorkingSlots);
-        batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+        if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
+            return tabletCtx.createCloneReplicaAndTask();
+        } else {
+            return tabletCtx.createStorageMediaMigrationTask();
+        }
     }
 
     // Before createCloneReplicaAndTask, we need to complete the TabletSchedCtx.
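+    // A sketch of the new call-site contract (as used by TabletScheduler.doBalance() and
+    // RebalanceTest in this patch): the caller now receives the AgentTask and batches it itself:
+    //
+    //     AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
+    //     batchTask.addTask(task);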
@@ -93,4 +101,21 @@ public abstract class Rebalancer {
     public void updateLoadStatistic(Table<String, Tag, ClusterLoadStatistic> statisticMap) {
         this.statisticMap = statisticMap;
     }
+
+    public void addPrioBackends(List<Backend> backends, long timeoutS) {
+        long currentTimeMillis = System.currentTimeMillis();
+        for (Backend backend : backends) {
+            // timeoutS is in seconds; the stored deadline is in milliseconds
+            prioBackends.put(backend.getId(), currentTimeMillis + timeoutS * 1000);
+        }
+    }
+
+    public void removePrioBackends(List<Backend> backends) {
+        for (Backend backend : backends) {
+            prioBackends.remove(backend.getId());
+        }
+    }
+
+    public boolean hasPrioBackends() {
+        return !prioBackends.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 6610b484ae..60e8080fa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -40,6 +40,7 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CloneTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TBackend;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TStatusCode;
@@ -108,6 +109,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         BALANCE, REPAIR
     }
 
+    public enum BalanceType {
+        BE_BALANCE, DISK_BALANCE
+    }
+
     public enum Priority {
         LOW,
         NORMAL,
@@ -141,6 +146,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     }
 
     private Type type;
+    private BalanceType balanceType;
 
     /*
      * origPriority is the origin priority being set when this tablet being added to scheduler.
@@ -193,11 +199,16 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
 
     private Replica srcReplica = null;
     private long srcPathHash = -1;
+    // for disk balance to keep the src path, and to avoid taking a slot in selectAlternativeTabletsForCluster
+    private Replica tempSrcReplica = null;
 
     private long destBackendId = -1;
    private long destPathHash = -1;
+    // for disk balance to set the migration task's data dir
+    private String destPath = null;
     private String errMsg = null;
 
     private CloneTask cloneTask = null;
+    private StorageMediaMigrationTask storageMediaMigrationTask = null;
 
     // statistics gathered from clone task report
     // the total size of clone files and the total cost time in ms.
@@ -227,6 +238,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         this.infoService = Catalog.getCurrentSystemInfo();
         this.state = State.PENDING;
         this.replicaAlloc = replicaAlloc;
+        this.balanceType = BalanceType.BE_BALANCE;
     }
 
     public ReplicaAllocation getReplicaAlloc() {
@@ -249,6 +261,14 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         return type;
     }
 
+    public void setBalanceType(BalanceType type) {
+        this.balanceType = type;
+    }
+
+    public BalanceType getBalanceType() {
+        return balanceType;
+    }
+
     public Priority getOrigPriority() {
         return origPriority;
     }
@@ -380,6 +400,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         this.destBackendId = destBeId;
         this.destPathHash = destPathHash;
     }
+
+    public void setDest(Long destBeId, long destPathHash, String destPath) {
+        setDest(destBeId, destPathHash);
+        this.destPath = destPath;
+    }
 
     public void setErrMsg(String errMsg) {
         this.errMsg = errMsg;
@@ -414,6 +439,24 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         this.srcPathHash = srcReplica.getPathHash();
     }
 
+    public void setTempSrc(Replica srcReplica) {
+        this.tempSrcReplica = srcReplica;
+    }
+
+    public long getTempSrcBackendId() {
+        if (tempSrcReplica != null) {
+            return tempSrcReplica.getBackendId();
+        }
+        return -1;
+    }
+
+    public long getTempSrcPathHash() {
+        if (tempSrcReplica != null) {
+            return tempSrcReplica.getPathHash();
+        }
+        return -1;
+    }
+
     public long getDestBackendId() {
         return destBackendId;
     }
@@ -422,6 +465,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         return destPathHash;
     }
 
+    public String getDestPath() {
+        return destPath;
+    }
+
     // database lock should be held.
     public long getTabletSize() {
         long max = Long.MIN_VALUE;
@@ -687,6 +734,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             }
         }
 
+        if (storageMediaMigrationTask != null) {
+            AgentTaskQueue.removeTask(storageMediaMigrationTask.getBackendId(), TTaskType.STORAGE_MEDIUM_MIGRATE,
+                    storageMediaMigrationTask.getSignature());
+        }
         if (cloneTask != null) {
             AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature());
@@ -729,13 +779,28 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             this.srcPathHash = -1;
             this.destBackendId = -1;
             this.destPathHash = -1;
+            this.destPath = null;
             this.cloneTask = null;
+            this.storageMediaMigrationTask = null;
         }
     }
 
     public void deleteReplica(Replica replica) {
         tablet.deleteReplicaByBackendId(replica.getBackendId());
     }
+
+    public StorageMediaMigrationTask createStorageMediaMigrationTask() throws SchedException {
+        // check the dest path before constructing the task, so that a failed attempt
+        // does not leave a half-built task behind
+        if (destPath == null || destPath.isEmpty()) {
+            throw new SchedException(Status.UNRECOVERABLE,
+                    "backend " + srcReplica.getBackendId() + ", dest path is empty");
+        }
+        storageMediaMigrationTask = new StorageMediaMigrationTask(getSrcBackendId(), getTabletId(),
+                getSchemaHash(), getStorageMedium());
+        storageMediaMigrationTask.setDataDir(destPath);
+        this.taskTimeoutMs = getApproximateTimeoutMs();
+        this.state = State.RUNNING;
+        return storageMediaMigrationTask;
+    }
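+
+    // A sketch of the DISK_BALANCE task lifecycle wired up by this patch:
+    //   1. DiskRebalancer.selectAlternativeTabletsForCluster() picks a replica, calls setTempSrc()
+    //      and setBalanceType(BalanceType.DISK_BALANCE);
+    //   2. TabletScheduler.doBalance() calls diskRebalancer.createBalanceTask(), which completes
+    //      the ctx (setSrc()/setDest()) and returns createStorageMediaMigrationTask();
+    //   3. when the BE reports completion, MasterImpl.finishStorageMediumMigrate() forwards to
+    //      TabletScheduler.finishStorageMediaMigrationTask(), which frees the slots and finalizes the ctx.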
 
     // database lock should be held.
     public CloneTask createCloneReplicaAndTask() throws SchedException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1f80cada9e..68ab63a851 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.clone;
 
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
+import org.apache.doris.analysis.AdminRebalanceDiskStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@@ -51,7 +53,9 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CloneTask;
 import org.apache.doris.task.DropReplicaTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 
@@ -101,13 +105,14 @@ public class TabletScheduler extends MasterDaemon {
 
     private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
 
-    public static final int BALANCE_SLOT_NUM_FOR_PATH = 2;
+    // 1 slot to reduce unnecessary balance tasks and to provide a more accurate estimate of capacity
+    public static final int BALANCE_SLOT_NUM_FOR_PATH = 1;
 
     /*
      * Tablet is added to pendingTablets as well it's id in allTabletIds.
      * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when
      * handling a tablet.
-     * Tablet' id can only be removed after the clone task is done(timeout, cancelled or finished).
+     * Tablet's id can only be removed after the clone task or migration task is done (timeout, cancelled or finished).
     * So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler.
     *
     * pendingTablets + runningTablets = allTabletIds
@@ -135,6 +140,7 @@ public class TabletScheduler extends MasterDaemon {
     private ColocateTableIndex colocateTableIndex;
     private TabletSchedulerStat stat;
     private Rebalancer rebalancer;
+    private Rebalancer diskRebalancer;
 
     // result of adding a tablet to pendingTablets
     public enum AddResult {
@@ -157,6 +163,8 @@ public class TabletScheduler extends MasterDaemon {
         } else {
             this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
         }
+        // if the rebalancer can not get new tasks, the diskRebalancer is used to get tasks
+        this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
     }
 
     public TabletSchedulerStat getStat() {
@@ -244,6 +252,14 @@ public class TabletScheduler extends MasterDaemon {
         return allTabletIds.contains(tabletId);
     }
 
+    public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
+        diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS());
+    }
+
+    public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) {
+        diskRebalancer.removePrioBackends(stmt.getBackends());
+    }
+
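+    // Both entry points are invoked from DdlExecutor when the corresponding
+    // ADMIN REBALANCE DISK / ADMIN CANCEL REBALANCE DISK statement is executed
+    // (see the DdlExecutor hunk later in this patch).
+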
     /**
      * Iterate current tablets, change their priority to VERY_HIGH if necessary.
     */
@@ -300,6 +316,7 @@ public class TabletScheduler extends MasterDaemon {
 
         updateClusterLoadStatistic();
         rebalancer.updateLoadStatistic(statisticMap);
+        diskRebalancer.updateLoadStatistic(statisticMap);
 
         adjustPriorities();
 
@@ -463,7 +480,6 @@ public class TabletScheduler extends MasterDaemon {
      * Try to schedule a single tablet.
      */
     private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
-        LOG.debug("schedule tablet: {}, type: {}, status: {}", tabletCtx.getTabletId(), tabletCtx.getType(), tabletCtx.getTabletStatus());
         long currentTime = System.currentTimeMillis();
         tabletCtx.setLastSchedTime(currentTime);
         tabletCtx.setLastVisitedTime(currentTime);
@@ -561,6 +577,11 @@ public class TabletScheduler extends MasterDaemon {
             throw new SchedException(Status.UNRECOVERABLE, "tablet is unhealthy when doing balance");
         }
 
+        // to make disk balance more accurate, we only schedule a tablet when we have up-to-date disk stat info
+        if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
+                && tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            checkDiskBalanceLastSuccTime(tabletCtx.getTempSrcBackendId(), tabletCtx.getTempSrcPathHash());
+        }
         // we do not concern priority here.
         // once we take the tablet out of priority queue, priority is meaningless.
         tabletCtx.setTablet(tablet);
@@ -574,6 +595,25 @@ public class TabletScheduler extends MasterDaemon {
         }
     }
 
+    private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws SchedException {
+        PathSlot pathSlot = backendsWorkingSlots.get(beId);
+        if (pathSlot == null) {
+            throw new SchedException(Status.UNRECOVERABLE, "path slot does not exist");
+        }
+        long succTime = pathSlot.getDiskBalanceLastSuccTime(pathHash);
+        if (succTime > lastStatUpdateTime) {
+            throw new SchedException(Status.UNRECOVERABLE, "stat info is outdated");
+        }
+    }
+
+    public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
+        PathSlot pathSlot = backendsWorkingSlots.get(beId);
+        if (pathSlot == null) {
+            return;
+        }
+        pathSlot.updateDiskBalanceLastSuccTime(pathHash);
+    }
+
     private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
             throws SchedException {
         if (tabletCtx.getType() == Type.REPAIR) {
@@ -1189,6 +1229,21 @@ public class TabletScheduler extends MasterDaemon {
         for (TabletSchedCtx tabletCtx : alternativeTablets) {
             addTablet(tabletCtx, false);
         }
+        if (Config.disable_disk_balance) {
+            LOG.info("disk balance is disabled. skip selecting tablets for disk balance");
+            return;
+        }
+        List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList();
+        // if the default rebalancer got no new tasks, or the user has given prio BEs,
+        // use the disk rebalancer to get tasks
+        if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) {
+            diskBalanceTablets = diskRebalancer.selectAlternativeTablets();
+        }
+        for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
+            // add the task if it comes from a prio backend or the cluster is balanced
+            if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) {
+                addTablet(tabletCtx, false);
+            }
+        }
     }
 
     /**
@@ -1196,7 +1251,18 @@ public class TabletScheduler extends MasterDaemon {
      */
     private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
         stat.counterBalanceSchedule.incrementAndGet();
-        rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots, batchTask);
+        AgentTask task = null;
+        if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            task = diskRebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
+            checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
+            checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
+        } else if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
+            task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
+        } else {
+            throw new SchedException(Status.UNRECOVERABLE,
+                    "unknown balance type: " + tabletCtx.getBalanceType().toString());
+        }
+        batchTask.addTask(task);
     }
 
     // choose a path on a backend which is fit for the tablet
@@ -1347,7 +1413,7 @@ public class TabletScheduler extends MasterDaemon {
     // get next batch of tablets from queue.
     private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
         List<TabletSchedCtx> list = Lists.newArrayList();
-        int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
+        int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
         while (count > 0) {
             TabletSchedCtx tablet = pendingTablets.poll();
             if (tablet == null) {
@@ -1368,6 +1434,29 @@ public class TabletScheduler extends MasterDaemon {
         return total;
     }
 
+    public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrationTask,
+            TFinishTaskRequest request) {
+        long tabletId = migrationTask.getTabletId();
+        TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
+        if (tabletCtx == null) {
+            // the tablet does not exist; the task may have been created by ReportHandler.tabletReport (ssd => hdd)
+            LOG.warn("tablet info does not exist: {}", tabletId);
+            return true;
+        }
+        if (tabletCtx.getBalanceType() != TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            // this should not happen
+            LOG.warn("task type is not as expected. tablet {}", tabletId);
+            return true;
+        }
+        if (request.getTaskStatus().getStatusCode() == TStatusCode.OK) {
+            // if we have a successful task, the stats must be refreshed before scheduling a new task
+            updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
+            updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
+        }
+        // we need this call to free the slot taken by this migration task
+        finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished");
+        return true;
+    }
+
     /**
      * return true if we want to remove the clone task from AgentTaskQueue
     */
@@ -1379,6 +1468,11 @@ public class TabletScheduler extends MasterDaemon {
             // tablet does not exist, no need to keep task.
             return true;
         }
+        if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
+            // this should not happen
+            LOG.warn("task type is not as expected. tablet {}", tabletId);
+            return true;
+        }
         Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
         try {
@@ -1706,6 +1800,22 @@ public class TabletScheduler extends MasterDaemon {
             slot.balanceSlot++;
             slot.rectify();
         }
+
+        public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
+            Slot slot = pathSlots.get(pathHash);
+            if (slot == null) {
+                return;
+            }
+            slot.diskBalanceLastSuccTime = System.currentTimeMillis();
+        }
+
+        public synchronized long getDiskBalanceLastSuccTime(long pathHash) {
+            Slot slot = pathSlots.get(pathHash);
+            if (slot == null) {
+                return 0L;
+            }
+            return slot.diskBalanceLastSuccTime;
+        }
     }
 
     public List<List<String>> getSlotsInfo() {
@@ -1726,6 +1836,9 @@ public class TabletScheduler extends MasterDaemon {
         public long totalCopySize = 0;
         public long totalCopyTimeMs = 0;
 
+        // for disk balance
+        public long diskBalanceLastSuccTime = 0;
+
         public Slot(int total) {
             this.total = total;
             this.available = total;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index a9a8b9e04f..1551676910 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1133,6 +1133,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_balance = false;
 
+    /**
+     * if set to true, TabletScheduler will not do disk balance.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean disable_disk_balance = false;
+
     // if the number of scheduled tablets in TabletScheduler exceed max_scheduling_tablets
     // skip checking.
     @ConfField(mutable = true, masterOnly = true)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 5230f9f40a..f0ae6a3ee2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -47,6 +47,7 @@ import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.task.UpdateTabletMetaInfoTask;
 import org.apache.doris.task.UploadTask;
 import org.apache.doris.thrift.TBackend;
@@ -115,8 +116,8 @@ public class MasterImpl {
 
         AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
         if (task == null) {
-            if (taskType != TTaskType.DROP && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
-                    && taskType != TTaskType.RELEASE_SNAPSHOT && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
+            if (taskType != TTaskType.DROP && taskType != TTaskType.RELEASE_SNAPSHOT
+                    && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
                 String errMsg = "cannot find task. type: " + taskType + ", backendId: " + backendId
                         + ", signature: " + signature;
                 LOG.warn(errMsg);
@@ -137,7 +138,8 @@ public class MasterImpl {
             if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
                     && taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
                     && taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION
-                    && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO) {
+                    && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO
+                    && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
                 return result;
             }
         }
@@ -175,6 +177,9 @@ public class MasterImpl {
             case CLONE:
                 finishClone(task, request);
                 break;
+            case STORAGE_MEDIUM_MIGRATE:
+                finishStorageMediumMigrate(task, request);
+                break;
             case CHECK_CONSISTENCY:
                 finishConsistencyCheck(task, request);
                 break;
@@ -699,6 +704,12 @@ public class MasterImpl {
         AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE, task.getSignature());
     }
 
+    private void finishStorageMediumMigrate(AgentTask task, TFinishTaskRequest request) {
+        StorageMediaMigrationTask migrationTask = (StorageMediaMigrationTask) task;
+        Catalog.getCurrentCatalog().getTabletScheduler().finishStorageMediaMigrationTask(migrationTask, request);
+        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.STORAGE_MEDIUM_MIGRATE, task.getSignature());
+    }
+
     private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest request) {
         CheckConsistencyTask checkConsistencyTask = (CheckConsistencyTask) task;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index ea4abd9c09..e1f1051315 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -360,10 +360,12 @@ public class ReportHandler extends Daemon {
                 // 1. CREATE
                 // 2. SYNC DELETE
                 // 3. CHECK_CONSISTENCY
+                // 4. STORAGE_MEDIUM_MIGRATE
                 if (task.getTaskType() == TTaskType.CREATE
                         || (task.getTaskType() == TTaskType.PUSH && ((PushTask) task).getPushType() == TPushType.DELETE
                         && ((PushTask) task).isSyncDelete())
-                        || task.getTaskType() == TTaskType.CHECK_CONSISTENCY) {
+                        || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
+                        || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) {
                     continue;
                 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index c66349a2ce..c35a27b8f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.AdminCancelRepairTableStmt;
 import org.apache.doris.analysis.AdminCheckTabletsStmt;
 import org.apache.doris.analysis.AdminCleanTrashStmt;
 import org.apache.doris.analysis.AdminCompactTableStmt;
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
+import org.apache.doris.analysis.AdminRebalanceDiskStmt;
 import org.apache.doris.analysis.AdminRepairTableStmt;
 import org.apache.doris.analysis.AdminSetConfigStmt;
 import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
@@ -281,6 +283,10 @@ public class DdlExecutor {
             catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
         } else if (ddlStmt instanceof AdminCleanTrashStmt) {
             catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt);
+        } else if (ddlStmt instanceof AdminRebalanceDiskStmt) {
+            catalog.getTabletScheduler().rebalanceDisk((AdminRebalanceDiskStmt) ddlStmt);
+        } else if (ddlStmt instanceof AdminCancelRebalanceDiskStmt) {
+            catalog.getTabletScheduler().cancelRebalanceDisk((AdminCancelRebalanceDiskStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateSqlBlockRuleStmt) {
             catalog.getSqlBlockRuleMgr().createSqlBlockRule((CreateSqlBlockRuleStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterSqlBlockRuleStmt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
index 72aef5a4d9..1ddbde77de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
@@ -21,10 +21,14 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageMediumMigrateReq;
 import org.apache.doris.thrift.TTaskType;
 
+import com.google.common.base.Strings;
+
 public class StorageMediaMigrationTask extends AgentTask {
 
     private int schemaHash;
     private TStorageMedium toStorageMedium;
+    // if dataDir is specified, toStorageMedium is meaningless
+    private String dataDir;
 
     public StorageMediaMigrationTask(long backendId, long tabletId,
             int schemaHash, TStorageMedium toStorageMedium) {
@@ -36,9 +40,20 @@ public class StorageMediaMigrationTask extends AgentTask {
 
     public TStorageMediumMigrateReq toThrift() {
         TStorageMediumMigrateReq request = new TStorageMediumMigrateReq(tabletId, schemaHash, toStorageMedium);
+        if (!Strings.isNullOrEmpty(dataDir)) {
+            request.setDataDir(dataDir);
+        }
         return request;
     }
 
+    public String getDataDir() {
+        return dataDir;
+    }
+
+    public void setDataDir(String dataDir) {
+        this.dataDir = dataDir;
+    }
+
     public int getSchemaHash() {
         return schemaHash;
     }
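Note: disable_disk_balance is declared mutable and masterOnly in the Config.java hunk above, so it
can be toggled at runtime without a restart. A sketch, assuming the standard FE admin config syntax:

    ADMIN SET FRONTEND CONFIG ("disable_disk_balance" = "true");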
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index a2eeba952c..7ab3c7ed0b 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -171,6 +171,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("distinctpcsa", new Integer(SqlParserSymbols.KW_DISTINCTPCSA));
         keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED));
         keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION));
+        keywordMap.put("disk", new Integer(SqlParserSymbols.KW_DISK));
         keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC));
         keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
         keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
@@ -319,6 +320,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE));
         keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ));
         keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
+        keywordMap.put("rebalance", new Integer(SqlParserSymbols.KW_REBALANCE));
         keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER));
         keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));
         keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
new file mode 100644
index 0000000000..8e92567adf
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.clone.RebalancerTestUtil;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import mockit.Mocked;
+
+public class AdminCancelRebalanceDiskStmtTest {
+
+    private static Analyzer analyzer;
+
+    @Mocked
+    private PaloAuth auth;
+    @Mocked
+    private ConnectContext ctx;
+
+    @Before
+    public void setUp() {
+        Config.disable_cluster_feature = false;
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+        MockedAuth.mockedAuth(auth);
+        MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
+
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+        beIds.forEach(id -> Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
+    }
+
+    @Test
+    public void testParticularBackends() throws AnalysisException {
+        // only 192.168.0.10003 and 192.168.0.10004 exist, so two backends should be resolved
+        List<String> backends = Lists.newArrayList(
+                "192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051");
+        final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(2, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testEmpty() throws AnalysisException {
+        List<String> backends = Lists.newArrayList();
+        final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(0, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testNull() throws AnalysisException {
+        // a null list means all backends
+        final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(null);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(4, stmt.getBackends().size());
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
new file mode 100644
index 0000000000..e83693ceda
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.clone.RebalancerTestUtil;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import mockit.Mocked;
+
+public class AdminRebalanceDiskStmtTest {
+
+    private static Analyzer analyzer;
+
+    @Mocked
+    private PaloAuth auth;
+    @Mocked
+    private ConnectContext ctx;
+
+    @Before
+    public void setUp() {
+        Config.disable_cluster_feature = false;
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+        MockedAuth.mockedAuth(auth);
+        MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
+
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+        beIds.forEach(id -> Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
+    }
+
+    @Test
+    public void testParticularBackends() throws AnalysisException {
+        // only 192.168.0.10003 and 192.168.0.10004 exist, so two backends should be resolved
+        List<String> backends = Lists.newArrayList(
+                "192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051");
+        final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(2, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testEmpty() throws AnalysisException {
+        List<String> backends = Lists.newArrayList();
+        final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(backends);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(0, stmt.getBackends().size());
+    }
+
+    @Test
+    public void testNull() throws AnalysisException {
+        // a null list means all backends
+        final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(null);
+        stmt.analyze(analyzer);
+        Assert.assertEquals(4, stmt.getBackends().size());
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
new file mode 100644
index 0000000000..1d7cb00a39
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DataProperty;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TStorageType;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class DiskRebalanceTest {
+    private static final Logger LOG = LogManager.getLogger(DiskRebalanceTest.class);
+
+    @Mocked
+    private Catalog catalog;
+
+    private long id = 10086;
+
+    private Database db;
+    private OlapTable olapTable;
+
+    private final SystemInfoService systemInfoService = new SystemInfoService();
+    private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
+    private Table<String, Tag, ClusterLoadStatistic> statisticMap;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new Database(1, "test db");
+        db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        new Expectations() {
+            {
+                catalog.getDbIds();
+                minTimes = 0;
+                result = db.getId();
+
+                catalog.getDbNullable(anyLong);
+                minTimes = 0;
+                result = db;
+
+                catalog.getDbOrException(anyLong, (Function) any);
+                minTimes = 0;
+                result = db;
+
+                Catalog.getCurrentCatalogJournalVersion();
+                minTimes = 0;
+                result = FeConstants.meta_version;
+
+                catalog.getNextId();
+                minTimes = 0;
+                result = new Delegate() {
+                    long a() {
+                        return id++;
+                    }
+                };
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+
+                Catalog.getCurrentInvertedIndex();
+                minTimes = 0;
+                result = invertedIndex;
+
+                Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
+                result = 111;
+
+                Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(anyLong, anyLong, (List) any);
+                result = true;
+            }
+        };
+        // validate the mocked transaction manager
+        Assert.assertEquals(111,
+                Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
+        Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr()
+                .isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L)));
+    }
+
+    private void generateStatisticMap() {
+        ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
+                Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
+        loadStatistic.init();
+        statisticMap = HashBasedTable.create();
+        statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, loadStatistic);
+    }
+
+    private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) {
+        // partition ids start from 31
+        LongStream.range(0, partitionCount).forEach(idx -> {
+            long id = 31 + idx;
+            Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo());
+            olapTable.addPartition(partition);
+            olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD),
+                    ReplicaAllocation.DEFAULT_ALLOCATION, false);
+        });
+    }
+
+    @Test
+    public void testDiskRebalancerWithSameUsageDisk() {
+        // init system: three BEs, each with two disks of equal usage
+        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L);
+        beIds.forEach(id -> systemInfoService.addBackend(
+                RebalancerTestUtil.createBackend(id, 2048, Lists.newArrayList(512L, 512L), 2)));
+
+        olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
+                new RangePartitionInfo(), new HashDistributionInfo());
+        db.createTable(olapTable);
+
+        // 1 table, 3 partitions p0,p1,p2
+        MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
+        createPartitionsForTable(olapTable, materializedIndex, 3L);
+        olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
+                0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
+
+        // Tablet distribution: we add them to olapTable & build invertedIndex manually.
+        // all tablets are on the first path of their backend
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
+                50000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
+                60000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
+                70000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+        // case start
+        Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG);
+
+        Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
+        generateStatisticMap();
+        rebalancer.updateLoadStatistic(statisticMap);
+        List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
+        // check alternativeTablets: the disks are already balanced, so nothing should be selected
+        Assert.assertTrue(alternativeTablets.isEmpty());
+    }
+
+    @Test
+    public void testDiskRebalancerWithDiffUsageDisk() {
+        // init system: BEs with different per-disk usage
+        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10001L, 2048, Lists.newArrayList(1024L), 1));
+        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10002L, 2048, Lists.newArrayList(1024L, 512L), 2));
+        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10003L, 2048, Lists.newArrayList(1024L, 512L, 513L), 3));
+
+        olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
+                new RangePartitionInfo(), new HashDistributionInfo());
+        db.createTable(olapTable);
+
+        // 1 table, 3 partitions p0,p1,p2
+        MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
+        createPartitionsForTable(olapTable, materializedIndex, 3L);
+        olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
+                0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
+
+        // Tablet distribution: we add them to olapTable & build invertedIndex manually.
+        // all tablets are on the first path of their backend
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
+                50000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(0L, 100L, 300L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
+                60000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(50L, 0L, 200L));
+
+        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
+                70000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(100L, 200L, 0L));
+
+        // case start
+        Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG);
+
+        Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
+        generateStatisticMap();
+        rebalancer.updateLoadStatistic(statisticMap);
+        List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
+        // check alternativeTablets
+        Assert.assertEquals(2, alternativeTablets.size());
+
+        Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
+        for (Backend be : Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER)) {
+            if (!backendsWorkingSlots.containsKey(be.getId())) {
+                List<Long> pathHashes = be.getDisks().values().stream()
+                        .map(DiskInfo::getPathHash).collect(Collectors.toList());
+                PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path);
+                backendsWorkingSlots.put(be.getId(), slot);
+            }
+        }
+
+        for (TabletSchedCtx tabletCtx : alternativeTablets) {
+            LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
+            try {
+                tabletCtx.setStorageMedium(TStorageMedium.HDD);
+                tabletCtx.setTablet(olapTable.getPartition(tabletCtx.getPartitionId())
+                        .getIndex(tabletCtx.getIndexId()).getTablet(tabletCtx.getTabletId()));
+                tabletCtx.setVersionInfo(1, 1);
+                tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
+                tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
+
+                AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
+                if (tabletCtx.getTabletSize() == 0) {
+                    // an empty tablet should have thrown in createBalanceTask
+                    Assert.fail("no exception");
+                } else {
+                    Assert.assertTrue(task instanceof StorageMediaMigrationTask);
+                }
+            } catch (SchedException e) {
+                LOG.info("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
+            }
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index fc4dd7dd04..e94fa8655d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
@@ -182,6 +183,31 @@ public class RebalanceTest {
         });
     }
 
+    @Test
+    public void testPrioBackends() {
+        Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
+        // add
+        {
+            List<Backend> backends = Lists.newArrayList();
+            for (int i = 0; i < 3; i++) {
+                backends.add(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
+            }
+            rebalancer.addPrioBackends(backends, 1000);
+            Assert.assertTrue(rebalancer.hasPrioBackends());
+        }
+
+        // remove
+        for (int i = 0; i < 3; i++) {
+            List<Backend> backends = Lists.newArrayList(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
+            rebalancer.removePrioBackends(backends);
+            if (i == 2) {
+                Assert.assertFalse(rebalancer.hasPrioBackends());
+            } else {
+                Assert.assertTrue(rebalancer.hasPrioBackends());
+            }
+        }
+    }
+
     @Test
     public void testPartitionRebalancer() {
         Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG);
@@ -218,7 +244,8 @@ public class RebalanceTest {
                 tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
 
                 // createCloneReplicaAndTask, create replica will change invertedIndex too.
-                rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask);
+                AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots());
+                batchTask.addTask(task);
             } catch (SchedException e) {
                 LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 4b3e4c693b..5e02a51ffc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -40,14 +41,20 @@ public class RebalancerTestUtil {
 
     // Add only one path, PathHash:id
     public static Backend createBackend(long id, long totalCap, long usedCap) {
+        return createBackend(id, totalCap, Lists.newArrayList(usedCap), 1);
+    }
+
+    // the size of usedCaps should equal diskNum
+    public static Backend createBackend(long id, long totalCap, List<Long> usedCaps, int diskNum) {
         // ip:port won't be checked
         Backend be = new Backend(id, "192.168.0." + id, 9051);
         Map<String, DiskInfo> disks = Maps.newHashMap();
-        DiskInfo diskInfo = new DiskInfo("/path1");
-        diskInfo.setPathHash(id);
-        diskInfo.setTotalCapacityB(totalCap);
-        diskInfo.setDataUsedCapacityB(usedCap);
-        disks.put(diskInfo.getRootPath(), diskInfo);
+        for (int i = 0; i < diskNum; i++) {
+            DiskInfo diskInfo = new DiskInfo("/path" + (i + 1));
+            diskInfo.setPathHash(id + i);
+            diskInfo.setTotalCapacityB(totalCap);
+            diskInfo.setDataUsedCapacityB(usedCaps.get(i));
+            disks.put(diskInfo.getRootPath(), diskInfo);
+        }
         be.setDisks(ImmutableMap.copyOf(disks));
         be.setAlive(true);
         be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
@@ -59,28 +66,37 @@ public class RebalancerTestUtil {
     // Only use the partition's baseIndex for simplicity
     public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable,
                                     String partitionName, TStorageMedium medium, int tabletId, List<Long> beIds) {
+        createTablet(invertedIndex, db, olapTable, partitionName, medium, tabletId, beIds, null);
+    }
+
+    public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable,
+                                    String partitionName, TStorageMedium medium,
+                                    int tabletId, List<Long> beIds, List<Long> replicaSizes) {
         Partition partition = olapTable.getPartition(partitionName);
         MaterializedIndex baseIndex = partition.getBaseIndex();
         int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
 
         TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partition.getId(), baseIndex.getId(),
-            schemaHash, medium);
+                schemaHash, medium);
 
         Tablet tablet = new Tablet(tabletId);
 
         // add tablet to olapTable
         olapTable.getPartition("p0").getBaseIndex().addTablet(tablet, tabletMeta);
-        createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds);
+        createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds, replicaSizes);
     }
 
     // Create replicas on backends which are numbered in beIds.
     // The tablet & replicas will be added to invertedIndex.
-    public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, Tablet tablet, List<Long> beIds) {
+    public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta,
+                                                   Tablet tablet, List<Long> beIds, List<Long> replicaSizes) {
         invertedIndex.addTablet(tablet.getId(), tabletMeta);
 
         IntStream.range(0, beIds.size()).forEach(i -> {
             Replica replica = new Replica(tablet.getId() + i, beIds.get(i), Replica.ReplicaState.NORMAL, 1, tabletMeta.getOldSchemaHash());
             // We've set pathHash to beId for simplicity
             replica.setPathHash(beIds.get(i));
+            if (replicaSizes != null) {
+                // for the disk rebalancer, every beId corresponds to one replica size
+                replica.updateStat(replicaSizes.get(i), 0);
+            }
             // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex
             tablet.addReplica(replica, true);
             invertedIndex.addReplica(tablet.getId(), replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 375c86fc01..c1e7d42dbd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -89,6 +89,7 @@ public class AgentTaskTest {
     private AgentTask rollupTask;
     private AgentTask schemaChangeTask;
     private AgentTask cancelDeleteTask;
+    private AgentTask storageMediaMigrationTask;
 
     @Before
     public void setUp() throws AnalysisException {
@@ -140,6 +141,11 @@ public class AgentTaskTest {
                 new SchemaChangeTask(null, backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, columns, schemaHash2, schemaHash1, shortKeyNum, storageType, null, 0, TKeysType.AGG_KEYS);
+
+        // storageMediaMigrationTask
+        storageMediaMigrationTask =
+                new StorageMediaMigrationTask(backendId1, tabletId1, schemaHash1, TStorageMedium.HDD);
+        ((StorageMediaMigrationTask) storageMediaMigrationTask).setDataDir("/home/a");
     }
 
     @Test
@@ -211,6 +217,15 @@ public class AgentTaskTest {
         Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType());
         Assert.assertEquals(schemaChangeTask.getSignature(), request6.getSignature());
         Assert.assertNotNull(request6.getAlterTabletReq());
+
+        // storageMediaMigrationTask
+        TAgentTaskRequest request7 =
+                (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, storageMediaMigrationTask);
+        Assert.assertEquals(TTaskType.STORAGE_MEDIUM_MIGRATE, request7.getTaskType());
+        Assert.assertEquals(storageMediaMigrationTask.getSignature(), request7.getSignature());
+        Assert.assertNotNull(request7.getStorageMediumMigrateReq());
+        Assert.assertTrue(request7.getStorageMediumMigrateReq().isSetDataDir());
+        Assert.assertEquals("/home/a", request7.getStorageMediumMigrateReq().getDataDir());
     }
 
     @Test