From cdfbfd1f6b171684e7f8cbf9eafb9a506f2f1471 Mon Sep 17 00:00:00 2001 From: plat1ko Date: Sat, 6 May 2023 11:06:29 +0800 Subject: [PATCH] [fix](replica) Fix inconsistent replica id between FE and BE (#18688) --- be/src/agent/task_worker_pool.cpp | 83 +++++++--------- be/src/olap/snapshot_manager.cpp | 1 + be/src/olap/storage_engine.cpp | 5 - be/src/olap/tablet.cpp | 6 +- be/src/olap/tablet.h | 2 +- be/src/olap/tablet_manager.cpp | 10 +- be/src/olap/tablet_meta.cpp | 20 ---- be/src/olap/tablet_meta.h | 2 +- be/src/olap/task/engine_clone_task.cpp | 47 +++++---- .../task/engine_storage_migration_task.cpp | 3 - .../doris/catalog/TabletInvertedIndex.java | 28 ++++-- .../apache/doris/master/ReportHandler.java | 22 +++-- .../doris/task/UpdateTabletMetaInfoTask.java | 97 +++++-------------- gensrc/thrift/AgentService.thrift | 5 +- 14 files changed, 133 insertions(+), 198 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f93cee6a6b..d0d0f9b576 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -295,6 +295,9 @@ bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_ // no need to report task of these types return true; } + if (signature == -1) { // No need to report task with uninitialized signature + return true; + } std::lock_guard task_signatures_lock(_s_task_signatures_lock); std::set& signature_set = _s_task_signatures[task_type]; return signature_set.insert(signature).second; @@ -965,7 +968,6 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() { void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TUpdateTabletMetaInfoReq update_tablet_meta_req; { std::unique_lock worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -973,68 +975,55 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { if (!_is_work) { return; } - 
agent_task_req = _tasks.front(); - update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; _tasks.pop_front(); } LOG(INFO) << "get update tablet meta task. signature=" << agent_task_req.signature; Status status; - + auto& update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; for (auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( tablet_meta_info.tablet_id); if (tablet == nullptr) { - LOG(WARNING) << "could not find tablet when update partition id. tablet_id=" - << tablet_meta_info.tablet_id - << ", schema_hash=" << tablet_meta_info.schema_hash; + LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id=" + << tablet_meta_info.tablet_id; continue; } - std::lock_guard wrlock(tablet->get_header_lock()); - // update tablet meta - if (!tablet_meta_info.__isset.meta_type) { - tablet->set_partition_id(tablet_meta_info.partition_id); - } else { - switch (tablet_meta_info.meta_type) { - case TTabletMetaType::PARTITIONID: // FIXME(plat1ko): deprecate? - tablet->set_partition_id(tablet_meta_info.partition_id); - break; - case TTabletMetaType::INMEMORY: - if (tablet_meta_info.__isset.storage_policy_id) { - LOG(INFO) << "set tablet storage_policy_id=" - << tablet_meta_info.storage_policy_id; - tablet->tablet_meta()->set_storage_policy_id( - tablet_meta_info.storage_policy_id); - } - if (tablet_meta_info.__isset.is_in_memory) { - tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory( - tablet_meta_info.is_in_memory); - // The field is_in_memory should not be in the tablet_schema. - // it should be in the tablet_meta. 
- for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { - rowset_meta->tablet_schema()->set_is_in_memory( - tablet_meta_info.is_in_memory); - } - tablet->get_max_version_schema(wrlock)->set_is_in_memory( - tablet_meta_info.is_in_memory); - } - break; - } + bool need_to_save = false; + if (tablet_meta_info.__isset.storage_policy_id) { + tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id); + need_to_save = true; + } + if (tablet_meta_info.__isset.is_in_memory) { + tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory( + tablet_meta_info.is_in_memory); + std::shared_lock rlock(tablet->get_header_lock()); + for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { + rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory); + } + tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); + need_to_save = true; + } + if (tablet_meta_info.__isset.replica_id) { + tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); + } + if (need_to_save) { + std::shared_lock rlock(tablet->get_header_lock()); + tablet->save_meta(); } - tablet->save_meta(); } LOG(INFO) << "finish update tablet meta task. 
signature=" << agent_task_req.signature; - - TFinishTaskRequest finish_task_request; - finish_task_request.__set_task_status(status.to_thrift()); - finish_task_request.__set_backend(_backend); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + if (agent_task_req.signature != -1) { + TFinishTaskRequest finish_task_request; + finish_task_request.__set_task_status(status.to_thrift()); + finish_task_request.__set_backend(_backend); + finish_task_request.__set_task_type(agent_task_req.task_type); + finish_task_request.__set_signature(agent_task_req.signature); + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + } } } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index cf498c0b7b..d1ef6a96b5 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -163,6 +163,7 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t // should modify tablet id and schema hash because in restore process the tablet id is not // equal to tablet id in meta new_tablet_meta_pb.set_tablet_id(tablet_id); + *new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto(); new_tablet_meta_pb.set_replica_id(replica_id); new_tablet_meta_pb.set_schema_hash(schema_hash); TabletSchemaSPtr tablet_schema; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index f7968fc650..4015d6f10c 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -969,11 +969,6 @@ Status StorageEngine::load_header(const string& shard_path, const TCloneReq& req string header_path = TabletMeta::construct_header_file_path(schema_hash_path_stream.str(), request.tablet_id); - res = TabletMeta::reset_tablet_uid(header_path); - if 
(!res.ok()) { - LOG(WARNING) << "fail reset tablet uid file path = " << header_path << " res=" << res; - return res; - } res = _tablet_manager->load_tablet_from_dir(store, request.tablet_id, request.schema_hash, schema_hash_path_stream.str(), false, restore); if (!res.ok()) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 75b2824849..d3b010d787 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2371,7 +2371,7 @@ void Tablet::remove_unused_remote_files() { } TabletSchemaSPtr Tablet::tablet_schema() const { - std::shared_lock wrlock(_meta_lock); + std::shared_lock rlock(_meta_lock); return _max_version_schema; } @@ -2384,10 +2384,6 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) { } } -TabletSchemaSPtr Tablet::get_max_version_schema(std::lock_guard&) { - return _max_version_schema; -} - // fetch value by row column Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid, const std::vector& rowids, diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 1bdd56b829..e666dcb16d 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -300,7 +300,7 @@ public: TabletSchemaSPtr tablet_schema() const override; - TabletSchemaSPtr get_max_version_schema(std::lock_guard&); + const TabletSchemaSPtr& tablet_schema_unlocked() const { return _max_version_schema; } // Find the related rowset with specified version and return its tablet schema TabletSchemaSPtr tablet_schema(Version version) const { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 94986cdcfb..e236c0668f 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -846,12 +846,6 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path))); std::string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1); int32_t shard = stol(shard_str); - 
// load dir is called by clone, restore, storage migration - // should change tablet uid when tablet object changed - RETURN_NOT_OK_LOG( - TabletMeta::reset_tablet_uid(header_path), - strings::Substitute("failed to set tablet uid when copied meta file. header_path=%0", - header_path)); bool exists = false; RETURN_IF_ERROR(io::global_local_filesystem()->exists(header_path, &exists)); @@ -868,12 +862,14 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, // has to change shard id here, because meta file maybe copied from other source // its shard is different from local shard tablet_meta->set_shard_id(shard); + // load dir is called by clone, restore, storage migration + // should change tablet uid when tablet object changed + tablet_meta->set_tablet_uid(TabletUid::gen_uid()); std::string meta_binary; tablet_meta->serialize(&meta_binary); RETURN_NOT_OK_LOG(load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force, restore, true), strings::Substitute("fail to load tablet. 
header_path=$0", header_path)); - return Status::OK(); } diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index f5548e8061..05ce5aaf84 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -349,26 +349,6 @@ Status TabletMeta::create_from_file(const string& file_path) { return Status::OK(); } -Status TabletMeta::reset_tablet_uid(const string& header_file) { - Status res = Status::OK(); - TabletMeta tmp_tablet_meta; - if ((res = tmp_tablet_meta.create_from_file(header_file)) != Status::OK()) { - LOG(WARNING) << "fail to load tablet meta from file" - << ", meta_file=" << header_file; - return res; - } - TabletMetaPB tmp_tablet_meta_pb; - tmp_tablet_meta.to_meta_pb(&tmp_tablet_meta_pb); - *(tmp_tablet_meta_pb.mutable_tablet_uid()) = TabletUid::gen_uid().to_proto(); - res = save(header_file, tmp_tablet_meta_pb); - if (!res.ok()) { - LOG(FATAL) << "fail to save tablet meta pb to " - << " meta_file=" << header_file; - return res; - } - return res; -} - std::string TabletMeta::construct_header_file_path(const string& schema_hash_path, int64_t tablet_id) { std::stringstream header_name_stream; diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index d2870a8dfe..3ac1b148dc 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -112,7 +112,6 @@ public: Status save(const std::string& file_path); Status save_as_json(const string& file_path, DataDir* dir); static Status save(const std::string& file_path, const TabletMetaPB& tablet_meta_pb); - static Status reset_tablet_uid(const std::string& file_path); static std::string construct_header_file_path(const std::string& schema_hash_path, int64_t tablet_id); Status save_meta(DataDir* data_dir); @@ -134,6 +133,7 @@ public: int64_t partition_id() const; int64_t tablet_id() const; int64_t replica_id() const; + void set_replica_id(int64_t replica_id) { _replica_id = replica_id; } int32_t schema_hash() const; int16_t shard_id() const; void 
set_shard_id(int32_t shard_id); diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 0c7d917a50..0cf7b1bcea 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -79,6 +79,14 @@ const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15; const uint32_t GET_LENGTH_TIMEOUT = 10; +#define RETURN_IF_ERROR_(status, stmt) \ + do { \ + status = (stmt); \ + if (UNLIKELY(!status.ok())) { \ + return status; \ + } \ + } while (false) + EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& master_info, int64_t signature, std::vector* tablet_infos) : _clone_req(clone_req), @@ -113,17 +121,16 @@ Status EngineCloneTask::_do_clone() { std::vector missed_versions; // try to repair a tablet with missing version if (tablet != nullptr) { - if (tablet->replica_id() != _clone_req.replica_id) { - // `tablet` may be a dropped replica in FE, e.g: BE1 migrates replica of tablet_1 to BE2, - // but before BE1 drop this replica, another new replica of tablet_1 is migrated to BE1. - // If we allow to clone success on dropped replica, replica id may never be consistent between FE and BE. - return Status::InternalError("replica_id not match({} vs {})", tablet->replica_id(), - _clone_req.replica_id); - } std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock); if (!migration_rlock.owns_lock()) { return Status::Error(); } + if (tablet->replica_id() < _clone_req.replica_id) { + // `tablet` may be a dropped replica in FE, e.g: + // BE1 migrates replica of tablet_1 to BE2, but before BE1 drop this replica, another new replica of tablet_1 is migrated to BE1. + // Clone can still continue in this case. But to keep `replica_id` consistent with FE, MUST reset `replica_id` with request `replica_id`. 
+ tablet->tablet_meta()->set_replica_id(_clone_req.replica_id); + } // get download path auto local_data_path = fmt::format("{}/{}", tablet->tablet_path(), CLONE_PREFIX); @@ -179,21 +186,27 @@ Status EngineCloneTask::_do_clone() { }}; bool allow_incremental_clone = false; - status = _make_and_download_snapshots(*store, tablet_dir, &src_host, &src_file_path, - missed_versions, &allow_incremental_clone); - if (!status.ok()) { - return status; - } + RETURN_IF_ERROR_(status, + _make_and_download_snapshots(*store, tablet_dir, &src_host, &src_file_path, + missed_versions, &allow_incremental_clone)); LOG(INFO) << "clone copy done. src_host: " << src_host.host << " src_file_path: " << src_file_path; + auto tablet_manager = StorageEngine::instance()->tablet_manager(); + RETURN_IF_ERROR_(status, tablet_manager->load_tablet_from_dir(store, _clone_req.tablet_id, + _clone_req.schema_hash, + tablet_dir, false)); + auto tablet = tablet_manager->get_tablet(_clone_req.tablet_id); + if (!tablet) { + status = Status::NotFound("tablet not found, tablet_id={}", _clone_req.tablet_id); + return status; + } + // MUST reset `replica_id` to request `replica_id` to keep consistent with FE + tablet->tablet_meta()->set_replica_id(_clone_req.replica_id); + // clone success, delete .hdr file because tablet meta is stored in rocksdb string header_path = TabletMeta::construct_header_file_path(tablet_dir, _clone_req.tablet_id); - RETURN_IF_ERROR(TabletMeta::reset_tablet_uid(header_path)); - RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->load_tablet_from_dir( - store, _clone_req.tablet_id, _clone_req.schema_hash, tablet_dir, false)); - // clone success, delete .hdr file because tablet meta is stored in rocksdb - RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(header_path)); + io::global_local_filesystem()->delete_file(header_path); } return _set_tablet_info(is_new_tablet); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp index 7bf512d407..262123866e 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -149,9 +149,6 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file( std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr"; RETURN_IF_ERROR(new_tablet_meta->save(new_meta_file)); - // reset tablet id and rowset id - RETURN_IF_ERROR(TabletMeta::reset_tablet_uid(new_meta_file)); - // it will change rowset id and its create time // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load return SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 52c7ca8786..4bf06c4e4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -26,6 +26,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTablet; import org.apache.doris.thrift.TTabletInfo; +import org.apache.doris.thrift.TTabletMetaInfo; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.PartitionCommitInfo; import org.apache.doris.transaction.TableCommitInfo; @@ -42,8 +43,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Table; import com.google.common.collect.TreeMultimap; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -127,7 +126,7 @@ public class TabletInvertedIndex { Map> transactionsToPublish, 
ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap, - List> tabletToInMemory, + List tabletToUpdate, List cooldownConfToPush, List cooldownConfToUpdate) { long stamp = readLock(); @@ -148,11 +147,18 @@ public class TabletInvertedIndex { Replica replica = entry.getValue(); tabletFoundInMeta.add(tabletId); TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0); + TTabletMetaInfo tabletMetaInfo = null; + if (backendTabletInfo.getReplicaId() != replica.getId() + && replica.getState() != ReplicaState.CLONE) { + // Need to update replica id in BE + tabletMetaInfo = new TTabletMetaInfo(); + tabletMetaInfo.setReplicaId(replica.getId()); + } if (partitionIdInMemorySet.contains( backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) { - synchronized (tabletToInMemory) { - tabletToInMemory.add(new ImmutableTriple<>(tabletId, - backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory())); + if (tabletMetaInfo == null) { + tabletMetaInfo = new TTabletMetaInfo(); + tabletMetaInfo.setIsInMemory(!backendTabletInfo.isIsInMemory()); } } // 1. (intersection) @@ -300,6 +306,12 @@ public class TabletInvertedIndex { if (backendTabletInfo.isSetVersionCount()) { replica.setVersionCount(backendTabletInfo.getVersionCount()); } + if (tabletMetaInfo != null) { + tabletMetaInfo.setTabletId(tabletId); + synchronized (tabletToUpdate) { + tabletToUpdate.add(tabletMetaInfo); + } + } } else { // 2. (meta - be) // may need delete from meta @@ -318,10 +330,10 @@ public class TabletInvertedIndex { long end = System.currentTimeMillis(); LOG.info("finished to do tablet diff with backend[{}]. sync: {}." + " metaDel: {}. foundInMeta: {}. migration: {}. " - + "found invalid transactions {}. found republish transactions {}. tabletInMemorySync: {}." + + "found invalid transactions {}. found republish transactions {}. tabletToUpdate: {}." + " need recovery: {}. 
cost: {} ms", backendId, tabletSyncMap.size(), tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), - transactionsToClear.size(), transactionsToPublish.size(), tabletToInMemory.size(), + transactionsToClear.size(), transactionsToPublish.size(), tabletToUpdate.size(), tabletRecoveryMap.size(), (end - start)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index d6dd0070ff..646bed22a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -81,6 +81,7 @@ import org.apache.doris.thrift.TStorageResource; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTablet; import org.apache.doris.thrift.TTabletInfo; +import org.apache.doris.thrift.TTabletMetaInfo; import org.apache.doris.thrift.TTaskType; import com.google.common.base.Preconditions; @@ -91,7 +92,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -420,7 +420,7 @@ public class ReportHandler extends Daemon { // db id -> tablet id ListMultimap tabletRecoveryMap = LinkedListMultimap.create(); - List> tabletToInMemory = Lists.newArrayList(); + List tabletToUpdate = Lists.newArrayList(); List cooldownConfToPush = new LinkedList<>(); List cooldownConfToUpdate = new LinkedList<>(); @@ -434,7 +434,7 @@ public class ReportHandler extends Daemon { transactionsToPublish, transactionsToClear, tabletRecoveryMap, - tabletToInMemory, + tabletToUpdate, cooldownConfToPush, cooldownConfToUpdate); @@ -474,9 +474,9 @@ public class ReportHandler extends Daemon { 
handleRecoverTablet(tabletRecoveryMap, backendTablets, backendId); } - // 9. send set tablet in memory to be - if (!tabletToInMemory.isEmpty()) { - handleSetTabletInMemory(backendId, tabletToInMemory); + // 9. send tablet meta to be for updating + if (!tabletToUpdate.isEmpty()) { + handleUpdateTabletMeta(backendId, tabletToUpdate); } // handle cooldown conf @@ -1032,10 +1032,14 @@ public class ReportHandler extends Daemon { } } - private static void handleSetTabletInMemory(long backendId, List> tabletToInMemory) { + private static void handleUpdateTabletMeta(long backendId, List tabletToUpdate) { + final int updateBatchSize = 4096; AgentBatchTask batchTask = new AgentBatchTask(); - UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId, tabletToInMemory); - batchTask.addTask(task); + for (int start = 0; start < tabletToUpdate.size(); start += updateBatchSize) { + UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId, + tabletToUpdate.subList(start, Math.min(start + updateBatchSize, tabletToUpdate.size()))); + batchTask.addTask(task); + } AgentTaskExecutor.submit(batchTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index a39197f5ad..2617e6dba6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -17,23 +17,19 @@ package org.apache.doris.task; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTabletMetaInfo; -import org.apache.doris.thrift.TTabletMetaType; import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; 
-import com.google.common.collect.Lists; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Random; import java.util.Set; public class UpdateTabletMetaInfoTask extends AgentTask { @@ -45,36 +41,31 @@ public class UpdateTabletMetaInfoTask extends AgentTask { private Set> tableIdWithSchemaHash; private int inMemory = -1; // < 0 means not to update inMemory property, > 0 means true, == 0 means false - private TTabletMetaType metaType; private long storagePolicyId = -1; // < 0 means not to update storage policy, == 0 means to reset storage policy + // For ReportHandler + private List tabletMetaInfos; - // - private List> tabletToInMemory; - - public UpdateTabletMetaInfoTask(long backendId, Set> tableIdWithSchemaHash, - TTabletMetaType metaType) { + public UpdateTabletMetaInfoTask(long backendId, Set> tableIdWithSchemaHash) { super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, - -1L, -1L, -1L, -1L, -1L, tableIdWithSchemaHash.hashCode()); + -1L, -1L, -1L, -1L, -1L, Math.abs(new Random().nextLong())); this.tableIdWithSchemaHash = tableIdWithSchemaHash; - this.metaType = metaType; } public UpdateTabletMetaInfoTask(long backendId, Set> tableIdWithSchemaHash, int inMemory, long storagePolicyId, MarkedCountDownLatch>> latch) { - this(backendId, tableIdWithSchemaHash, TTabletMetaType.INMEMORY); + this(backendId, tableIdWithSchemaHash); this.storagePolicyId = storagePolicyId; this.inMemory = inMemory; this.latch = latch; } - public UpdateTabletMetaInfoTask(long backendId, - List> tabletToInMemory) { + public UpdateTabletMetaInfoTask(long backendId, List tabletMetaInfos) { + // For ReportHandler, never add to AgentTaskQueue, so signature is useless. 
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, - -1L, -1L, -1L, -1L, -1L, tabletToInMemory.hashCode()); - this.metaType = TTabletMetaType.INMEMORY; - this.tabletToInMemory = tabletToInMemory; + -1L, -1L, -1L, -1L, -1L); + this.tabletMetaInfos = tabletMetaInfos; } public void countDownLatch(long backendId, Set> tablets) { @@ -100,64 +91,24 @@ public class UpdateTabletMetaInfoTask extends AgentTask { public TUpdateTabletMetaInfoReq toThrift() { TUpdateTabletMetaInfoReq updateTabletMetaInfoReq = new TUpdateTabletMetaInfoReq(); - List metaInfos = Lists.newArrayList(); - switch (metaType) { - case PARTITIONID: { - int tabletEntryNum = 0; - for (Pair pair : tableIdWithSchemaHash) { - // add at most 10000 tablet meta during one sync to avoid too large task - if (tabletEntryNum > 10000) { - break; - } - TTabletMetaInfo metaInfo = new TTabletMetaInfo(); - metaInfo.setTabletId(pair.first); - metaInfo.setSchemaHash(pair.second); - TabletMeta tabletMeta = Env.getCurrentEnv() - .getTabletInvertedIndex().getTabletMeta(pair.first); - if (tabletMeta == null) { - LOG.warn("could not find tablet [{}] in meta ignore it", pair.second); - continue; - } - metaInfo.setPartitionId(tabletMeta.getPartitionId()); - metaInfo.setMetaType(metaType); - metaInfos.add(metaInfo); - ++tabletEntryNum; + if (latch != null) { + // for schema change + for (Pair pair : tableIdWithSchemaHash) { + TTabletMetaInfo metaInfo = new TTabletMetaInfo(); + metaInfo.setTabletId(pair.first); + metaInfo.setSchemaHash(pair.second); + if (inMemory >= 0) { + metaInfo.setIsInMemory(inMemory > 0); } - break; - } - case INMEMORY: { - if (latch != null) { - // for schema change - for (Pair pair : tableIdWithSchemaHash) { - TTabletMetaInfo metaInfo = new TTabletMetaInfo(); - metaInfo.setTabletId(pair.first); - metaInfo.setSchemaHash(pair.second); - if (inMemory >= 0) { - metaInfo.setIsInMemory(inMemory > 0); - } - if (storagePolicyId >= 0) { - metaInfo.setStoragePolicyId(storagePolicyId); - } - 
metaInfo.setMetaType(metaType); - metaInfos.add(metaInfo); - } - } else { - // for ReportHandler - for (Triple triple : tabletToInMemory) { - TTabletMetaInfo metaInfo = new TTabletMetaInfo(); - metaInfo.setTabletId(triple.getLeft()); - metaInfo.setSchemaHash(triple.getMiddle()); - metaInfo.setIsInMemory(triple.getRight()); - metaInfo.setMetaType(metaType); - metaInfos.add(metaInfo); - } + if (storagePolicyId >= 0) { + metaInfo.setStoragePolicyId(storagePolicyId); } - break; + updateTabletMetaInfoReq.addToTabletMetaInfos(metaInfo); } - default: - break; + } else { + // for ReportHandler + updateTabletMetaInfoReq.setTabletMetaInfos(tabletMetaInfos); } - updateTabletMetaInfoReq.setTabletMetaInfos(metaInfos); return updateTabletMetaInfoReq; } } diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 91b2927705..e7a8b3ae27 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -362,10 +362,11 @@ struct TTabletMetaInfo { 1: optional Types.TTabletId tablet_id 2: optional Types.TSchemaHash schema_hash 3: optional Types.TPartitionId partition_id - 4: optional TTabletMetaType meta_type + // 4: optional TTabletMetaType Deprecated_meta_type 5: optional bool is_in_memory - // 6: optional string storage_policy; + // 6: optional string Deprecated_storage_policy 7: optional i64 storage_policy_id + 8: optional Types.TReplicaId replica_id } struct TUpdateTabletMetaInfoReq {