diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 29219fc61d..a10d2c4a78 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -941,8 +941,8 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
         TStatusCode::type status_code = TStatusCode::OK;
         // check request and get info
         TabletSharedPtr tablet;
-        DataDir* dest_store;
-        if (_check_migrate_requset(storage_medium_migrate_req, tablet, &dest_store) !=
+        DataDir* dest_store = nullptr;
+        if (_check_migrate_request(storage_medium_migrate_req, tablet, &dest_store) !=
             OLAP_SUCCESS) {
             status_code = TStatusCode::RUNTIME_ERROR;
         } else {
@@ -953,7 +953,7 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
                           << ", signature: " << agent_task_req.signature;
                 status_code = TStatusCode::RUNTIME_ERROR;
             } else {
-                LOG(INFO) << "storage media migrate success. status:" << res << ","
+                LOG(INFO) << "storage media migrate success. status:" << res
                           << ", signature:" << agent_task_req.signature;
             }
         }
@@ -974,7 +974,7 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
     }
 }
 
-OLAPStatus TaskWorkerPool::_check_migrate_requset(const TStorageMediumMigrateReq& req,
+OLAPStatus TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& req,
                                                   TabletSharedPtr& tablet, DataDir** dest_store) {
     int64_t tablet_id = req.tablet_id;
     int32_t schema_hash = req.schema_hash;
@@ -1020,6 +1020,11 @@ OLAPStatus TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq
         *dest_store = stores[0];
     }
+    if (tablet->data_dir()->path() == (*dest_store)->path()) {
+        LOG(INFO) << "tablet is already on specified path. "
+                  << "path=" << tablet->data_dir()->path();
+        return OLAP_REQUEST_FAILED;
+    }
 
     // check disk capacity
     int64_t tablet_size = tablet->tablet_footprint();
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3cffbc7af0..4181d0ce50 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -208,7 +208,7 @@ private:
     Status _move_dir(const TTabletId tablet_id, const TSchemaHash schema_hash,
                      const std::string& src, int64_t job_id, bool overwrite);
-    OLAPStatus _check_migrate_requset(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet,
+    OLAPStatus _check_migrate_request(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet,
                                       DataDir** dest_store);
 
     // random sleep 1~second seconds
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 438a4a345f..fe92b6b3ea 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -332,6 +332,11 @@ CONF_Int32(min_tablet_migration_threads, "1");
 CONF_Int32(max_tablet_migration_threads, "1");
 CONF_mInt32(finished_migration_tasks_size, "10000");
+// If the remaining data size is less than this threshold, the remaining rowsets
+// are copied while the migration lock is held, so the task can finish in one pass.
+CONF_mInt32(migration_remaining_size_threshold_mb, "10");
+// Timeout of a migration task, in seconds. If the task runs longer than this, it will be terminated.
+// tablet max size / migration min speed * factor = 10GB / 1MBps * 2 = 20480 seconds
+CONF_mInt32(migration_task_timeout_secs, "20480");
 
 // Port to start debug webserver on
 CONF_Int32(webserver_port, "8040");
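A standalone sketch (not part of the patch) of the unit conventions the two new config values assume: the threshold is in MB while rowset sizes are in bytes, and the default timeout follows the formula in the comment above. All values here are illustrative.

#include <cassert>
#include <cstdint>

int main() {
    const int64_t threshold_mb = 10;                   // migration_remaining_size_threshold_mb
    const int64_t remaining_bytes = 8 * 1024 * 1024;   // e.g. 8MB of rowsets left to copy
    // compare in the same unit: convert bytes to MB first
    bool finish_under_lock = (remaining_bytes / 1024 / 1024) < threshold_mb;
    assert(finish_under_lock);

    // default timeout: a 10GB tablet at a minimum speed of 1MB/s, doubled as a safety factor
    const int64_t timeout_secs = 10 * 1024 /*MB*/ / 1 /*MB per sec*/ * 2;
    assert(timeout_secs == 20480); // matches migration_task_timeout_secs
    return 0;
}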
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 486730affa..a3724a2415 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/task/engine_storage_migration_task.h"
 
+#include <ctime>
+
 #include "olap/snapshot_manager.h"
 #include "olap/tablet_meta_manager.h"
 
@@ -24,146 +26,298 @@ namespace doris {
 
 using std::stringstream;
 
+const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
+
 EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& tablet,
                                                        DataDir* dest_store)
-        : _tablet(tablet), _dest_store(dest_store) {}
+        : _tablet(tablet), _dest_store(dest_store) {
+    _task_start_time = time(nullptr);
+}
 
 OLAPStatus EngineStorageMigrationTask::execute() {
     return _migrate();
 }
 
-OLAPStatus EngineStorageMigrationTask::_migrate() {
+OLAPStatus EngineStorageMigrationTask::_get_versions(
+        int32_t start_version, int32_t* end_version,
+        std::vector<RowsetSharedPtr>* consistent_rowsets) {
+    ReadLock rdlock(_tablet->get_header_lock());
+    const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+    if (last_version == nullptr) {
+        LOG(WARNING) << "failed to get rowset with max version, tablet="
+                     << _tablet->full_name();
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+
+    *end_version = last_version->end_version();
+    if (*end_version < start_version) {
+        // rowsets are empty
+        VLOG_DEBUG << "consistent rowsets empty. tablet=" << _tablet->full_name()
+                   << ", start_version=" << start_version << ", end_version=" << *end_version;
+        return OLAP_SUCCESS;
+    }
+    _tablet->capture_consistent_rowsets(Version(start_version, *end_version), consistent_rowsets);
+    if (consistent_rowsets->empty()) {
+        LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
+                     << ", version=" << *end_version;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+    return OLAP_SUCCESS;
+}
+
+bool EngineStorageMigrationTask::_is_timeout() {
+    int64_t time_elapsed = time(nullptr) - _task_start_time;
+    if (time_elapsed > config::migration_task_timeout_secs) {
+        LOG(WARNING) << "migration failed due to timeout, time_elapsed=" << time_elapsed
+                     << ", tablet=" << _tablet->full_name();
+        return true;
+    }
+    return false;
+}
+
+OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
+    // need hold migration lock outside
+    int64_t partition_id;
+    std::set<int64_t> transaction_ids;
+    // check if this tablet has related running txns. if yes, can not do migration.
+    StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
+            _tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid(), &partition_id,
+            &transaction_ids);
+    if (transaction_ids.size() > 0) {
+        return OLAP_ERR_HEADER_HAS_PENDING_DATA;
+    }
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(
+        UniqueWriteLock* migration_wlock) {
+    // caller should not hold migration lock, and 'migration_wlock' should not be nullptr
+    // ownership of the migration_wlock is transferred to the caller if the check succeeds
+    DCHECK_NE(migration_wlock, nullptr);
+    OLAPStatus res = OLAP_SUCCESS;
+    int try_times = 1;
+    do {
+        // to avoid invalid loops, the lock is guaranteed to be acquired here
+        UniqueWriteLock wlock(_tablet->get_migration_lock());
+        res = _check_running_txns();
+        if (res == OLAP_SUCCESS) {
+            // transfer the lock to the caller
+            *migration_wlock = std::move(wlock);
+            return res;
+        }
+        LOG(INFO) << "check running txns fail, try again until timeout."
+                  << " tablet=" << _tablet->full_name()
+                  << ", try times=" << try_times
+                  << ", res=" << res;
+        // unlock and sleep for a while, try again
+        wlock.unlock();
+        sleep(std::min(config::sleep_one_second * try_times, CHECK_TXNS_MAX_WAIT_TIME_SECS));
+        ++try_times;
+    } while (!_is_timeout());
+    return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
+        uint64_t shard, const std::string& full_path,
+        const std::vector<RowsetSharedPtr>& consistent_rowsets) {
+    // need hold migration lock and push lock outside
+    OLAPStatus res = OLAP_SUCCESS;
     int64_t tablet_id = _tablet->tablet_id();
     int32_t schema_hash = _tablet->schema_hash();
+    TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
+    {
+        ReadLock rdlock(_tablet->get_header_lock());
+        _generate_new_header(shard, consistent_rowsets, new_tablet_meta);
+    }
+    std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr";
+    res = new_tablet_meta->save(new_meta_file);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to save meta to path: " << new_meta_file;
+        return res;
+    }
+
+    // reset tablet id and rowset id
+    res = TabletMeta::reset_tablet_uid(new_meta_file);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "errors while set tablet uid: " << new_meta_file;
+        return res;
+    }
+    // it will change rowset id and its create time
+    // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load
+    res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, schema_hash);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to convert rowset id when do storage migration"
+                     << " path = " << full_path;
+        return res;
+    }
+    return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_reload_tablet(const std::string& full_path) {
+    // need hold migration lock and push lock outside
+    OLAPStatus res = OLAP_SUCCESS;
+    int64_t tablet_id = _tablet->tablet_id();
+    int32_t schema_hash = _tablet->schema_hash();
+    res = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
+            _dest_store, tablet_id, schema_hash, full_path, false);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to load tablet from new path. tablet_id=" << tablet_id
+                     << " schema_hash=" << schema_hash << " path = " << full_path;
+        return res;
+    }
+
+    // if old tablet finished schema change, then the schema change status of the new tablet is DONE
+    // else the schema change status of the new tablet is FAILED
+    TabletSharedPtr new_tablet =
+            StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    if (new_tablet == nullptr) {
+        LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id
+                     << " schema_hash=" << schema_hash;
+        return OLAP_ERR_TABLE_NOT_FOUND;
+    }
+    return res;
+}
+
+// if the size is less than the threshold, return true
+bool EngineStorageMigrationTask::_is_rowsets_size_less_than_threshold(
+        const std::vector<RowsetSharedPtr>& consistent_rowsets) {
+    size_t total_size = 0;
+    for (const auto& rs : consistent_rowsets) {
+        total_size += rs->index_disk_size() + rs->data_disk_size();
+    }
+    // total_size is in bytes, the config threshold is in MB
+    if (total_size / 1024 / 1024 < config::migration_remaining_size_threshold_mb) {
+        return true;
+    }
+    return false;
+}
+
+OLAPStatus EngineStorageMigrationTask::_migrate() {
+    int64_t tablet_id = _tablet->tablet_id();
     LOG(INFO) << "begin to process tablet migrate. "
               << "tablet_id=" << tablet_id << ", dest_store=" << _dest_store->path();
 
     DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+    int32_t start_version = 0;
+    int32_t end_version = 0;
+    std::vector<RowsetSharedPtr> consistent_rowsets;
 
     // try hold migration lock first
     OLAPStatus res = OLAP_SUCCESS;
-    UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), std::try_to_lock);
-    if (!migration_wlock.owns_lock()) {
-        return OLAP_ERR_RWLOCK_ERROR;
-    }
-
-    // check if this tablet has related running txns. if yes, can not do migration.
-    int64_t partition_id;
-    std::set<int64_t> transaction_ids;
-    StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
-            tablet_id, schema_hash, _tablet->tablet_uid(), &partition_id, &transaction_ids);
-    if (transaction_ids.size() > 0) {
-        LOG(WARNING) << "could not migrate because the tablet has unfinished txns,"
-                     << " tablet=" << _tablet->full_name();
-        return OLAP_ERR_HEADER_HAS_PENDING_DATA;
-    }
-
-    std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
-    // TODO(ygl): the tablet should not under schema change or rollup or load
-    do {
-        std::vector<RowsetSharedPtr> consistent_rowsets;
-        {
-            ReadLock rdlock(_tablet->get_header_lock());
-            // get all versions to be migrated
-            const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
-            if (last_version == nullptr) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "failed to get rowset with max version, tablet="
-                             << _tablet->full_name();
-                break;
-            }
-            int32_t end_version = last_version->end_version();
-            res = _tablet->capture_consistent_rowsets(Version(0, end_version), &consistent_rowsets);
-            if (consistent_rowsets.empty()) {
-                res = OLAP_ERR_VERSION_NOT_EXIST;
-                LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
-                             << ", version=" << end_version;
-                break;
-            }
-        }
-        uint64_t shard = 0;
+    uint64_t shard = 0;
+    string full_path;
+    {
+        UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), std::try_to_lock);
+        if (!migration_wlock.owns_lock()) {
+            return OLAP_ERR_RWLOCK_ERROR;
+        }
+
+        // check if this tablet has related running txns. if yes, can not do migration.
+        res = _check_running_txns();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "could not migrate because the tablet has unfinished txns,"
+                         << " tablet=" << _tablet->full_name();
+            return res;
+        }
+
+        std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+        // get versions to be migrated
+        res = _get_versions(start_version, &end_version, &consistent_rowsets);
+        if (res != OLAP_SUCCESS) {
+            return res;
+        }
+
+        // TODO(ygl): the tablet should not under schema change or rollup or load
         res = _dest_store->get_shard(&shard);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to get shard from store: " << _dest_store->path();
-            break;
+            return res;
         }
         FilePathDescStream root_path_desc_s;
         root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << shard;
         FilePathDesc full_path_desc = SnapshotManager::instance()->get_schema_hash_full_path(
                 _tablet, root_path_desc_s.path_desc());
-        string full_path = full_path_desc.filepath;
+        full_path = full_path_desc.filepath;
         // if dir already exist then return err, it should not happen.
         // should not remove the dir directly, for safety reason.
         if (FileUtils::check_exist(full_path)) {
             LOG(INFO) << "schema hash path already exist, skip this path. "
-                      << "full_path=" << full_path;
-            res = OLAP_ERR_FILE_ALREADY_EXIST;
-            break;
+                      << "full_path=" << full_path;
+            return OLAP_ERR_FILE_ALREADY_EXIST;
         }
 
         Status st = FileUtils::create_dir(full_path);
         if (!st.ok()) {
             res = OLAP_ERR_CANNOT_CREATE_DIR;
             LOG(WARNING) << "fail to create path. path=" << full_path
-                         << ", error:" << st.to_string();
-            break;
+                         << ", error:" << st.to_string();
+            return res;
         }
+    }
 
+    std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+    do {
         // migrate all index and data files but header file
-        res = _copy_index_and_data_files(full_path, consistent_rowsets);
+        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "fail to copy index and data files when migrate. res=" << res;
             break;
         }
+        UniqueWriteLock migration_wlock;
+        res = _check_running_txns_until_timeout(&migration_wlock);
+        if (res != OLAP_SUCCESS) {
+            break;
+        }
+        std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+        start_version = end_version;
+        // clear temp rowsets before getting the remaining versions
+        temp_consistent_rowsets.clear();
+        // get remaining versions
+        res = _get_versions(end_version + 1, &end_version, &temp_consistent_rowsets);
+        if (res != OLAP_SUCCESS) {
+            break;
+        }
+        if (start_version < end_version) {
+            // we have remaining versions to be migrated
+            consistent_rowsets.insert(consistent_rowsets.end(), temp_consistent_rowsets.begin(),
+                                      temp_consistent_rowsets.end());
+            LOG(INFO) << "we have remaining versions to be migrated. start_version="
+                      << start_version << " end_version=" << end_version;
+            // if the remaining size is less than config::migration_remaining_size_threshold_mb
+            // (default 10MB), we complete the copy while holding the lock, to avoid long-term
+            // competition with other tasks
+            if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) {
+                // force to copy the remaining data and index
+                res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+                if (res != OLAP_SUCCESS) {
+                    LOG(WARNING) << "fail to copy the remaining index and data files when migrate."
+                                 << " res=" << res;
+                    break;
+                }
+            } else {
+                if (_is_timeout()) {
+                    res = OLAP_ERR_HEADER_HAS_PENDING_DATA;
+                    break;
+                }
+                // there is too much remaining data here.
+                // in order not to affect other tasks, release the lock and then copy it
+                continue;
+            }
+        }
 
         // generate new tablet meta and write to hdr file
-        TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
-        {
-            ReadLock rdlock(_tablet->get_header_lock());
-            _generate_new_header(shard, consistent_rowsets, new_tablet_meta);
-        }
-        std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr";
-        res = new_tablet_meta->save(new_meta_file);
+        res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets);
+        if (res != OLAP_SUCCESS) {
+            break;
+        }
+        res = _reload_tablet(full_path);
         if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to save meta to path: " << new_meta_file;
             break;
         }
-
-        // reset tablet id and rowset id
-        res = TabletMeta::reset_tablet_uid(new_meta_file);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "errors while set tablet uid: " << new_meta_file;
-            break;
-        }
-        // it will change rowset id and its create time
-        // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load
-        res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, schema_hash);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to convert rowset id when do storage migration"
-                         << " path = " << full_path;
-            break;
-        }
+        break;
+    } while (true);
 
-        res = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
-                _dest_store, tablet_id, schema_hash, full_path, false);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "failed to load tablet from new path. tablet_id=" << tablet_id
-                         << " schema_hash=" << schema_hash << " path = " << full_path;
-            break;
-        }
-
-        // if old tablet finished schema change, then the schema change status of the new tablet is DONE
-        // else the schema change status of the new tablet is FAILED
-        TabletSharedPtr new_tablet =
-                StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
-        if (new_tablet == nullptr) {
-            LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id
-                         << " schema_hash=" << schema_hash;
-            res = OLAP_ERR_TABLE_NOT_FOUND;
-            break;
-        }
-    } while (0);
+    if (res != OLAP_SUCCESS) {
+        // remove the dir directly, to avoid filling the disk with junk data; it is safe to remove here
+        FileUtils::remove_all(full_path);
+    }
 
     return res;
 }
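The rewritten `_migrate()` above interleaves lock-free bulk copying with short locked catch-up rounds. Below is a minimal, self-contained sketch of that loop structure only; it is not part of the patch, and `Rowset`, `copy_rowsets`, `capture_new_rowsets`, and the plain `std::mutex` are hypothetical stand-ins for the real tablet machinery, assuming the same threshold semantics as the new config.

#include <cstddef>
#include <iostream>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not the real Doris APIs.
struct Rowset { size_t bytes = 0; };

static std::mutex g_migration_lock;
static std::vector<Rowset> g_incoming = {{64u << 20}, {4u << 20}}; // data landing mid-copy

static bool copy_rowsets(const std::vector<Rowset>& rs) { (void)rs; return true; } // pretend copy succeeds
static std::vector<Rowset> capture_new_rowsets() { return std::exchange(g_incoming, {}); }
static size_t total_bytes(const std::vector<Rowset>& rs) {
    size_t n = 0;
    for (const auto& r : rs) n += r.bytes;
    return n;
}

constexpr size_t kRemainingThreshold = 10u << 20; // cf. migration_remaining_size_threshold_mb

bool migrate_sketch(std::vector<Rowset> pending) {
    int rounds = 0;
    while (true) {
        if (!copy_rowsets(pending)) return false; // 1. bulk copy, migration lock NOT held

        std::unique_lock<std::mutex> guard(g_migration_lock); // 2. lock, see what arrived
        pending = capture_new_rowsets();
        if (pending.empty()) break; // caught up: header would be written under the lock

        if (total_bytes(pending) < kRemainingThreshold) {
            if (!copy_rowsets(pending)) return false; // 3a. small remainder: finish under the lock
            break;
        }
        if (++rounds > 100) return false; // 3b. the real task checks its timeout here
        guard.unlock(); // release so loads can proceed, then do another lock-free round
    }
    std::cout << "caught up after " << rounds << " extra round(s)\n";
    return true; // header generation and tablet reload would follow
}

int main() { return migrate_sketch({{512u << 20}}) ? 0 : 1; }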
diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h
index 3009f4c127..ffd42a459c 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -36,6 +36,24 @@ public:
 private:
     OLAPStatus _migrate();
 
+    // check if the task has timed out
+    bool _is_timeout();
+    OLAPStatus _get_versions(int32_t start_version, int32_t* end_version,
+                             std::vector<RowsetSharedPtr>* consistent_rowsets);
+    OLAPStatus _check_running_txns();
+    // caller should not hold migration lock, and 'migration_wlock' should not be nullptr
+    // ownership of the migration lock is transferred to the caller if the check succeeds
+    OLAPStatus _check_running_txns_until_timeout(UniqueWriteLock* migration_wlock);
+
+    // if the size is less than the threshold, return true
+    bool _is_rowsets_size_less_than_threshold(
+            const std::vector<RowsetSharedPtr>& consistent_rowsets);
+
+    OLAPStatus _gen_and_write_header_to_hdr_file(
+            uint64_t shard, const std::string& full_path,
+            const std::vector<RowsetSharedPtr>& consistent_rowsets);
+    OLAPStatus _reload_tablet(const std::string& full_path);
+
     void _generate_new_header(uint64_t new_shard,
                               const std::vector<RowsetSharedPtr>& consistent_rowsets,
                               TabletMetaSharedPtr new_tablet_meta);
@@ -52,7 +70,7 @@ private:
     TabletSharedPtr _tablet;
     // destination data dir
     DataDir* _dest_store;
-
+    int64_t _task_start_time;
 }; // EngineTask
 
 } // namespace doris
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 5e4d15f777..81a551f921 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -26,6 +26,7 @@ ADD_BE_TEST(row_block_test)
 ADD_BE_TEST(row_block_v2_test)
 ADD_BE_TEST(bit_field_test)
 ADD_BE_TEST(byte_buffer_test)
+ADD_BE_TEST(engine_storage_migration_task_test)
 ADD_BE_TEST(run_length_byte_test)
 ADD_BE_TEST(run_length_integer_test)
 ADD_BE_TEST(stream_index_test)
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp
new file mode 100644
index 0000000000..68c88f853f
--- /dev/null
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_storage_migration_task.h"
+
+#include <gtest/gtest.h>
+#include <sys/file.h>
+
+#include <string>
+
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/Types_types.h"
+#include "olap/delta_writer.h"
+#include "olap/field.h"
+#include "olap/options.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta_manager.h"
+#include "olap/utils.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/tuple.h"
+#include "util/file_utils.h"
+#include "util/logging.h"
+
+namespace doris {
+
+static const uint32_t MAX_PATH_LEN = 1024;
+
+StorageEngine* k_engine = nullptr;
+std::string path1;
+std::string path2;
+
+void set_up() {
+    char buffer[MAX_PATH_LEN];
+    ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+    path1 = std::string(buffer) + "/data_test_1";
+    path2 = std::string(buffer) + "/data_test_2";
+    config::storage_root_path = path1 + ";" + path2;
+    FileUtils::remove_all(path1);
+    FileUtils::create_dir(path1);
+
+    FileUtils::remove_all(path2);
+    FileUtils::create_dir(path2);
+    std::vector<StorePath> paths;
+    paths.emplace_back(path1, -1);
+    paths.emplace_back(path2, -1);
+
+    doris::EngineOptions options;
+    options.store_paths = paths;
+    Status s = doris::StorageEngine::open(options, &k_engine);
+    ASSERT_TRUE(s.ok()) << s.to_string();
+
+    ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+    exec_env->set_storage_engine(k_engine);
+    k_engine->start_bg_threads();
+}
+
+void tear_down() {
+    if (k_engine != nullptr) {
+        k_engine->stop();
+        delete k_engine;
+        k_engine = nullptr;
+    }
+    ASSERT_EQ(system("rm -rf ./data_test_1"), 0);
+    ASSERT_EQ(system("rm -rf ./data_test_2"), 0);
+    FileUtils::remove_all(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX);
+}
+
+void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t schema_hash,
+                                             TCreateTabletReq* request) {
+    request->tablet_id = tablet_id;
+    request->__set_version(1);
+    request->tablet_schema.schema_hash = schema_hash;
+    request->tablet_schema.short_key_column_count = 2;
+    request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS;
+    request->tablet_schema.storage_type = TStorageType::COLUMN;
+    request->tablet_schema.__set_sequence_col_idx(2);
+
+    TColumn k1;
+    k1.column_name = "k1";
+    k1.__set_is_key(true);
+    k1.column_type.type = TPrimitiveType::TINYINT;
+    request->tablet_schema.columns.push_back(k1);
+
+    TColumn k2;
+    k2.column_name = "k2";
+    k2.__set_is_key(true);
+    k2.column_type.type = TPrimitiveType::SMALLINT;
+    request->tablet_schema.columns.push_back(k2);
+
+    TColumn sequence_col;
+    sequence_col.column_name = SEQUENCE_COL;
+    sequence_col.__set_is_key(false);
+    sequence_col.column_type.type = TPrimitiveType::INT;
+    sequence_col.__set_aggregation_type(TAggregationType::REPLACE);
+    request->tablet_schema.columns.push_back(sequence_col);
+
+    TColumn v1;
+    v1.column_name = "v1";
+    v1.__set_is_key(false);
+    v1.column_type.type = TPrimitiveType::DATETIME;
+    v1.__set_aggregation_type(TAggregationType::REPLACE);
+    request->tablet_schema.columns.push_back(v1);
+}
+
+TDescriptorTable create_descriptor_tablet_with_sequence_col() {
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder tuple_builder;
+
+    tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build());
+    tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build());
+    tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                   .type(TYPE_INT)
+                                   .column_name(SEQUENCE_COL)
+                                   .column_pos(2)
+                                   .build());
+    tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(3).build());
+    tuple_builder.build(&dtb);
+
+    return dtb.desc_tbl();
+}
+
+class TestEngineStorageMigrationTask : public ::testing::Test {
+public:
+    TestEngineStorageMigrationTask() {}
+    ~TestEngineStorageMigrationTask() {}
+
+    void SetUp() {
+        // Create local data dir for StorageEngine.
+        std::cout << "setup" << std::endl;
+    }
+
+    void TearDown() {
+        // Remove all dir.
+        std::cout << "tear down" << std::endl;
+        //doris::tear_down();
+        //ASSERT_EQ(OLAP_SUCCESS, remove_all_dir(config::storage_root_path));
+    }
+};
+
+TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
+    TCreateTabletReq request;
+    create_tablet_request_with_sequence_col(10005, 270068377, &request);
+    OLAPStatus res = k_engine->create_tablet(request);
+    ASSERT_EQ(OLAP_SUCCESS, res);
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10005, 270068377, WriteType::LOAD,        20003,
+                              30003, load_id,   tuple_desc, &(tuple_desc->slots())};
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer);
+    ASSERT_NE(delta_writer, nullptr);
+
+    MemTracker tracker;
+    MemPool pool(&tracker);
+    // Tuple 1
+    {
+        Tuple* tuple = reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
+        memset(tuple, 0, tuple_desc->byte_size());
+        *(int8_t*)(tuple->get_slot(slots[0]->tuple_offset())) = 123;
+        *(int16_t*)(tuple->get_slot(slots[1]->tuple_offset())) = 456;
+        *(int32_t*)(tuple->get_slot(slots[2]->tuple_offset())) = 1;
+        ((DateTimeValue*)(tuple->get_slot(slots[3]->tuple_offset())))
+                ->from_date_str("2020-07-16 19:39:43", 19);
+
+        res = delta_writer->write(tuple);
+        ASSERT_EQ(OLAP_SUCCESS, res);
+    }
+
+    res = delta_writer->close();
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    res = delta_writer->close_wait(nullptr, false);
+    ASSERT_EQ(OLAP_SUCCESS, res);
+
+    // publish version success
+    TabletSharedPtr tablet =
+            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+    std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    Version version;
+    version.first = tablet->rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    std::cout << "start to add rowset version:" << version.first << "-" << version.second
+              << std::endl;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+    for (auto& tablet_rs : tablet_related_rs) {
+        std::cout << "start to publish txn" << std::endl;
+        RowsetSharedPtr rowset = tablet_rs.second;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_rs.first.tablet_uid, version);
+        ASSERT_EQ(OLAP_SUCCESS, res);
+        std::cout << "start to add inc rowset:" << rowset->rowset_id()
+                  << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
+                  << "-" << rowset->version().second << std::endl;
+        res = tablet->add_inc_rowset(rowset);
+        ASSERT_EQ(OLAP_SUCCESS, res);
+    }
+    ASSERT_EQ(1, tablet->num_rows());
+    // sleep 1 second so that the migrated tablet gets a different creation time than the current tablet
+    sleep(1);
+
+    // test case 1
+    // prepare
+    DataDir* dest_store = nullptr;
+    if (tablet->data_dir()->path() == path1) {
+        dest_store = StorageEngine::instance()->get_store(path2);
+    } else if (tablet->data_dir()->path() == path2) {
+        dest_store = StorageEngine::instance()->get_store(path1);
+    }
+    ASSERT_NE(dest_store, nullptr);
+    std::cout << "dest store:" << dest_store->path() << std::endl;
+    // migrating
+    EngineStorageMigrationTask engine_task(tablet, dest_store);
+    res = engine_task.execute();
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    // re-get the tablet from the manager after migration
+    auto tablet_id = 10005;
+    auto schema_hash = 270068377;
+    TabletSharedPtr tablet2 = k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    // check path
+    ASSERT_EQ(tablet2->data_dir()->path(), dest_store->path());
+    // check rows
+    ASSERT_EQ(1, tablet2->num_rows());
+    // tablet2 should not equal to tablet
+    ASSERT_NE(tablet2, tablet);
+
+    // test case 2
+    // migrate tablet2 back to the tablet's path
+    // sleep 1 second for update time
+    sleep(1);
+    dest_store = StorageEngine::instance()->get_store(tablet->data_dir()->path());
+    ASSERT_NE(dest_store, nullptr);
+    ASSERT_NE(dest_store->path(), tablet2->data_dir()->path());
+    std::cout << "dest store:" << dest_store->path() << std::endl;
+    EngineStorageMigrationTask engine_task2(tablet2, dest_store);
+    res = engine_task2.execute();
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    TabletSharedPtr tablet3 = k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    // check path
+    ASSERT_EQ(tablet3->data_dir()->path(), tablet->data_dir()->path());
+    // check rows
+    ASSERT_EQ(1, tablet3->num_rows());
+    // tablet3 should not equal to tablet2 or tablet
+    ASSERT_NE(tablet3, tablet2);
+    ASSERT_NE(tablet3, tablet);
+    // test case 2 end
+
+    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    ASSERT_EQ(OLAP_SUCCESS, res);
+    delete delta_writer;
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+    int ret = doris::OLAP_SUCCESS;
+    testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    doris::set_up();
+    ret = RUN_ALL_TESTS();
+    doris::tear_down();
+    google::protobuf::ShutdownProtobufLibrary();
+    return ret;
+}
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 896805f6c2..b96c51e7ac 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -578,10 +578,12 @@ module.exports = [
                     directoryPath: "Administration/",
                     initialOpenGroupIndex: -1,
                     children: [
+                        "ADMIN CANCEL REBALANCE DISK",
                         "ADMIN CANCEL REPAIR",
                         "ADMIN CLEAN TRASH",
                         "ADMIN CHECK TABLET",
                         "ADMIN COMPACT",
+                        "ADMIN REBALANCE DISK",
                         "ADMIN REPAIR",
                         "ADMIN SET CONFIG",
                         "ADMIN SET REPLICA STATUS",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 30c5fc29c3..fc7c6bd9d4 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -591,10 +591,12 @@ module.exports = [
                     directoryPath: "Administration/",
                     initialOpenGroupIndex: -1,
                     children: [
+                        "ADMIN CANCEL REBALANCE DISK",
                         "ADMIN CANCEL REPAIR",
                         "ADMIN CLEAN TRASH",
                         "ADMIN CHECK TABLET",
                         "ADMIN COMPACT",
+                        "ADMIN REBALANCE DISK",
                         "ADMIN REPAIR",
                         "ADMIN SET CONFIG",
                         "ADMIN SET REPLICA STATUS",
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
new file mode 100644
index 0000000000..475e266306
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
@@ -0,0 +1,51 @@
+---
+{
+    "title": "ADMIN CANCEL REBALANCE DISK",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN CANCEL REBALANCE DISK
+## Description
+
+This statement is used to cancel high-priority disk rebalancing for the specified backends.
+
+Grammar:
+
+ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only means that the system no longer rebalances the disks of the specified backends with high priority. The system will still rebalance disks through its default scheduling.
+
+## example
+
+1. Cancel high-priority disk rebalancing for all backends in the cluster
+
+ADMIN CANCEL REBALANCE DISK;
+
+2. Cancel high-priority disk rebalancing for the specified backends
+
+ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,CANCEL,REBALANCE,DISK
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
new file mode 100644
index 0000000000..6e1c1aaa34
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
@@ -0,0 +1,52 @@
+---
+{
+    "title": "ADMIN REBALANCE DISK",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN REBALANCE DISK
+## Description
+
+This statement is used to rebalance the disks of the specified backends with high priority, regardless of whether the cluster is balanced.
+
+Grammar:
+
+ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only means that the system attempts to rebalance the disks of the specified backends with high priority, regardless of whether the cluster is balanced.
+2. The default timeout is 24 hours. After the timeout, the system no longer rebalances the disks of the specified backends with high priority; the command then needs to be issued again.
+3. Once the disks of a specified backend are balanced, its high priority becomes invalid.
+
+## example
+
+1. Attempt to rebalance the disks of all backends
+
+ADMIN REBALANCE DISK;
+
+2. Attempt to rebalance the disks of the specified backends
+
+ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,REBALANCE,DISK
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
new file mode 100644
index 0000000000..e6978107c9
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE DISK.md
@@ -0,0 +1,52 @@
+---
+{
+    "title": "ADMIN CANCEL REBALANCE DISK",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN CANCEL REBALANCE DISK
+## description
+
+    该语句用于取消优先均衡BE的磁盘
+
+    语法:
+
+        ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+    说明:
+
+        1. 该语句仅表示系统不再优先均衡指定BE的磁盘数据。系统仍会以默认调度方式均衡BE的磁盘数据。
+
+## example
+
+    1. 取消集群所有BE的优先磁盘均衡
+
+        ADMIN CANCEL REBALANCE DISK;
+
+    2. 取消指定BE的优先磁盘均衡
+
+        ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+    ADMIN,CANCEL,REBALANCE,DISK
+
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
new file mode 100644
index 0000000000..0bb78f5379
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
@@ -0,0 +1,54 @@
+---
+{
+    "title": "ADMIN REBALANCE DISK",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN REBALANCE DISK
+## description
+
+    该语句用于尝试优先均衡指定的BE磁盘数据
+
+    语法:
+
+        ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1", "BackendHost2:BackendHeartBeatPort2", ...)];
+
+    说明:
+
+        1. 该语句表示让系统尝试优先均衡指定BE的磁盘数据,不受限于集群是否均衡。
+        2. 默认的 timeout 是 24小时。超时意味着系统将不再优先均衡指定的BE磁盘数据。需要重新使用该命令设置。
+        3. 指定BE的磁盘数据均衡后,该BE的优先级将会失效。
+
+## example
+
+    1. 尝试优先均衡集群内的所有BE
+
+        ADMIN REBALANCE DISK;
+
+    2. 尝试优先均衡指定BE
+
+        ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+    ADMIN,REBALANCE,DISK
+
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index da303888fb..2bf9da4afc 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -241,7 +241,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
     KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_COMPACT, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER,
     KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
-    KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
+    KW_DELETE, KW_UPDATE, KW_DISK, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
     KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE, KW_EXISTS, KW_EXPORT, KW_EXTENDED, KW_EXTERNAL, KW_EXTRACT,
     KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIELDS, KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
@@ -260,7 +260,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
     KW_PLUGIN, KW_PLUGINS,
     KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY,
     KW_QUERY, KW_QUOTA,
-    KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME,
+    KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME,
     KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, KW_RETURNS, KW_RESUME, KW_REVOKE, KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, KW_ROWS, KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED,
@@ -5297,6 +5297,22 @@ admin_stmt ::=
     {:
         RESULT = new AdminCheckTabletsStmt(tabletIds, properties);
     :}
+    | KW_ADMIN KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN
+    {:
+        RESULT = new AdminRebalanceDiskStmt(backends);
+    :}
+    | KW_ADMIN KW_REBALANCE KW_DISK
+    {:
+        RESULT = new AdminRebalanceDiskStmt(null);
+    :}
+    | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN
+    {:
+        RESULT = new AdminCancelRebalanceDiskStmt(backends);
+    :}
+    | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK
+    {:
+        RESULT = new AdminCancelRebalanceDiskStmt(null);
+    :}
     | KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN
     {:
         RESULT = new AdminCleanTrashStmt(backends);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
new file mode 100644
index 0000000000..3e9bab3f21
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AdminCancelRebalanceDiskStmt extends DdlStmt {
+    private List<Backend> backends = Lists.newArrayList();
+
+    public AdminCancelRebalanceDiskStmt(List<String> backends) {
+        ImmutableMap<Long, Backend> backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend();
+        Map<String, Long> backendsID = new HashMap<String, Long>();
+        for (Backend backend : backendsInfo.values()) {
+            backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId());
+        }
+        if (backends == null) {
+            for (Backend backend : backendsInfo.values()) {
+                this.backends.add(backend);
+            }
+        } else {
+            for (String backend : backends) {
+                if (backendsID.get(backend) != null) {
+                    this.backends.add(backendsInfo.get(backendsID.get(backend)));
+                    backendsID.remove(backend); // avoid repetition
+                }
+            }
+        }
+    }
+
+    public List<Backend> getBackends() {
+        return backends;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        }
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.NO_FORWARD;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
new file mode 100644
index 0000000000..c8f0aa6b76
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AdminRebalanceDiskStmt extends DdlStmt {
+    private List<Backend> backends = Lists.newArrayList();
+    private long timeoutS = 0;
+
+    public AdminRebalanceDiskStmt(List<String> backends) {
+        ImmutableMap<Long, Backend> backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend();
+        Map<String, Long> backendsID = new HashMap<String, Long>();
+        for (Backend backend : backendsInfo.values()) {
+            backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId());
+        }
+        if (backends == null) {
+            for (Backend backend : backendsInfo.values()) {
+                this.backends.add(backend);
+            }
+        } else {
+            for (String backend : backends) {
+                if (backendsID.get(backend) != null) {
+                    this.backends.add(backendsInfo.get(backendsID.get(backend)));
+                    backendsID.remove(backend); // avoid repetition
+                }
+            }
+        }
+        timeoutS = 24 * 3600; // default 24 hours
+    }
+
+    public List<Backend> getBackends() {
+        return backends;
+    }
+
+    public long getTimeoutS() {
+        return timeoutS;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+        }
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.NO_FORWARD;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 8c575d8cb9..f01c0175c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -246,8 +246,8 @@ public class BackendLoadStatistic {
             }
         }
 
-        LOG.debug("classify path by load. storage: {} avg used percent: {}. low/mid/high: {}/{}/{}",
-                avgUsedPercent, medium, lowCounter, midCounter, highCounter);
+        LOG.debug("classify path by load. be id: {} storage: {} avg used percent: {}. low/mid/high: {}/{}/{}",
+                beId, medium, avgUsedPercent, lowCounter, midCounter, highCounter);
     }
 
     public void calcScore(Map<TStorageMedium, Double> avgClusterUsedCapacityPercentMap,
@@ -315,6 +315,60 @@ public class BackendLoadStatistic {
         return status;
     }
 
+    /*
+     * Check whether the backend becomes more balanced if we migrate a tablet of size 'tabletSize'
+     * from 'srcPath' to 'destPath':
+     * 1. recalculate the load score of the src and dest paths after migrating the tablet.
+     * 2. if the sum of the differences between the new scores and the average score becomes
+     *    smaller, we consider it more balanced.
+     */
+    public boolean isMoreBalanced(long srcPath, long destPath, long tabletId, long tabletSize,
+                                  TStorageMedium medium) {
+        long totalCapacity = 0;
+        long totalUsedCapacity = 0;
+        RootPathLoadStatistic srcPathStat = null;
+        RootPathLoadStatistic destPathStat = null;
+        for (RootPathLoadStatistic pathStat : pathStatistics) {
+            if (pathStat.getStorageMedium() == medium) {
+                totalCapacity += pathStat.getCapacityB();
+                totalUsedCapacity += pathStat.getUsedCapacityB();
+                if (pathStat.getPathHash() == srcPath) {
+                    srcPathStat = pathStat;
+                } else if (pathStat.getPathHash() == destPath) {
+                    destPathStat = pathStat;
+                }
+            }
+        }
+        if (srcPathStat == null || destPathStat == null) {
+            LOG.info("migrate {}(size: {}) from {} to {} failed, medium: {}, src or dest path stat does not exist.",
+                    tabletId, tabletSize, srcPath, destPath, medium);
+            return false;
+        }
+        double avgUsedPercent = totalCapacity == 0 ? 0.0 : totalUsedCapacity / (double) totalCapacity;
+        double currentSrcPathScore = srcPathStat.getCapacityB() == 0
+                ? 0.0 : srcPathStat.getUsedCapacityB() / (double) srcPathStat.getCapacityB();
+        double currentDestPathScore = destPathStat.getCapacityB() == 0
+                ? 0.0 : destPathStat.getUsedCapacityB() / (double) destPathStat.getCapacityB();
+
+        double newSrcPathScore = srcPathStat.getCapacityB() == 0
+                ? 0.0 : (srcPathStat.getUsedCapacityB() - tabletSize) / (double) srcPathStat.getCapacityB();
+        double newDestPathScore = destPathStat.getCapacityB() == 0
+                ? 0.0 : (destPathStat.getUsedCapacityB() + tabletSize) / (double) destPathStat.getCapacityB();
+
+        double currentDiff = Math.abs(currentSrcPathScore - avgUsedPercent)
+                + Math.abs(currentDestPathScore - avgUsedPercent);
+        double newDiff = Math.abs(newSrcPathScore - avgUsedPercent) + Math.abs(newDestPathScore - avgUsedPercent);
+
+        LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the load score changed."
+                        + " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {},"
+                        + " more balanced: {}",
+                tabletId, tabletSize, srcPath, destPath, medium, currentSrcPathScore, newSrcPathScore,
+                currentDestPathScore, newDestPathScore, avgUsedPercent, currentDiff, newDiff,
+                (newDiff < currentDiff));
+
+        return newDiff < currentDiff;
+    }
+
     public boolean hasAvailDisk() {
         for (RootPathLoadStatistic rootPathLoadStatistic : pathStatistics) {
             if (rootPathLoadStatistic.getDiskState() == DiskState.ONLINE) {
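A standalone numeric walk-through of the isMoreBalanced() check above (not part of the patch; plain arithmetic with made-up capacities):

#include <cassert>
#include <cmath>

// Two 100GB disks on one backend: src holds 60GB, dest holds 20GB, and we move a 10GB tablet.
int main() {
    const double capacity = 100.0, srcUsed = 60.0, destUsed = 20.0, tablet = 10.0;
    const double avg = (srcUsed + destUsed) / (capacity * 2);  // 0.4

    const double curSrc = srcUsed / capacity;                  // 0.6
    const double curDest = destUsed / capacity;                // 0.2
    const double newSrc = (srcUsed - tablet) / capacity;       // 0.5
    const double newDest = (destUsed + tablet) / capacity;     // 0.3

    const double curDiff = std::fabs(curSrc - avg) + std::fabs(curDest - avg); // 0.4
    const double newDiff = std::fabs(newSrc - avg) + std::fabs(newDest - avg); // 0.2

    assert(newDiff < curDiff); // the move shrinks the deviation, so it is "more balanced"
    return 0;
}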
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
new file mode 100644
index 0000000000..f32e31a7ef
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletSchedCtx.BalanceType;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ * DiskRebalancer is different from the other rebalancers, which take care of cluster-wide data
+ * balancing; it chooses a backend and moves tablets from one of its disks to another.
+ * DiskRebalancer strategy:
+ * 1. only works while the cluster is balanced (which means the cluster has no high or mid load backends);
+ *    1.1 if the user has given prio backends, tablets are selected from the prio backends
+ *        no matter whether the cluster is balanced or not.
+ * 2. select alternative tablets from mid load backends, and return them to the tablet scheduler.
+ * 3. given a tablet with a src path (disk), find a path (disk) to migrate it to.
+ */
+public class DiskRebalancer extends Rebalancer {
+    private static final Logger LOG = LogManager.getLogger(DiskRebalancer.class);
+
+    public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
+        super(infoService, invertedIndex);
+    }
+
+    public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
+        List<BackendLoadStatistic> stats = Lists.newArrayList();
+        for (BackendLoadStatistic backend : bes) {
+            long backendId = backend.getBeId();
+            Long timeoutS = prioBackends.getOrDefault(backendId, 0L);
+            if (timeoutS != 0) {
+                if (timeoutS < System.currentTimeMillis()) {
+                    // remove backends from prio if their prio end time has passed
+                    prioBackends.remove(backendId);
+                    continue;
+                }
+                stats.add(backend);
+            }
+        }
+        return stats;
+    }
+
+    // true means the BE has both low and high load paths for balancing after reclassification
+    private boolean checkAndReclassifyPaths(Set<Long> pathLow, Set<Long> pathMid, Set<Long> pathHigh) {
+        if (pathLow.isEmpty() && pathHigh.isEmpty()) {
+            // balanced
+            return false;
+        }
+        if (pathLow.isEmpty()) {
+            // mid => low
+            pathLow.addAll(pathMid);
+        } else if (pathHigh.isEmpty()) {
+            // mid => high
+            pathHigh.addAll(pathMid);
+        }
+        if (pathLow.isEmpty() || pathHigh.isEmpty()) {
+            // check again
+            return false;
+        }
+        return true;
+    }
+
+    /*
+     * Try to select alternative tablets to balance the disks:
+     * 1. Classify the backends into low, mid and high classes by load score.
+     * 2. Try to select tablets from mid load backends:
+     *    1. here we only select alternative tablets, without considering the selected tablets'
+     *       status or whether they benefit the balance (all of this is checked in the tablet scheduler);
+     *    2. only select tablets from 'mid' backends;
+     *    3. only select tablets from 'high' paths.
+     * 3. Try to select tablets from prio backends.
+     *
+     * We only select tablets from mid load nodes and do not set their dest here; all of this is
+     * set when the tablet is being scheduled in the tablet scheduler.
+     *
+     * NOTICE that we may select any available tablet here, ignoring its state.
+     * The state will be checked when it is being scheduled in the tablet scheduler.
+     */
+    @Override
+    protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
+            ClusterLoadStatistic clusterStat, TStorageMedium medium) {
+        String clusterName = clusterStat.getClusterName();
+        List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
+
+        // get classification of backends
+        List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
+        List<BackendLoadStatistic> midBEs = Lists.newArrayList();
+        List<BackendLoadStatistic> highBEs = Lists.newArrayList();
+        clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
+
+        if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
+            // the cluster is not balanced
+            if (prioBackends.isEmpty()) {
+                LOG.info("cluster is not balanced: {} with medium: {}. skip", clusterName, medium);
+                return alternativeTablets;
+            } else {
+                // prioBEs are not empty, we only schedule prioBEs' disk balance tasks
+                midBEs.addAll(lowBEs);
+                midBEs.addAll(highBEs);
+                midBEs = filterByPrioBackends(midBEs);
+            }
+        }
+
+        // first we should check if any mid load backend is available.
+        // if no mid load backend is available, we should not start the balance
+        if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
+            LOG.info("all mid load backends are dead: {} with medium: {}. skip",
+                    midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+            return alternativeTablets;
+        }
+
+        if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
+            LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
+                    midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+            return alternativeTablets;
+        }
+
+        Set<Long> unbalancedBEs = Sets.newHashSet();
+        // choose tablets from backends randomly.
+        Collections.shuffle(midBEs);
+        for (int i = midBEs.size() - 1; i >= 0; i--) {
+            BackendLoadStatistic beStat = midBEs.get(i);
+
+            // classify the paths.
+            Set<Long> pathLow = Sets.newHashSet();
+            Set<Long> pathMid = Sets.newHashSet();
+            Set<Long> pathHigh = Sets.newHashSet();
+            // we only select tablets from available high load paths
+            beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
+            // check if the BE has low and high load paths for balancing after reclassification
+            if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
+                continue;
+            }
+
+            // get all tablets on this backend, and shuffle them for random selection
+            List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
+            Collections.shuffle(tabletIds);
+
+            // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
+            Map<Long, Integer> remainingPaths = Maps.newHashMap();
+            for (Long pathHash : pathHigh) {
+                remainingPaths.put(pathHash, TabletScheduler.BALANCE_SLOT_NUM_FOR_PATH);
+            }
+
+            if (remainingPaths.isEmpty()) {
+                return alternativeTablets;
+            }
+
+            // select tablets from the shuffled tablets
+            for (Long tabletId : tabletIds) {
+                Replica replica = invertedIndex.getReplica(tabletId, beStat.getBeId());
+                if (replica == null) {
+                    continue;
+                }
+                // ignore empty replicas as they do not make the disks more balanced (disk usage)
+                if (replica.getDataSize() == 0) {
+                    continue;
+                }
+
+                // check if the replica is on a 'high' path,
+                // and only select it if the number of selected tablets of this path
+                // does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH).
+ long replicaPathHash = replica.getPathHash(); + if (remainingPaths.containsKey(replicaPathHash)) { + TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); + if (tabletMeta == null) { + continue; + } + + TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName, + tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), + tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/, + System.currentTimeMillis()); + // we set temp src here to simplify completeSchedCtx method, and avoid take slot here + tabletCtx.setTempSrc(replica); + tabletCtx.setTag(clusterStat.getTag()); + if (prioBackends.containsKey(beStat.getBeId())) { + // priority of balance task of prio BE is NORMAL + tabletCtx.setOrigPriority(Priority.NORMAL); + } else { + // balance task's default priority is LOW + tabletCtx.setOrigPriority(Priority.LOW); + } + // we must set balanceType to DISK_BALANCE for create migration task + tabletCtx.setBalanceType(BalanceType.DISK_BALANCE); + + alternativeTablets.add(tabletCtx); + unbalancedBEs.add(beStat.getBeId()); + // update remaining paths + int remaining = remainingPaths.get(replicaPathHash) - 1; + if (remaining <= 0) { + remainingPaths.remove(replicaPathHash); + } else { + remainingPaths.put(replicaPathHash, remaining); + } + } + } + } // end for mid backends + + // remove balanced BEs from prio backends + prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id)); + LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}", + clusterName, medium, alternativeTablets.size(), + alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + return alternativeTablets; + } + + /* + * Create a StorageMediaMigrationTask of this selected tablet for balance. + * 1. Check if the cluster is balanced. if not, the balance will be cancelled. + * 2. Check if the src replica still on high load path. If not, the balance will be cancelled. + * 3. Select a low load path from this backend as destination. 
+ */ + @Override + public void completeSchedCtx(TabletSchedCtx tabletCtx, Map backendsWorkingSlots) throws SchedException { + ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster(), tabletCtx.getTag()); + if (clusterStat == null) { + throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist"); + } + if (tabletCtx.getTempSrcBackendId() == -1 || tabletCtx.getTempSrcPathHash() == -1) { + throw new SchedException(Status.UNRECOVERABLE, + "src does not appear to be set correctly, something went wrong"); + } + Replica replica = invertedIndex.getReplica(tabletCtx.getTabletId(), tabletCtx.getTempSrcBackendId()); + // check that the src replica is still there + if (replica == null || replica.getPathHash() != tabletCtx.getTempSrcPathHash()) { + throw new SchedException(Status.UNRECOVERABLE, "src replica may have been rebalanced"); + } + // ignore empty replicas as they do not make the disks more balanced + if (replica.getDataSize() == 0) { + throw new SchedException(Status.UNRECOVERABLE, "size of src replica is zero"); + } + // check src slot + PathSlot slot = backendsWorkingSlots.get(replica.getBackendId()); + if (slot == null) { + LOG.debug("BE does not have slot: {}", replica.getBackendId()); + throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot"); + } + long pathHash = slot.takeBalanceSlot(replica.getPathHash()); + if (pathHash == -1) { + throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot"); + } + // after taking the src slot, we can set the src replica now + tabletCtx.setSrc(replica); + + BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendId()); + if (!beStat.isAvailable()) { + throw new SchedException(Status.UNRECOVERABLE, "the backend is not available"); + } + // classify the paths. + // If the src path is 'high', then we can select a path from 'low' and 'mid' + // If the src path is 'mid', then we can only select a path from 'low' + // If the src path is 'low', then we have nothing to do + Set pathLow = Sets.newHashSet(); + Set pathMid = Sets.newHashSet(); + Set pathHigh = Sets.newHashSet(); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + if (pathHigh.contains(replica.getPathHash())) { + pathLow.addAll(pathMid); + } else if (!pathMid.contains(replica.getPathHash())) { + throw new SchedException(Status.UNRECOVERABLE, "src path is low load"); + } + // check if this migration task can make the BE's disks more balanced. + List availPaths = Lists.newArrayList(); + BalanceStatus bs; + if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths, + false /* not supplement */)) != BalanceStatus.OK) { + LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs()); + throw new SchedException(Status.UNRECOVERABLE, "tablet not fit in BE"); + } + // Select a low load path as destination. 
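+ // A candidate path qualifies only if it is not the src path, it is classified as low load, and moving the tablet there makes this BE's disks more balanced; the first qualifying path that also yields a balance slot becomes the destination.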
+ boolean setDest = false; + for (RootPathLoadStatistic stat : availPaths) { + // check if the avail path is the src path + if (stat.getPathHash() == replica.getPathHash()) { + continue; + } + // check if the avail path is a low path + if (!pathLow.contains(stat.getPathHash())) { + LOG.debug("the path: {} is not low load", stat.getPathHash()); + continue; + } + if (!beStat.isMoreBalanced(tabletCtx.getSrcPathHash(), stat.getPathHash(), + tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) { + LOG.debug("the path: {} cannot make the disks more balanced", stat.getPathHash()); + continue; + } + long destPathHash = slot.takeBalanceSlot(stat.getPathHash()); + if (destPathHash == -1) { + throw new SchedException(Status.UNRECOVERABLE, "unable to take dest slot"); + } + tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath()); + setDest = true; + break; + } + + if (!setDest) { + throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java index 1fca40652c..a7177c2f54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java @@ -20,11 +20,13 @@ package org.apache.doris.clone; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTask; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; import com.google.common.collect.Lists; import com.google.common.collect.Table; @@ -50,6 +52,8 @@ public abstract class Rebalancer { protected Table statisticMap = HashBasedTable.create(); protected TabletInvertedIndex invertedIndex; protected SystemInfoService infoService; + // be id -> the timestamp (ms) when its prio status expires + protected Map prioBackends = Maps.newConcurrentMap(); public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) { this.infoService = infoService; @@ -71,10 +75,14 @@ public abstract class Rebalancer { protected abstract List selectAlternativeTabletsForCluster( ClusterLoadStatistic clusterStat, TStorageMedium medium); - public void createBalanceTask(TabletSchedCtx tabletCtx, Map backendsWorkingSlots, - AgentBatchTask batchTask) throws SchedException { + public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map backendsWorkingSlots) + throws SchedException { completeSchedCtx(tabletCtx, backendsWorkingSlots); - batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); + if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) { + return tabletCtx.createCloneReplicaAndTask(); + } else { + return tabletCtx.createStorageMediaMigrationTask(); + } + } // Before createCloneReplicaAndTask, we need to complete the TabletSchedCtx. 
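For reference, a minimal sketch of how a caller consumes the refactored createBalanceTask API (this mirrors the TabletScheduler.doBalance change later in this patch; the surrounding scheduler plumbing is assumed):

```java
// The rebalancer now returns the AgentTask instead of adding it to the batch itself:
// a CloneTask for BE_BALANCE, a StorageMediaMigrationTask for DISK_BALANCE.
AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
batchTask.addTask(task); // the AgentBatchTask is later dispatched to the BE by AgentTaskExecutor
```

Returning the task keeps the batching policy in the caller and lets tests assert on the concrete task type, as DiskRebalanceTest does below.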
@@ -93,4 +101,21 @@ public abstract class Rebalancer { public void updateLoadStatistic(Table statisticMap) { this.statisticMap = statisticMap; } + + public void addPrioBackends(List backends, long timeoutS) { + // timeoutS is in seconds; store an absolute deadline in milliseconds + long currentTimeMillis = System.currentTimeMillis(); + for (Backend backend : backends) { + prioBackends.put(backend.getId(), currentTimeMillis + timeoutS * 1000); + } + } + + public void removePrioBackends(List backends) { + for (Backend backend : backends) { + prioBackends.remove(backend.getId()); + } + } + + public boolean hasPrioBackends() { + return !prioBackends.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 6610b484ae..60e8080fa8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -40,6 +40,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.CloneTask; +import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TStatusCode; @@ -108,6 +109,10 @@ public class TabletSchedCtx implements Comparable { BALANCE, REPAIR } + public enum BalanceType { + BE_BALANCE, DISK_BALANCE + } + public enum Priority { LOW, NORMAL, @@ -141,6 +146,7 @@ public class TabletSchedCtx implements Comparable { } private Type type; + private BalanceType balanceType; /* * origPriority is the origin priority being set when this tablet being added to scheduler. @@ -193,11 +199,16 @@ public class TabletSchedCtx implements Comparable { private Replica srcReplica = null; private long srcPathHash = -1; + // for disk balance: keep the src path and avoid taking a slot in selectAlternativeTabletsForCluster + private Replica tempSrcReplica = null; private long destBackendId = -1; private long destPathHash = -1; + // for disk balance: the data dir the migration task should move the tablet to + private String destPath = null; private String errMsg = null; private CloneTask cloneTask = null; + private StorageMediaMigrationTask storageMediaMigrationTask = null; // statistics gathered from clone task report // the total size of clone files and the total cost time in ms. 
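prioBackends maps a backend id to an absolute deadline in milliseconds. A hedged sketch of how expired entries could be pruned (the pruning site is not part of this hunk, so its placement is an assumption):

```java
// Assumption: some periodic scheduler step drops prio backends whose deadline,
// stored by addPrioBackends as now + timeout, has already passed.
prioBackends.entrySet().removeIf(e -> e.getValue() < System.currentTimeMillis());
```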
@@ -227,6 +238,7 @@ public class TabletSchedCtx implements Comparable { this.infoService = Catalog.getCurrentSystemInfo(); this.state = State.PENDING; this.replicaAlloc = replicaAlloc; + this.balanceType = BalanceType.BE_BALANCE; } public ReplicaAllocation getReplicaAlloc() { @@ -249,6 +261,14 @@ public class TabletSchedCtx implements Comparable { return type; } + public void setBalanceType(BalanceType type) { + this.balanceType = type; + } + + public BalanceType getBalanceType() { + return balanceType; + } + public Priority getOrigPriority() { return origPriority; } @@ -380,6 +400,11 @@ public class TabletSchedCtx implements Comparable { this.destBackendId = destBeId; this.destPathHash = destPathHash; } + + public void setDest(Long destBeId, long destPathHash, String destPath) { + setDest(destBeId, destPathHash); + this.destPath = destPath; + } public void setErrMsg(String errMsg) { this.errMsg = errMsg; @@ -414,6 +439,24 @@ public class TabletSchedCtx implements Comparable { this.srcPathHash = srcReplica.getPathHash(); } + public void setTempSrc(Replica srcReplica) { + this.tempSrcReplica = srcReplica; + } + + public long getTempSrcBackendId() { + if (tempSrcReplica != null) { + return tempSrcReplica.getBackendId(); + } + return -1; + } + + public long getTempSrcPathHash() { + if (tempSrcReplica != null) { + return tempSrcReplica.getPathHash(); + } + return -1; + } + public long getDestBackendId() { return destBackendId; } @@ -422,6 +465,10 @@ public class TabletSchedCtx implements Comparable { return destPathHash; } + public String getDestPath() { + return destPath; + } + // database lock should be held. public long getTabletSize() { long max = Long.MIN_VALUE; @@ -687,6 +734,9 @@ public class TabletSchedCtx implements Comparable { } } + if (storageMediaMigrationTask != null) { + AgentTaskQueue.removeTask(storageMediaMigrationTask.getBackendId(), TTaskType.STORAGE_MEDIUM_MIGRATE, storageMediaMigrationTask.getSignature()); + } if (cloneTask != null) { AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature()); @@ -729,13 +779,28 @@ public class TabletSchedCtx implements Comparable { this.srcPathHash = -1; this.destBackendId = -1; this.destPathHash = -1; + this.destPath = null; this.cloneTask = null; + this.storageMediaMigrationTask = null; } } public void deleteReplica(Replica replica) { tablet.deleteReplicaByBackendId(replica.getBackendId()); } + + public StorageMediaMigrationTask createStorageMediaMigrationTask() throws SchedException { + // validate destPath before constructing the task, so a failed ctx does not keep a stale task reference + if (destPath == null || destPath.isEmpty()) { + throw new SchedException(Status.UNRECOVERABLE, + "backend " + srcReplica.getBackendId() + ", dest path is empty"); + } + storageMediaMigrationTask = new StorageMediaMigrationTask(getSrcBackendId(), getTabletId(), + getSchemaHash(), getStorageMedium()); + storageMediaMigrationTask.setDataDir(destPath); + this.taskTimeoutMs = getApproximateTimeoutMs(); + this.state = State.RUNNING; + return storageMediaMigrationTask; + } // database lock should be held. 
public CloneTask createCloneReplicaAndTask() throws SchedException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 1f80cada9e..68ab63a851 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -17,6 +17,8 @@ package org.apache.doris.clone; +import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt; +import org.apache.doris.analysis.AdminRebalanceDiskStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.ColocateTableIndex.GroupId; @@ -51,7 +53,9 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.CloneTask; import org.apache.doris.task.DropReplicaTask; +import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -101,13 +105,14 @@ public class TabletScheduler extends MasterDaemon { private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s - public static final int BALANCE_SLOT_NUM_FOR_PATH = 2; + // 1 slot to reduce unnecessary balance tasks and provide a more accurate estimate of capacity + public static final int BALANCE_SLOT_NUM_FOR_PATH = 1; /* * Tablet is added to pendingTablets as well it's id in allTabletIds. * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when * handling a tablet. - * Tablet' id can only be removed after the clone task is done(timeout, cancelled or finished). + * A tablet's id can only be removed after the clone or migration task is done (timeout, cancelled or finished). * So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler. * * pendingTablets + runningTablets = allTabletIds @@ -135,6 +140,7 @@ public class TabletScheduler extends MasterDaemon { private ColocateTableIndex colocateTableIndex; private TabletSchedulerStat stat; private Rebalancer rebalancer; + private Rebalancer diskRebalancer; // result of adding a tablet to pendingTablets public enum AddResult { @@ -157,6 +163,8 @@ public class TabletScheduler extends MasterDaemon { } else { this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex); } + // if the rebalancer cannot get a new task, then use diskRebalancer to get one + this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex); } public TabletSchedulerStat getStat() { @@ -244,6 +252,14 @@ public class TabletScheduler extends MasterDaemon { return allTabletIds.contains(tabletId); } + public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) { + diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS()); + } + + public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) { + diskRebalancer.removePrioBackends(stmt.getBackends()); + } + /** * Iterate current tablets, change their priority to VERY_HIGH if necessary. 
*/ @@ -300,6 +316,7 @@ updateClusterLoadStatistic(); rebalancer.updateLoadStatistic(statisticMap); + diskRebalancer.updateLoadStatistic(statisticMap); adjustPriorities(); @@ -463,7 +480,6 @@ /* * Try to schedule a single tablet. */ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { - LOG.debug("schedule tablet: {}, type: {}, status: {}", tabletCtx.getTabletId(), tabletCtx.getType(), tabletCtx.getTabletStatus()); long currentTime = System.currentTimeMillis(); tabletCtx.setLastSchedTime(currentTime); tabletCtx.setLastVisitedTime(currentTime); @@ -561,6 +577,11 @@ throw new SchedException(Status.UNRECOVERABLE, "tablet is unhealthy when doing balance"); } + // for more accurate disk balance, we only schedule the tablet when we have fresh stat info about the disk + if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && + tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) { + checkDiskBalanceLastSuccTime(tabletCtx.getTempSrcBackendId(), tabletCtx.getTempSrcPathHash()); + } // we do not concern priority here. // once we take the tablet out of priority queue, priority is meaningless. tabletCtx.setTablet(tablet); @@ -574,6 +595,25 @@ } } + private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws SchedException { + PathSlot pathSlot = backendsWorkingSlots.get(beId); + if (pathSlot == null) { + throw new SchedException(Status.UNRECOVERABLE, "path slot does not exist"); + } + long succTime = pathSlot.getDiskBalanceLastSuccTime(pathHash); + if (succTime > lastStatUpdateTime) { + throw new SchedException(Status.UNRECOVERABLE, "stat info is outdated"); + } + } + + public void updateDiskBalanceLastSuccTime(long beId, long pathHash) { + PathSlot pathSlot = backendsWorkingSlots.get(beId); + if (pathSlot == null) { + return; + } + pathSlot.updateDiskBalanceLastSuccTime(pathHash); + } + private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { if (tabletCtx.getType() == Type.REPAIR) { @@ -1189,6 +1229,21 @@ for (TabletSchedCtx tabletCtx : alternativeTablets) { addTablet(tabletCtx, false); } + if (Config.disable_disk_balance) { + LOG.info("disk balance is disabled. 
skip selecting tablets for disk balance"); + return; + } + List diskBalanceTablets = Lists.newArrayList(); + // if the default rebalancer cannot get a new task, or the user has given prio BEs, use the disk rebalancer to get tasks + if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) { + diskBalanceTablets = diskRebalancer.selectAlternativeTablets(); + } + for (TabletSchedCtx tabletCtx : diskBalanceTablets) { + // add the task if it is from a prio backend or the cluster is balanced + if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) { + addTablet(tabletCtx, false); + } + } } /** @@ -1196,7 +1251,18 @@ */ private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterBalanceSchedule.incrementAndGet(); - rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots, batchTask); + AgentTask task = null; + if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) { + task = diskRebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots); + checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash()); + checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash()); + } else if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) { + task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots); + } else { + throw new SchedException(Status.UNRECOVERABLE, + "unknown balance type: " + tabletCtx.getBalanceType().toString()); + } + batchTask.addTask(task); } // choose a path on a backend which is fit for the tablet @@ -1347,7 +1413,7 @@ // get next batch of tablets from queue. private synchronized List getNextTabletCtxBatch() { List list = Lists.newArrayList(); - int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum()); + int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum()); while (count > 0) { TabletSchedCtx tablet = pendingTablets.poll(); if (tablet == null) { @@ -1368,6 +1434,29 @@ return total; } + public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrationTask, + TFinishTaskRequest request) { + long tabletId = migrationTask.getTabletId(); + TabletSchedCtx tabletCtx = takeRunningTablets(tabletId); + if (tabletCtx == null) { + // tablet does not exist, the task may be created by ReportHandler.tabletReport(ssd => hdd) + LOG.warn("tablet info does not exist: {}", tabletId); + return true; + } + if (tabletCtx.getBalanceType() != TabletSchedCtx.BalanceType.DISK_BALANCE) { + // this should not happen + LOG.warn("task type is not as expected. tablet {}", tabletId); + return true; + } + if (request.getTaskStatus().getStatusCode() == TStatusCode.OK) { + // if we have a successful task, then the stat must be refreshed before scheduling a new task + updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash()); + updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash()); + } + // we need this call to free the slot taken by this migration task + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished"); + return true; + } /** * return true if we want to remove the clone task from AgentTaskQueue */ @@ -1379,6 +1468,11 @@ // tablet does not exist, no need to keep task. 
return true; } + if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) { + // this should not happen + LOG.warn("task type is not as expected. tablet {}", tabletId); + return true; + } Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState()); try { @@ -1706,6 +1800,22 @@ slot.balanceSlot++; slot.rectify(); } + + public synchronized void updateDiskBalanceLastSuccTime(long pathHash) { + Slot slot = pathSlots.get(pathHash); + if (slot == null) { + return; + } + slot.diskBalanceLastSuccTime = System.currentTimeMillis(); + } + + public synchronized long getDiskBalanceLastSuccTime(long pathHash) { + Slot slot = pathSlots.get(pathHash); + if (slot == null) { + return 0L; + } + return slot.diskBalanceLastSuccTime; + } } public List> getSlotsInfo() { @@ -1726,6 +1836,9 @@ public long totalCopySize = 0; public long totalCopyTimeMs = 0; + // for disk balance + public long diskBalanceLastSuccTime = 0; + public Slot(int total) { this.total = total; this.available = total; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index a9a8b9e04f..1551676910 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1133,6 +1133,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean disable_balance = false; + /** + * If set to true, TabletScheduler will not do disk balance. + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean disable_disk_balance = false; + // if the number of scheduled tablets in TabletScheduler exceed max_scheduling_tablets // skip checking. @ConfField(mutable = true, masterOnly = true) diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 5230f9f40a..f0ae6a3ee2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -47,6 +47,7 @@ import org.apache.doris.task.DownloadTask; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.task.PushTask; import org.apache.doris.task.SnapshotTask; +import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.task.UpdateTabletMetaInfoTask; import org.apache.doris.task.UploadTask; import org.apache.doris.thrift.TBackend; @@ -115,8 +116,8 @@ public class MasterImpl { AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature); if (task == null) { - if (taskType != TTaskType.DROP && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE - && taskType != TTaskType.RELEASE_SNAPSHOT && taskType != TTaskType.CLEAR_TRANSACTION_TASK) { + if (taskType != TTaskType.DROP && taskType != TTaskType.RELEASE_SNAPSHOT + && taskType != TTaskType.CLEAR_TRANSACTION_TASK) { String errMsg = "cannot find task. 
type: " + taskType + ", backendId: " + backendId + ", signature: " + signature; LOG.warn(errMsg); @@ -137,7 +138,8 @@ public class MasterImpl { if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD && taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE && taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION - && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO) { + && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO + && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) { return result; } } @@ -175,6 +177,9 @@ public class MasterImpl { case CLONE: finishClone(task, request); break; + case STORAGE_MEDIUM_MIGRATE: + finishStorageMediumMigrate(task, request); + break; case CHECK_CONSISTENCY: finishConsistencyCheck(task, request); break; @@ -699,6 +704,12 @@ public class MasterImpl { AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE, task.getSignature()); } + private void finishStorageMediumMigrate(AgentTask task, TFinishTaskRequest request) { + StorageMediaMigrationTask migrationTask = (StorageMediaMigrationTask) task; + Catalog.getCurrentCatalog().getTabletScheduler().finishStorageMediaMigrationTask(migrationTask, request); + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.STORAGE_MEDIUM_MIGRATE, task.getSignature()); + } + private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest request) { CheckConsistencyTask checkConsistencyTask = (CheckConsistencyTask) task; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index ea4abd9c09..e1f1051315 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -360,10 +360,12 @@ public class ReportHandler extends Daemon { // 1. CREATE // 2. SYNC DELETE // 3. CHECK_CONSISTENCY + // 4. 
STORAGE_MEDIUM_MIGRATE if (task.getTaskType() == TTaskType.CREATE || (task.getTaskType() == TTaskType.PUSH && ((PushTask) task).getPushType() == TPushType.DELETE && ((PushTask) task).isSyncDelete()) - || task.getTaskType() == TTaskType.CHECK_CONSISTENCY) { + || task.getTaskType() == TTaskType.CHECK_CONSISTENCY + || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index c66349a2ce..c35a27b8f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -21,6 +21,8 @@ import org.apache.doris.analysis.AdminCancelRepairTableStmt; import org.apache.doris.analysis.AdminCheckTabletsStmt; import org.apache.doris.analysis.AdminCleanTrashStmt; import org.apache.doris.analysis.AdminCompactTableStmt; +import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt; +import org.apache.doris.analysis.AdminRebalanceDiskStmt; import org.apache.doris.analysis.AdminRepairTableStmt; import org.apache.doris.analysis.AdminSetConfigStmt; import org.apache.doris.analysis.AdminSetReplicaStatusStmt; @@ -281,6 +283,10 @@ public class DdlExecutor { catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt); } else if (ddlStmt instanceof AdminCleanTrashStmt) { catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt); + } else if (ddlStmt instanceof AdminRebalanceDiskStmt) { + catalog.getTabletScheduler().rebalanceDisk((AdminRebalanceDiskStmt) ddlStmt); + } else if (ddlStmt instanceof AdminCancelRebalanceDiskStmt) { + catalog.getTabletScheduler().cancelRebalanceDisk((AdminCancelRebalanceDiskStmt) ddlStmt); } else if (ddlStmt instanceof CreateSqlBlockRuleStmt) { catalog.getSqlBlockRuleMgr().createSqlBlockRule((CreateSqlBlockRuleStmt) ddlStmt); } else if (ddlStmt instanceof AlterSqlBlockRuleStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java index 72aef5a4d9..1ddbde77de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java @@ -21,10 +21,14 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageMediumMigrateReq; import org.apache.doris.thrift.TTaskType; +import com.google.common.base.Strings; + public class StorageMediaMigrationTask extends AgentTask { private int schemaHash; private TStorageMedium toStorageMedium; + // if dataDir is specified, the toStorageMedium is meaningless + private String dataDir; public StorageMediaMigrationTask(long backendId, long tabletId, int schemaHash, TStorageMedium toStorageMedium) { @@ -36,9 +40,20 @@ public class StorageMediaMigrationTask extends AgentTask { public TStorageMediumMigrateReq toThrift() { TStorageMediumMigrateReq request = new TStorageMediumMigrateReq(tabletId, schemaHash, toStorageMedium); + if (!Strings.isNullOrEmpty(dataDir)) { + request.setDataDir(dataDir); + } return request; } + public String getDataDir() { + return dataDir; + } + + public void setDataDir(String dataDir) { + this.dataDir = dataDir; + } + public int getSchemaHash() { return schemaHash; } diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index a2eeba952c..7ab3c7ed0b 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ 
b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -171,6 +171,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("distinctpcsa", new Integer(SqlParserSymbols.KW_DISTINCTPCSA)); keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED)); keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION)); + keywordMap.put("disk", new Integer(SqlParserSymbols.KW_DISK)); keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC)); keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV)); keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE)); @@ -319,6 +320,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE)); keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ)); keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE)); + keywordMap.put("rebalance", new Integer(SqlParserSymbols.KW_REBALANCE)); keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER)); keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH)); keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java new file mode 100644 index 0000000000..8e92567adf --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.clone.RebalancerTestUtil; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.mysql.privilege.MockedAuth; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import mockit.Mocked; + +public class AdminCancelRebalanceDiskStmtTest { + + private static Analyzer analyzer; + + @Mocked + private PaloAuth auth; + @Mocked + private ConnectContext ctx; + + @Before() + public void setUp() { + Config.disable_cluster_feature = false; + analyzer = AccessTestUtil.fetchAdminAnalyzer(true); + MockedAuth.mockedAuth(auth); + MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1"); + + List beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L); + beIds.forEach(id -> Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 2048, 0))); + } + + @Test + public void testParticularBackends() throws AnalysisException { + List backends = Lists.newArrayList( + "192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051"); + final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends); + stmt.analyze(analyzer); + Assert.assertEquals(2, stmt.getBackends().size()); + } + + @Test + public void testEmpty() throws AnalysisException { + List backends = Lists.newArrayList(); + final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends); + stmt.analyze(analyzer); + Assert.assertEquals(0, stmt.getBackends().size()); + } + + @Test + public void testNull() throws AnalysisException { + final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(null); + stmt.analyze(analyzer); + Assert.assertEquals(4, stmt.getBackends().size()); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java new file mode 100644 index 0000000000..e83693ceda --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.clone.RebalancerTestUtil; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.mysql.privilege.MockedAuth; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import mockit.Mocked; + +public class AdminRebalanceDiskStmtTest { + + private static Analyzer analyzer; + + @Mocked + private PaloAuth auth; + @Mocked + private ConnectContext ctx; + + @Before() + public void setUp() { + Config.disable_cluster_feature = false; + analyzer = AccessTestUtil.fetchAdminAnalyzer(true); + MockedAuth.mockedAuth(auth); + MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1"); + + List beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L); + beIds.forEach(id -> Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 2048, 0))); + } + + @Test + public void testParticularBackends() throws AnalysisException { + List backends = Lists.newArrayList( + "192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051"); + final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(backends); + stmt.analyze(analyzer); + Assert.assertEquals(2, stmt.getBackends().size()); + } + + @Test + public void testEmpty() throws AnalysisException { + List backends = Lists.newArrayList(); + final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(backends); + stmt.analyze(analyzer); + Assert.assertEquals(0, stmt.getBackends().size()); + } + + @Test + public void testNull() throws AnalysisException { + final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(null); + stmt.analyze(analyzer); + Assert.assertEquals(4, stmt.getBackends().size()); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java new file mode 100644 index 0000000000..1d7cb00a39 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.clone; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DataProperty; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.clone.TabletScheduler.PathSlot; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.StorageMediaMigrationTask; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.thrift.TStorageType; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Table; +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.Configurator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import mockit.Delegate; +import mockit.Expectations; +import mockit.Mocked; + +public class DiskRebalanceTest { + private static final Logger LOG = LogManager.getLogger(DiskRebalanceTest.class); + + @Mocked + private Catalog catalog; + + private long id = 10086; + + private Database db; + private OlapTable olapTable; + + private final SystemInfoService systemInfoService = new SystemInfoService(); + private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + private Table statisticMap; + + @Before + public void setUp() throws Exception { + db = new Database(1, "test db"); + db.setClusterName(SystemInfoService.DEFAULT_CLUSTER); + new Expectations() { + { + catalog.getDbIds(); + minTimes = 0; + result = db.getId(); + + catalog.getDbNullable(anyLong); + minTimes = 0; + result = db; + + catalog.getDbOrException(anyLong, (Function) any); + minTimes = 0; + result = db; + + Catalog.getCurrentCatalogJournalVersion(); + minTimes = 0; + result = FeConstants.meta_version; + + catalog.getNextId(); + minTimes = 0; + result = new Delegate() { + long a() { + return id++; + } + }; + + Catalog.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + Catalog.getCurrentInvertedIndex(); + minTimes = 0; + result = invertedIndex; + + Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId(); + result = 111; + + Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(anyLong, anyLong, (List) any); + result = true; + } + }; + // Test mock validation + Assert.assertEquals(111, 
Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId()); + Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L))); + } + + private void generateStatisticMap() { + ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER, + Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex); + loadStatistic.init(); + statisticMap = HashBasedTable.create(); + statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, loadStatistic); + } + + private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) { + // partition ids start from 31 + LongStream.range(0, partitionCount).forEach(idx -> { + long id = 31 + idx; + Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo()); + olapTable.addPartition(partition); + olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD), + ReplicaAllocation.DEFAULT_ALLOCATION, false); + }); + } + + @Test + public void testDiskRebalancerWithSameUsageDisk() { + // init system + List beIds = Lists.newArrayList(10001L, 10002L, 10003L); + beIds.forEach(id -> systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, Lists.newArrayList(512L,512L), 2))); + + olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS, + new RangePartitionInfo(), new HashDistributionInfo()); + db.createTable(olapTable); + + // 1 table, 3 partitions p0,p1,p2 + MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null); + createPartitionsForTable(olapTable, materializedIndex, 3L); + olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()), + 0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS); + + // Tablet distribution: we add them to olapTable & build invertedIndex manually + // all tablets are on the first path of their backend + RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD, + 50000, Lists.newArrayList(10001L, 10002L, 10003L)); + + RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD, + 60000, Lists.newArrayList(10001L, 10002L, 10003L)); + + RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD, + 70000, Lists.newArrayList(10001L, 10002L, 10003L)); + + // case start + Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG); + + Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex()); + generateStatisticMap(); + rebalancer.updateLoadStatistic(statisticMap); + List alternativeTablets = rebalancer.selectAlternativeTablets(); + // check alternativeTablets + Assert.assertTrue(alternativeTablets.isEmpty()); + } + + @Test + public void testDiskRebalancerWithDiffUsageDisk() { + // init system + systemInfoService.addBackend(RebalancerTestUtil.createBackend(10001L, 2048, Lists.newArrayList(1024L), 1)); + systemInfoService.addBackend(RebalancerTestUtil.createBackend(10002L, 2048, Lists.newArrayList(1024L, 512L), 2)); + systemInfoService.addBackend(RebalancerTestUtil.createBackend(10003L, 2048, Lists.newArrayList(1024L, 512L, 513L), 3)); + + olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS, + new RangePartitionInfo(), new HashDistributionInfo()); + db.createTable(olapTable); + + // 1 table, 3 partitions p0,p1,p2 + MaterializedIndex 
materializedIndex = new MaterializedIndex(olapTable.getId(), null); + createPartitionsForTable(olapTable, materializedIndex, 3L); + olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()), + 0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS); + + // Tablet distribution: we add them to olapTable & build invertedIndex manually + // all tablets are on the first path of their backend + RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD, + 50000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(0L, 100L, 300L)); + + RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD, + 60000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(50L, 0L, 200L)); + + RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD, + 70000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(100L, 200L, 0L)); + + // case start + Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG); + + Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex()); + generateStatisticMap(); + rebalancer.updateLoadStatistic(statisticMap); + List alternativeTablets = rebalancer.selectAlternativeTablets(); + // check alternativeTablets + Assert.assertEquals(2, alternativeTablets.size()); + Map backendsWorkingSlots = Maps.newConcurrentMap(); + for (Backend be : Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER)) { + if (!backendsWorkingSlots.containsKey(be.getId())) { + List pathHashes = be.getDisks().values().stream().map(DiskInfo::getPathHash).collect(Collectors.toList()); + PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path); + backendsWorkingSlots.put(be.getId(), slot); + } + } + + for (TabletSchedCtx tabletCtx : alternativeTablets) { + LOG.info("try to schedule tablet {}", tabletCtx.getTabletId()); + try { + tabletCtx.setStorageMedium(TStorageMedium.HDD); + tabletCtx.setTablet(olapTable.getPartition(tabletCtx.getPartitionId()).getIndex(tabletCtx.getIndexId()).getTablet(tabletCtx.getTabletId())); + tabletCtx.setVersionInfo(1, 1); + tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId())); + tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first + + AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots); + if (tabletCtx.getTabletSize() == 0) { + Assert.fail("no exception"); + } else { + Assert.assertTrue(task instanceof StorageMediaMigrationTask); + } + } catch (SchedException e) { + LOG.info("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage()); + } + } + } + +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index fc4dd7dd04..e94fa8655d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; @@ -182,6 +183,31 @@ public class RebalanceTest { }); } + @Test + 
public void testPrioBackends() { + Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex()); + // add + { + List backends = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + backends.add(RebalancerTestUtil.createBackend(10086 + i, 2048, 0)); + } + rebalancer.addPrioBackends(backends, 1000); + Assert.assertTrue(rebalancer.hasPrioBackends()); + } + + // remove + for (int i = 0; i < 3; i++) { + List backends = Lists.newArrayList(RebalancerTestUtil.createBackend(10086 + i, 2048, 0)); + rebalancer.removePrioBackends(backends); + if (i == 2) { + Assert.assertFalse(rebalancer.hasPrioBackends()); + } else { + Assert.assertTrue(rebalancer.hasPrioBackends()); + } + } + } + @Test public void testPartitionRebalancer() { Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG); @@ -218,7 +244,8 @@ public class RebalanceTest { tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first // createCloneReplicaAndTask, create replica will change invertedIndex too. - rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask); + AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots()); + batchTask.addTask(task); } catch (SchedException e) { LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index 4b3e4c693b..5e02a51ffc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -19,6 +19,7 @@ package org.apache.doris.clone; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.collect.Lists; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.MaterializedIndex; @@ -40,14 +41,20 @@ public class RebalancerTestUtil { // Add only one path, PathHash:id public static Backend createBackend(long id, long totalCap, long usedCap) { + return createBackend(id, totalCap, Lists.newArrayList(usedCap), 1); + } + // the size of usedCaps should be equal to diskNum + public static Backend createBackend(long id, long totalCap, List usedCaps, int diskNum) { // ip:port won't be checked Backend be = new Backend(id, "192.168.0." 
+ id, 9051); Map disks = Maps.newHashMap(); - DiskInfo diskInfo = new DiskInfo("/path1"); - diskInfo.setPathHash(id); - diskInfo.setTotalCapacityB(totalCap); - diskInfo.setDataUsedCapacityB(usedCap); - disks.put(diskInfo.getRootPath(), diskInfo); + for (int i = 0; i < diskNum; i++) { + DiskInfo diskInfo = new DiskInfo("/path" + (i + 1)); + diskInfo.setPathHash(id + i); + diskInfo.setTotalCapacityB(totalCap); + diskInfo.setDataUsedCapacityB(usedCaps.get(i)); + disks.put(diskInfo.getRootPath(), diskInfo); + } be.setDisks(ImmutableMap.copyOf(disks)); be.setAlive(true); be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); @@ -59,28 +66,37 @@ public class RebalancerTestUtil { // Only use the partition's baseIndex for simplicity public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium, int tabletId, List beIds) { + createTablet(invertedIndex, db, olapTable, partitionName, medium, tabletId, beIds, null); + } + public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium, + int tabletId, List beIds, List replicaSizes) { Partition partition = olapTable.getPartition(partitionName); MaterializedIndex baseIndex = partition.getBaseIndex(); int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId()); TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partition.getId(), baseIndex.getId(), - schemaHash, medium); + schemaHash, medium); Tablet tablet = new Tablet(tabletId); // add tablet to olapTable olapTable.getPartition("p0").getBaseIndex().addTablet(tablet, tabletMeta); - createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds); + createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds, replicaSizes); } // Create replicas on backends which are numbered in beIds. // The tablet & replicas will be added to invertedIndex. 
- public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, Tablet tablet, List beIds) { + public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, + Tablet tablet, List beIds, List replicaSizes) { invertedIndex.addTablet(tablet.getId(), tabletMeta); IntStream.range(0, beIds.size()).forEach(i -> { Replica replica = new Replica(tablet.getId() + i, beIds.get(i), Replica.ReplicaState.NORMAL, 1, tabletMeta.getOldSchemaHash()); // We've set pathHash to beId for simplicity replica.setPathHash(beIds.get(i)); + if (replicaSizes != null) { + // for the disk rebalancer, every beId corresponds to a replicaSize + replica.updateStat(replicaSizes.get(i), 0); + } // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex tablet.addReplica(replica, true); invertedIndex.addReplica(tablet.getId(), replica); diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 375c86fc01..c1e7d42dbd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -89,6 +89,7 @@ public class AgentTaskTest { private AgentTask rollupTask; private AgentTask schemaChangeTask; private AgentTask cancelDeleteTask; + private AgentTask storageMediaMigrationTask; @Before public void setUp() throws AnalysisException { @@ -140,6 +141,11 @@ public class AgentTaskTest { new SchemaChangeTask(null, backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, columns, schemaHash2, schemaHash1, shortKeyNum, storageType, null, 0, TKeysType.AGG_KEYS); + + // storageMediaMigrationTask + storageMediaMigrationTask = + new StorageMediaMigrationTask(backendId1, tabletId1, schemaHash1, TStorageMedium.HDD); + ((StorageMediaMigrationTask) storageMediaMigrationTask).setDataDir("/home/a"); } @Test @@ -211,6 +217,15 @@ public class AgentTaskTest { Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType()); Assert.assertEquals(schemaChangeTask.getSignature(), request6.getSignature()); Assert.assertNotNull(request6.getAlterTabletReq()); + + // storageMediaMigrationTask + TAgentTaskRequest request7 = + (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, storageMediaMigrationTask); + Assert.assertEquals(TTaskType.STORAGE_MEDIUM_MIGRATE, request7.getTaskType()); + Assert.assertEquals(storageMediaMigrationTask.getSignature(), request7.getSignature()); + Assert.assertNotNull(request7.getStorageMediumMigrateReq()); + Assert.assertTrue(request7.getStorageMediumMigrateReq().isSetDataDir()); + Assert.assertEquals("/home/a", request7.getStorageMediumMigrateReq().getDataDir()); } @Test