From e1f13863950237d96e8488dd42fa288cc6df94b7 Mon Sep 17 00:00:00 2001 From: plat1ko Date: Thu, 9 Feb 2023 09:12:55 +0800 Subject: [PATCH] [fix](cooldown) Rewrite update cooldown conf (#16488) Remove error-prone CooldownJob, and use CooldownConfHandler to update Tablet's cooldown conf. Some bug fix about cooldown. --- be/src/agent/task_worker_pool.cpp | 11 +- be/src/olap/olap_server.cpp | 2 +- be/src/olap/tablet.cpp | 16 +- be/src/olap/tablet.h | 7 +- .../java/org/apache/doris/common/Config.java | 14 +- .../doris/alter/MaterializedViewHandler.java | 2 +- .../org/apache/doris/alter/RollupJobV2.java | 2 +- .../doris/alter/SchemaChangeHandler.java | 2 +- .../apache/doris/alter/SchemaChangeJobV2.java | 2 +- .../org/apache/doris/backup/RestoreJob.java | 9 +- .../doris/catalog/CatalogRecycleBin.java | 6 +- .../java/org/apache/doris/catalog/Env.java | 30 +- .../java/org/apache/doris/catalog/Tablet.java | 32 +- .../doris/catalog/TabletInvertedIndex.java | 105 +++-- .../org/apache/doris/catalog/TabletMeta.java | 58 +-- .../apache/doris/cooldown/CooldownConf.java | 76 +--- .../doris/cooldown/CooldownConfHandler.java | 138 ++++++ .../doris/cooldown/CooldownConfList.java | 53 +++ .../doris/cooldown/CooldownException.java | 32 -- .../doris/cooldown/CooldownHandler.java | 192 --------- .../apache/doris/cooldown/CooldownJob.java | 401 ------------------ .../doris/datasource/InternalCatalog.java | 12 +- .../apache/doris/journal/JournalEntity.java | 6 +- .../apache/doris/master/ReportHandler.java | 31 +- .../org/apache/doris/persist/EditLog.java | 15 +- .../apache/doris/persist/OperationType.java | 4 +- .../doris/persist/meta/MetaPersistMethod.java | 6 - .../persist/meta/PersistMetaModules.java | 2 +- .../apache/doris/backup/CatalogMocker.java | 10 +- .../apache/doris/catalog/CatalogTestUtil.java | 4 +- .../org/apache/doris/catalog/TabletTest.java | 2 +- .../clone/ClusterLoadStatisticsTest.java | 6 +- .../doris/clone/RebalancerTestUtil.java | 2 +- 
.../doris/common/util/UnitTestUtil.java | 2 +- .../doris/cooldown/CooldownJobTest.java | 130 ------ .../apache/doris/http/DorisHttpTestCase.java | 2 +- .../apache/doris/load/DeleteHandlerTest.java | 2 +- gensrc/thrift/MasterService.thrift | 4 +- .../cold_heat_separation/policy/alter.groovy | 2 - .../cold_heat_separation/policy/create.groovy | 2 - .../cold_heat_separation/policy/drop.groovy | 2 - .../cold_heat_separation/policy/show.groovy | 2 - .../use_policy/alter_table_add_policy.groovy | 2 - .../create_table_use_partition_policy.groovy | 2 - .../use_policy/create_table_use_policy.groovy | 2 - .../modify_partition_add_policy.groovy | 2 - .../use_default_storage_policy.groovy | 2 - 47 files changed, 402 insertions(+), 1046 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a61ea17ba9..2c6b757c0f 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -268,7 +268,8 @@ void TaskWorkerPool::notify_thread() { } bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) { - if (task_type == TTaskType::type::PUSH_STORAGE_POLICY) { + if (task_type == TTaskType::type::PUSH_STORAGE_POLICY || + task_type == TTaskType::type::PUSH_COOLDOWN_CONF) { // no need to report task of these types return true; } @@ -1716,14 +1717,6 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() { agent_task_req = 
_tasks.front(); _tasks.pop_front(); } - // FIXME(plat1ko): no need to save cooldown conf job state in FE - TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(_backend); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - finish_task_request.__set_task_status(Status::OK().to_thrift()); - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); TPushCooldownConfReq& push_cooldown_conf_req = agent_task_req.push_cooldown_conf; for (auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) { diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 98dca4ea1c..9764680ff9 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -729,7 +729,7 @@ void StorageEngine::_cooldown_tasks_producer_callback() { Status st = tablet->cooldown(); if (!st.ok()) { LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id() - << " err: " << st.to_string(); + << " err: " << st; } else { LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id() << " cooldown progress (" diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 009513748f..92b203b339 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1392,9 +1392,9 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory()); tablet_info->__set_replica_id(replica_id()); tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size()); - tablet_info->__set_is_cooldown(_tablet_meta->storage_policy_id() > 0); - if (tablet_info->is_cooldown) { + if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() > 0) { tablet_info->__set_cooldown_replica_id(_cooldown_replica_id); + tablet_info->__set_cooldown_term(_cooldown_term); } } @@ -1645,7 +1645,7 @@ Status Tablet::cooldown() { } int64_t 
cooldown_replica_id = _cooldown_replica_id; if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf - return Status::OK(); + return Status::InternalError("invalid cooldown_replica_id"); } auto storage_policy = get_storage_policy(storage_policy_id()); if (storage_policy == nullptr) { @@ -1768,6 +1768,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, RowsetMeta* new_rs } Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldown_replica_id) { + LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id() + << " cooldown_replica_id=" << cooldown_replica_id; TabletMetaPB cooldown_meta_pb; RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id, &cooldown_meta_pb)); DCHECK(cooldown_meta_pb.rs_metas_size() > 0); @@ -1788,12 +1790,13 @@ Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow } } if (!version_aligned) { - LOG(INFO) << "cooldowned version is not aligned"; - return Status::OK(); + return Status::InternalError("cooldowned version is not aligned"); } for (auto& [v, rs] : _rs_version_map) { if (v.second <= cooldowned_version) { overlap_rowsets.push_back(rs); + } else if (!rs->is_local()) { + return Status::InternalError("cooldowned version larger than that to follow"); } } std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator); @@ -1854,6 +1857,9 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() { } } } + if (!rowset) { + return nullptr; + } if (min_local_version != cooldowned_version + 1) { // ensure version continuity if (UNLIKELY(cooldowned_version != -1)) { LOG(WARNING) << "version not continuous. 
tablet_id=" << tablet_id() diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 934dd04298..c56808e8b5 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -305,9 +305,10 @@ public: void update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) { if (cooldown_term > _cooldown_term) { - LOG(INFO) << "update cooldown conf. cooldown_replica_id: " << _cooldown_replica_id - << " -> " << cooldown_replica_id << ", cooldown_term: " << _cooldown_term - << " -> " << cooldown_term; + LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id() + << " cooldown_replica_id: " << _cooldown_replica_id << " -> " + << cooldown_replica_id << ", cooldown_term: " << _cooldown_term << " -> " + << cooldown_term; _cooldown_replica_id = cooldown_replica_id; _cooldown_term = cooldown_term; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 6edbc5a160..ef325d357c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -802,13 +802,6 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int alter_table_timeout_second = 86400 * 30; // 1month - /** - * Maximal timeout of push cooldown conf request. - */ - @ConfField(mutable = false, masterOnly = true) - public static boolean cooldown_single_remote_file = false; - @ConfField(mutable = false, masterOnly = true) - public static int push_cooldown_conf_timeout_second = 600; // 10 min /** * If a backend is down for *max_backend_down_time_second*, a BACKEND_DOWN event will be triggered. * Do not set this if you know what you are doing. @@ -1932,11 +1925,10 @@ public class Config extends ConfigBase { public static int max_same_name_catalog_trash_num = 3; /** - * The storage policy is still under developement. - * Disable it by default. 
+ * NOTE: The storage policy is still under development. */ - @ConfField(mutable = true, masterOnly = true) - public static boolean enable_storage_policy = false; + @ConfField(mutable = false, masterOnly = true) + public static boolean enable_storage_policy = true; /** * This config is mainly used in the k8s cluster environment. diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index ef760bd0a6..851e4c7016 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -379,7 +379,7 @@ public class MaterializedViewHandler extends AlterHandler { short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum(); for (Tablet baseTablet : baseIndex.getTablets()) { TabletMeta mvTabletMeta = new TabletMeta( - dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium, -1, 0); + dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); long baseTabletId = baseTablet.getId(); long mvTabletId = idGeneratorBuffer.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index d83c9a6f70..b47052c601 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -658,7 +658,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { for (Tablet rollupTablet : rollupIndex.getTablets()) { TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId, partitionId, rollupIndexId, - rollupSchemaHash, medium, -1, 0); + rollupSchemaHash, medium); invertedIndex.addTablet(rollupTablet.getId(), rollupTabletMeta); for (Replica rollupReplica : rollupTablet.getReplicas()) { 
invertedIndex.addReplica(rollupTablet.getId(), rollupReplica); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index a330923582..29ff80ec79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1493,7 +1493,7 @@ public class SchemaChangeHandler extends AlterHandler { Short totalReplicaNum = replicaAlloc.getTotalReplicaNum(); for (Tablet originTablet : originIndex.getTablets()) { TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, - newSchemaHash, medium, -1, 0); + newSchemaHash, medium); long originTabletId = originTablet.getId(); long shadowTabletId = idGeneratorBuffer.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 3731f7cb88..757898d0ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -746,7 +746,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { for (Tablet shadownTablet : shadowIndex.getTablets()) { TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, - indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium, -1, 0); + indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium); invertedIndex.addTablet(shadownTablet.getId(), shadowTabletMeta); for (Replica shadowReplica : shadownTablet.getReplicas()) { invertedIndex.addReplica(shadownTablet.getId(), shadowReplica); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index c01aa2b685..8b894fc920 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -988,8 +988,7 @@ public class RestoreJob extends AbstractJob { MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId()); for (Tablet restoreTablet : restoredIdx.getTablets()) { TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(), - restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD, - restoreTablet.getCooldownReplicaId(), restoreTablet.getCooldownTerm()); + restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD); Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); @@ -1177,8 +1176,7 @@ public class RestoreJob extends AbstractJob { int schemaHash = localTbl.getSchemaHashByIndexId(restoreIdx.getId()); for (Tablet restoreTablet : restoreIdx.getTablets()) { TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(), - restoreIdx.getId(), schemaHash, TStorageMedium.HDD, restoreTablet.getCooldownReplicaId(), - restoreTablet.getCooldownTerm()); + restoreIdx.getId(), schemaHash, TStorageMedium.HDD); Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); @@ -1210,8 +1208,7 @@ public class RestoreJob extends AbstractJob { int schemaHash = olapRestoreTbl.getSchemaHashByIndexId(restoreIdx.getId()); for (Tablet restoreTablet : restoreIdx.getTablets()) { TabletMeta tabletMeta = new TabletMeta(db.getId(), restoreTbl.getId(), restorePart.getId(), - restoreIdx.getId(), schemaHash, TStorageMedium.HDD, - restoreTablet.getCooldownReplicaId(), restoreTablet.getCooldownTerm()); + restoreIdx.getId(), 
schemaHash, TStorageMedium.HDD); Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta); for (Replica restoreReplica : restoreTablet.getReplicas()) { Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index ec2588ae2e..dadb0f722e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -840,8 +840,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : index.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium, - tablet.getCooldownReplicaId(), tablet.getCooldownTerm()); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { @@ -893,8 +892,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : index.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium, - tablet.getCooldownReplicaId(), tablet.getCooldownTerm()); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 34f0dafdd2..15d38f61c6 100755 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -122,7 +122,7 @@ import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.consistency.ConsistencyChecker; -import org.apache.doris.cooldown.CooldownHandler; +import org.apache.doris.cooldown.CooldownConfHandler; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.EsExternalCatalog; @@ -314,7 +314,7 @@ public class Env { private DeleteHandler deleteHandler; private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; - private CooldownHandler cooldownHandler; + private CooldownConfHandler cooldownConfHandler; private MetastoreEventsProcessor metastoreEventsProcessor; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos @@ -551,7 +551,9 @@ public class Env { this.deleteHandler = new DeleteHandler(); this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector(); this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector(); - this.cooldownHandler = new CooldownHandler(); + if (Config.enable_storage_policy) { + this.cooldownConfHandler = new CooldownConfHandler(); + } this.metastoreEventsProcessor = new MetastoreEventsProcessor(); this.replayedJournalId = new AtomicLong(0L); @@ -1398,8 +1400,8 @@ public class Env { dbUsedDataQuotaInfoCollector.start(); // start daemon thread to update global partition in memory information periodically partitionInMemoryInfoCollector.start(); - if (Config.cooldown_single_remote_file) { - cooldownHandler.start(); + if (Config.enable_storage_policy) { + cooldownConfHandler.start(); } streamLoadRecordMgr.start(); getInternalCatalog().getIcebergTableCreationRecordMgr().start(); @@ -1738,12 +1740,6 @@ public class 
Env { return checksum; } - public long loadCooldownJob(DataInputStream dis, long checksum) throws IOException { - cooldownHandler.readField(dis); - LOG.info("finished replay loadCooldownJob from image"); - return checksum; - } - public long loadAlterJob(DataInputStream dis, long checksum) throws IOException { long newChecksum = checksum; for (JobType type : JobType.values()) { @@ -2155,14 +2151,6 @@ public class Env { return checksum; } - /** - * Save CooldownJob. - */ - public long saveCooldownJob(CountingDataOutputStream out, long checksum) throws IOException { - Env.getCurrentEnv().getCooldownHandler().write(out); - return checksum; - } - public long saveMTMVJobManager(CountingDataOutputStream out, long checksum) throws IOException { if (Config.enable_mtmv_scheduler_framework) { Env.getCurrentEnv().getMTMVJobManager().write(out, checksum); @@ -3387,8 +3375,8 @@ public class Env { return (MaterializedViewHandler) this.alter.getMaterializedViewHandler(); } - public CooldownHandler getCooldownHandler() { - return cooldownHandler; + public CooldownConfHandler getCooldownConfHandler() { + return cooldownConfHandler; } public SystemHandler getClusterHandler() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 7f594afa7d..69964693c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -49,6 +49,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -90,10 +91,13 @@ public class Tablet extends MetaObject implements Writable { private long checkedVersionHash; @SerializedName(value = "isConsistent") private boolean isConsistent; + + // cooldown conf @SerializedName(value = "cooldownReplicaId") - private long 
cooldownReplicaId; + private long cooldownReplicaId = -1; @SerializedName(value = "cooldownTerm") - private long cooldownTerm; + private long cooldownTerm = -1; + private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock(); // last time that the tablet checker checks this tablet. // no need to persist @@ -143,20 +147,20 @@ public class Tablet extends MetaObject implements Writable { return isConsistent; } - public long getCooldownReplicaId() { - return cooldownReplicaId; - } - - public void setCooldownReplicaId(long cooldownReplicaId) { + public void setCooldownConf(long cooldownReplicaId, long cooldownTerm) { + cooldownConfLock.writeLock().lock(); this.cooldownReplicaId = cooldownReplicaId; - } - - public long getCooldownTerm() { - return cooldownTerm; - } - - public void setCooldownTerm(long cooldownTerm) { this.cooldownTerm = cooldownTerm; + cooldownConfLock.writeLock().unlock(); + } + + public Pair getCooldownConf() { + cooldownConfLock.readLock().lock(); + try { + return Pair.of(cooldownReplicaId, cooldownTerm); + } finally { + cooldownConfLock.readLock().unlock(); + } } private boolean deleteRedundantReplica(long backendId, long version) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 98fcf380f6..0ed99185ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -19,6 +19,8 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.cooldown.CooldownConf; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTablet; @@ -46,7 +48,6 @@ import org.apache.logging.log4j.Logger; import 
java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -66,7 +67,7 @@ public class TabletInvertedIndex { public static final int NOT_EXIST_VALUE = -1; public static final TabletMeta NOT_EXIST_TABLET_META = new TabletMeta(NOT_EXIST_VALUE, NOT_EXIST_VALUE, - NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, TStorageMedium.HDD, -1, -1); + NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, TStorageMedium.HDD); private StampedLock lock = new StampedLock(); @@ -126,7 +127,8 @@ public class TabletInvertedIndex { ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap, List> tabletToInMemory, - Map syncCooldownTabletMap) { + List cooldownConfToPush, + List cooldownConfToUpdate) { long stamp = readLock(); long start = System.currentTimeMillis(); try { @@ -188,9 +190,9 @@ public class TabletInvertedIndex { } } - if (Config.cooldown_single_remote_file - && needChangeCooldownConf(tabletMeta, backendTabletInfo)) { - syncCooldownTabletMap.put(backendTabletInfo.getTabletId(), tabletMeta); + if (Config.enable_storage_policy) { + handleCooldownConf(tabletMeta, backendTabletInfo, cooldownConfToPush, + cooldownConfToUpdate); } long partitionId = tabletMeta.getPartitionId(); @@ -335,48 +337,81 @@ public class TabletInvertedIndex { return false; } - private boolean needChangeCooldownConf(TabletMeta tabletMeta, TTabletInfo beTabletInfo) { - if (!beTabletInfo.isIsCooldown()) { - return false; + private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo beTabletInfo, + List cooldownConfToPush, List cooldownConfToUpdate) { + if (!beTabletInfo.isSetCooldownReplicaId()) { + return; } - // check cooldown type in fe and be, they need to be the same. 
- if (tabletMeta.getCooldownReplicaId() != beTabletInfo.getCooldownReplicaId()) { - LOG.warn("cooldownReplicaId is wrong for tablet: {}, Fe: {}, Be: {}", beTabletInfo.getTabletId(), - tabletMeta.getCooldownReplicaId(), beTabletInfo.getCooldownReplicaId()); - return true; - } - // check cooldown type in one tablet, One UPLOAD_DATA is needed in the replicas. - long stamp = readLock(); + Tablet tablet; try { - boolean replicaInvalid = true; - Map replicaMap = replicaMetaTable.row(beTabletInfo.getTabletId()); - for (Map.Entry entry : replicaMap.entrySet()) { - if (entry.getValue().getId() == beTabletInfo.getCooldownReplicaId()) { - replicaInvalid = false; - break; - } + OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId()) + .getTable(tabletMeta.getTableId()) + .get(); + table.readLock(); + try { + tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId()) + .getTablet(beTabletInfo.tablet_id); + } finally { + table.readUnlock(); } - if (replicaInvalid) { - return true; - } - } finally { - readUnlock(stamp); + } catch (RuntimeException e) { + LOG.warn("failed to get tablet. 
tabletId={}", beTabletInfo.tablet_id); + return; + } + Pair cooldownConf = tablet.getCooldownConf(); + if (beTabletInfo.getCooldownTerm() > cooldownConf.second) { // should not be here + LOG.warn("report cooldownTerm({}) > cooldownTerm in TabletMeta({}), tabletId={}", + beTabletInfo.getCooldownTerm(), cooldownConf.second, beTabletInfo.tablet_id); + return; + } + + if (cooldownConf.first <= 0) { // invalid cooldownReplicaId + CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), + tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); + synchronized (cooldownConfToUpdate) { + cooldownConfToUpdate.add(conf); + } + return; + } + + // validate replica is active + Map replicaMap = replicaMetaTable.row(beTabletInfo.getTabletId()); + if (replicaMap.isEmpty()) { + return; + } + boolean replicaInvalid = true; + for (Replica replica : replicaMap.values()) { + if (replica.getId() == cooldownConf.first) { + replicaInvalid = false; + break; + } + } + if (replicaInvalid) { + CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), + tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); + synchronized (cooldownConfToUpdate) { + cooldownConfToUpdate.add(conf); + } + return; + } + + if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) { + CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, cooldownConf.first, cooldownConf.second); + synchronized (cooldownConfToPush) { + cooldownConfToPush.add(conf); + } + return; } - return false; } public List getReplicas(Long tabletId) { - List replicas = new LinkedList<>(); long stamp = readLock(); try { Map replicaMap = replicaMetaTable.row(tabletId); - for (Map.Entry entry : replicaMap.entrySet()) { - replicas.add(entry.getValue()); - } + return replicaMap.values().stream().collect(Collectors.toList()); } finally { readUnlock(stamp); } - return replicas; } /** diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java index bfaaba3eb2..be291d93ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java @@ -22,8 +22,6 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.concurrent.locks.ReentrantReadWriteLock; - public class TabletMeta { private static final Logger LOG = LogManager.getLogger(TabletMeta.class); @@ -37,14 +35,8 @@ public class TabletMeta { private TStorageMedium storageMedium; - private long cooldownReplicaId; - - private long cooldownTerm; - - private ReentrantReadWriteLock lock; - public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash, - TStorageMedium storageMedium, long cooldownReplicaId, long cooldownTerm) { + TStorageMedium storageMedium) { this.dbId = dbId; this.tableId = tableId; this.partitionId = partitionId; @@ -54,10 +46,6 @@ public class TabletMeta { this.newSchemaHash = -1; this.storageMedium = storageMedium; - this.cooldownReplicaId = cooldownReplicaId; - this.cooldownTerm = cooldownTerm; - - lock = new ReentrantReadWriteLock(); } public long getDbId() { @@ -84,46 +72,20 @@ public class TabletMeta { this.storageMedium = storageMedium; } - public long getCooldownReplicaId() { - return cooldownReplicaId; - } - - public void setCooldownReplicaId(long cooldownReplicaId) { - this.cooldownReplicaId = cooldownReplicaId; - } - - public long getCooldownTerm() { - return cooldownTerm; - } - - public void setCooldownTerm(long cooldownTerm) { - this.cooldownTerm = cooldownTerm; - } - public int getOldSchemaHash() { - lock.readLock().lock(); - try { - return this.oldSchemaHash; - } finally { - lock.readLock().unlock(); - } + return this.oldSchemaHash; } @Override public String toString() { - 
lock.readLock().lock(); - try { - StringBuilder sb = new StringBuilder(); - sb.append("dbId=").append(dbId); - sb.append(" tableId=").append(tableId); - sb.append(" partitionId=").append(partitionId); - sb.append(" indexId=").append(indexId); - sb.append(" oldSchemaHash=").append(oldSchemaHash); - sb.append(" newSchemaHash=").append(newSchemaHash); + StringBuilder sb = new StringBuilder(); + sb.append("dbId=").append(dbId); + sb.append(" tableId=").append(tableId); + sb.append(" partitionId=").append(partitionId); + sb.append(" indexId=").append(indexId); + sb.append(" oldSchemaHash=").append(oldSchemaHash); + sb.append(" newSchemaHash=").append(newSchemaHash); - return sb.toString(); - } finally { - lock.readLock().unlock(); - } + return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java index 9275a2633c..b0dcec8732 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java @@ -22,8 +22,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import lombok.Data; import java.io.DataInput; import java.io.DataOutput; @@ -32,9 +31,8 @@ import java.io.IOException; /** * This class represents the olap replica related metadata. 
*/ +@Data public class CooldownConf implements Writable { - private static final Logger LOG = LogManager.getLogger(CooldownConf.class); - @SerializedName(value = "dbId") protected long dbId; @SerializedName(value = "tableId") @@ -46,74 +44,27 @@ public class CooldownConf implements Writable { @SerializedName(value = "tabletId") protected long tabletId; @SerializedName(value = "cooldownReplicaId") - protected long cooldownReplicaId; + protected long cooldownReplicaId = -1; @SerializedName(value = "cooldownTerm") - protected long cooldownTerm; + protected long cooldownTerm = -1; - public CooldownConf(long dbId, long tableId, long partitionId, long indexId, long tabletId, long cooldownReplicaId, - long cooldownTerm) { + public CooldownConf() { + } + + // for update + public CooldownConf(long dbId, long tableId, long partitionId, long indexId, long tabletId, long cooldownTerm) { this.dbId = dbId; this.tableId = tableId; this.partitionId = partitionId; this.indexId = indexId; this.tabletId = tabletId; - this.cooldownReplicaId = cooldownReplicaId; this.cooldownTerm = cooldownTerm; } - public long getDbId() { - return dbId; - } - - public void setDbId(long dbId) { - this.dbId = dbId; - } - - public long getTableId() { - return tableId; - } - - public void setTableId(long tableId) { - this.tableId = tableId; - } - - public long getPartitionId() { - return partitionId; - } - - public void setPartitionId(long partitionId) { - this.partitionId = partitionId; - } - - public long getIndexId() { - return indexId; - } - - public void setIndexId(long indexId) { - this.indexId = indexId; - } - - public long getTabletId() { - return tabletId; - } - - public void setTabletId(long tabletId) { + // for push + public CooldownConf(long tabletId, long cooldownReplicaId, long cooldownTerm) { this.tabletId = tabletId; - } - - public long getCooldownReplicaId() { - return cooldownReplicaId; - } - - public void setCooldownReplicaId(long cooldownReplicaId) { this.cooldownReplicaId = 
cooldownReplicaId; - } - - public long getCooldownTerm() { - return cooldownTerm; - } - - public void setCooldownTerm(long cooldownTerm) { this.cooldownTerm = cooldownTerm; } @@ -123,8 +74,9 @@ public class CooldownConf implements Writable { Text.writeString(out, json); } - public static CooldownJob read(DataInput in) throws IOException { + public static CooldownConf read(DataInput in) throws IOException { String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, CooldownJob.class); + return GsonUtils.GSON.fromJson(json, CooldownConf.class); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java new file mode 100644 index 0000000000..f840a304fc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Master daemon (interval INTERVAL_MS) that applies queued cooldown conf updates to Tablets and journals them. */
+public class CooldownConfHandler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CooldownConfHandler.class);
+
+    // TODO(plat1ko): better to use `Condition`?
+    private static final long INTERVAL_MS = 5000; // 5s
+    private static final int UPDATE_BATCH_SIZE = 512;
+
+    // tabletId -> conf waiting to be updated
+    private final Map<Long, CooldownConf> cooldownConfToUpdate = Maps.newConcurrentMap();
+
+    public CooldownConfHandler() {
+        super("CooldownConfHandler", INTERVAL_MS);
+    }
+
+    /** Queues confs for a later round; a conf replaces any queued conf of the same tablet. */
+    public void addCooldownConfToUpdate(List<CooldownConf> cooldownConfs) {
+        cooldownConfs.forEach(conf -> cooldownConfToUpdate.put(conf.getTabletId(), conf));
+    }
+
+    /** Snapshots the queued confs and processes them in batches of UPDATE_BATCH_SIZE. */
+    @Override
+    protected void runAfterCatalogReady() {
+        if (cooldownConfToUpdate.isEmpty()) {
+            return;
+        }
+        List<CooldownConf> pending = new ArrayList<>(cooldownConfToUpdate.values());
+        for (int start = 0; start < pending.size(); start += UPDATE_BATCH_SIZE) {
+            updateCooldownConf(pending.subList(start, Math.min(start + UPDATE_BATCH_SIZE, pending.size())));
+        }
+    }
+
+    // For each conf whose term still matches its Tablet: pick a random cooldown replica and bump the term.
+    // Accepted confs are journaled as one edit log entry, applied to the Tablets, then the batch is dequeued.
+    private void updateCooldownConf(List<CooldownConf> confToUpdate) {
+        List<CooldownConf> updatedConf = new ArrayList<>(confToUpdate.size());
+        Map<Long, Tablet> tabletMap = new HashMap<>(); // cache tablet
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        for (CooldownConf conf : confToUpdate) {
+            List<Replica> replicas = invertedIndex.getReplicas(conf.getTabletId());
+            if (replicas.isEmpty()) {
+                continue;
+            }
+            // choose cooldown replica; a Random re-seeded per iteration with the current millisecond would
+            // return the same index for every tablet handled within that millisecond
+            int index = ThreadLocalRandom.current().nextInt(replicas.size());
+            conf.setCooldownReplicaId(replicas.get(index).getId());
+            Tablet tablet = getTablet(conf);
+            if (tablet == null || tablet.getCooldownConf().second != conf.cooldownTerm) {
+                // If tablet.cooldownTerm != conf.cooldownTerm, means cooldown conf of this tablet has been updated,
+                // should skip this update.
+                continue;
+            }
+            ++conf.cooldownTerm;
+            updatedConf.add(conf);
+            tabletMap.put(conf.tabletId, tablet);
+        }
+
+        if (!updatedConf.isEmpty()) { // do not journal an empty list
+            // write editlog
+            Env.getCurrentEnv().getEditLog().logUpdateCooldownConf(new CooldownConfList(updatedConf));
+            // update Tablet
+            for (CooldownConf conf : updatedConf) {
+                Tablet tablet = tabletMap.get(conf.tabletId);
+                tablet.setCooldownConf(conf.cooldownReplicaId, conf.cooldownTerm);
+                LOG.info("update cooldown conf. tabletId={} cooldownReplicaId={} cooldownTerm={}", conf.tabletId,
+                        conf.cooldownReplicaId, conf.cooldownTerm);
+            }
+        }
+
+        // update finish; remove(key, value) keeps a conf that was re-queued for the tablet in the meantime
+        confToUpdate.forEach(conf -> cooldownConfToUpdate.remove(conf.getTabletId(), conf));
+        // TODO(plat1ko): push CooldownConf to BE?
+    }
+
+    // Looks the Tablet up under the table read lock; returns null when any step of the lookup fails.
+    private static Tablet getTablet(CooldownConf conf) {
+        try {
+            OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(conf.dbId)
+                    .getTable(conf.tableId).get();
+            table.readLock();
+            try {
+                return table.getPartition(conf.partitionId).getIndex(conf.indexId).getTablet(conf.tabletId);
+            } finally {
+                table.readUnlock();
+            }
+        } catch (RuntimeException e) {
+            LOG.warn("failed to get tablet. tabletId={}", conf.tabletId, e);
+            return null;
+        }
+    }
+
+    /** Replays a journaled update: applies each conf to its Tablet, skipping tablets that cannot be found. */
+    public static void replayUpdateCooldownConf(CooldownConfList cooldownConfList) {
+        cooldownConfList.getCooldownConf().forEach(conf -> {
+            Tablet tablet = getTablet(conf);
+            if (tablet != null) {
+                tablet.setCooldownConf(conf.cooldownReplicaId, conf.cooldownTerm);
+            }
+        });
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
new file mode 100644
index 0000000000..06bca7a42b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/** Journal payload holding the cooldown confs of one update; serialized as a single JSON string via Gson. */
+public class CooldownConfList implements Writable {
+    @SerializedName(value = "cooldownConf")
+    private List<CooldownConf> cooldownConf;
+
+    CooldownConfList(List<CooldownConf> cooldownConf) {
+        this.cooldownConf = cooldownConf;
+    }
+
+    List<CooldownConf> getCooldownConf() {
+        return cooldownConf;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    /** Reads a list previously serialized by write(). */
+    public static CooldownConfList read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), CooldownConfList.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
deleted file mode 100644
index c0ec6bd5e7..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.
See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.cooldown; - -import org.apache.doris.common.DdlException; - -/* - * This exception will be thrown when the cooldown job(v2) running failed. - */ -public class CooldownException extends DdlException { - - private static final long serialVersionUID = 4844951783432954268L; - - public CooldownException(String msg) { - super(msg); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java deleted file mode 100644 index 4ad601e5f4..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.cooldown; - -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.FeMetaVersion; -import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.util.MasterDaemon; - -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadPoolExecutor; - -public class CooldownHandler extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(CooldownHandler.class); - private static final int MAX_ACTIVE_COOLDOWN_JOB_SIZE = 10; - - private static final int MAX_RUNABLE_COOLDOWN_JOB_SIZE = 100; - - private static final int MAX_TABLET_PER_JOB = 100; - - private static final long timeoutMs = 1000L * Config.push_cooldown_conf_timeout_second; - - // jobId -> CooldownJob, it is used to hold CooldownJob which is used to sent conf to be. 
- private final Map runableCooldownJobs = Maps.newConcurrentMap(); - private final Map resetingTablet = Maps.newConcurrentMap(); - // jobId -> CooldownJob, - public final Map activeCooldownJobs = Maps.newConcurrentMap(); - - public final ThreadPoolExecutor cooldownThreadPool = ThreadPoolManager.newDaemonCacheThreadPool( - MAX_ACTIVE_COOLDOWN_JOB_SIZE, "cooldown-pool", true); - - // syncCooldownTabletMap: tabletId -> TabletMeta - public void handleCooldownConf(Map syncCooldownTabletMap) { - List cooldownConfList = new LinkedList<>(); - for (Map.Entry entry : syncCooldownTabletMap.entrySet()) { - if (runableCooldownJobs.size() >= MAX_RUNABLE_COOLDOWN_JOB_SIZE) { - return; - } - Long tabletId = entry.getKey(); - TabletMeta tabletMeta = entry.getValue(); - if (resetingTablet.containsKey(tabletId)) { - continue; - } - long cooldownReplicaId = -1; - List replicas = Env.getCurrentInvertedIndex().getReplicas(tabletId); - if (replicas.size() == 0) { - continue; - } - for (Replica replica : replicas) { - if (tabletMeta.getCooldownReplicaId() == replica.getId()) { - cooldownReplicaId = tabletMeta.getCooldownReplicaId(); - break; - } - } - long cooldownTerm = tabletMeta.getCooldownTerm(); - if (cooldownReplicaId == -1) { - Random rand = new Random(System.currentTimeMillis()); - int index = rand.nextInt(replicas.size()); - cooldownReplicaId = replicas.get(index).getId(); - ++cooldownTerm; - } - CooldownConf cooldownConf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), - tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, cooldownReplicaId, cooldownTerm); - cooldownConfList.add(cooldownConf); - if (cooldownConfList.size() >= MAX_TABLET_PER_JOB) { - long jobId = Env.getCurrentEnv().getNextId(); - CooldownJob cooldownJob = new CooldownJob(jobId, cooldownConfList, timeoutMs); - runableCooldownJobs.put(jobId, cooldownJob); - for (CooldownConf conf : cooldownConfList) { - resetingTablet.put(conf.getTabletId(), true); - } - cooldownConfList = new 
LinkedList<>(); - } - } - if (cooldownConfList.size() > 0) { - long jobId = Env.getCurrentEnv().getNextId(); - CooldownJob cooldownJob = new CooldownJob(jobId, cooldownConfList, timeoutMs); - runableCooldownJobs.put(jobId, cooldownJob); - for (CooldownConf conf : cooldownConfList) { - resetingTablet.put(conf.getTabletId(), true); - } - } - } - - @Override - protected void runAfterCatalogReady() { - clearFinishedOrCancelledCooldownJob(); - runableCooldownJobs.values().forEach(cooldownJob -> { - if (!cooldownJob.isDone() && !activeCooldownJobs.containsKey(cooldownJob.getJobId()) - && activeCooldownJobs.size() < MAX_ACTIVE_COOLDOWN_JOB_SIZE) { - if (FeConstants.runningUnitTest) { - cooldownJob.run(); - } else { - cooldownThreadPool.submit(() -> { - if (activeCooldownJobs.putIfAbsent(cooldownJob.getJobId(), cooldownJob) == null) { - try { - cooldownJob.run(); - } finally { - activeCooldownJobs.remove(cooldownJob.getJobId()); - } - } - }); - } - } - }); - } - - public void write(DataOutput out) throws IOException { - if (Config.cooldown_single_remote_file) { - out.writeInt(runableCooldownJobs.size()); - for (CooldownJob cooldownJob : runableCooldownJobs.values()) { - cooldownJob.write(out); - } - } - } - - public void readField(DataInput in) throws IOException { - if (Config.cooldown_single_remote_file) { - if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) { - int size = in.readInt(); - for (int i = 0; i < size; i++) { - CooldownJob cooldownJob = CooldownJob.read(in); - replayCooldownJob(cooldownJob); - } - } - } - } - - public void replayCooldownJob(CooldownJob cooldownJob) { - CooldownJob replayCooldownJob; - if (!runableCooldownJobs.containsKey(cooldownJob.getJobId())) { - replayCooldownJob = new CooldownJob(cooldownJob.jobId, cooldownJob.getCooldownConfList(), - cooldownJob.timeoutMs); - runableCooldownJobs.put(cooldownJob.getJobId(), replayCooldownJob); - for (CooldownConf cooldownConf : cooldownJob.getCooldownConfList()) { - 
resetingTablet.put(cooldownConf.getTabletId(), true); - } - } else { - replayCooldownJob = runableCooldownJobs.get(cooldownJob.getJobId()); - } - replayCooldownJob.replay(cooldownJob); - if (replayCooldownJob.isDone()) { - runableCooldownJobs.remove(cooldownJob.getJobId()); - for (CooldownConf cooldownConf : cooldownJob.getCooldownConfList()) { - resetingTablet.remove(cooldownConf.getTabletId()); - } - } - } - - private void clearFinishedOrCancelledCooldownJob() { - Iterator> iterator = runableCooldownJobs.entrySet().iterator(); - while (iterator.hasNext()) { - CooldownJob cooldownJob = iterator.next().getValue(); - if (cooldownJob.isDone()) { - iterator.remove(); - for (CooldownConf cooldownConf : cooldownJob.getCooldownConfList()) { - resetingTablet.remove(cooldownConf.getTabletId()); - } - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java deleted file mode 100644 index 99643b874a..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java +++ /dev/null @@ -1,401 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.cooldown; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.FeMetaVersion; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.task.AgentBatchTask; -import org.apache.doris.task.AgentTask; -import org.apache.doris.task.AgentTaskExecutor; -import org.apache.doris.task.AgentTaskQueue; -import org.apache.doris.task.PushCooldownConfTask; -import org.apache.doris.thrift.TTaskType; - -import com.google.common.base.Preconditions; -import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class CooldownJob implements Writable { - private static final Logger LOG = LogManager.getLogger(CooldownJob.class); - - public enum JobState { - PENDING, // Job is created - SEND_CONF, // send cooldown task to BE. - RUNNING, // cooldown tasks are sent to BE, and waiting for them finished. 
- FINISHED, // job is done - CANCELLED; // job is cancelled(failed or be cancelled by user) - - public boolean isFinalState() { - return this == CooldownJob.JobState.FINISHED || this == CooldownJob.JobState.CANCELLED; - } - } - - @SerializedName(value = "jobId") - protected long jobId; - @SerializedName(value = "jobState") - protected CooldownJob.JobState jobState; - @SerializedName(value = "cooldownConfList") - protected List cooldownConfList; - - @SerializedName(value = "errMsg") - protected String errMsg = ""; - @SerializedName(value = "createTimeMs") - protected long createTimeMs = -1; - @SerializedName(value = "finishedTimeMs") - protected long finishedTimeMs = -1; - @SerializedName(value = "timeoutMs") - protected long timeoutMs = -1; - - public long getJobId() { - return jobId; - } - - public JobState getJobState() { - return jobState; - } - - public List getCooldownConfList() { - return cooldownConfList; - } - - public long getTimeoutMs() { - return timeoutMs; - } - - private AgentBatchTask cooldownBatchTask = new AgentBatchTask(); - - public CooldownJob(long jobId, List cooldownConfList, long timeoutMs) { - this.jobId = jobId; - this.jobState = JobState.PENDING; - this.cooldownConfList = cooldownConfList; - this.createTimeMs = System.currentTimeMillis(); - this.timeoutMs = timeoutMs; - } - - protected void runPendingJob() throws CooldownException { - Preconditions.checkState(jobState == CooldownJob.JobState.PENDING, jobState); - this.jobState = JobState.SEND_CONF; - // write edit log - for (CooldownConf cooldownConf : cooldownConfList) { - setCooldownReplica(cooldownConf.getDbId(), cooldownConf.getTableId(), cooldownConf.getPartitionId(), - cooldownConf.getIndexId(), cooldownConf.getTabletId(), cooldownConf.getCooldownReplicaId(), - cooldownConf.getCooldownTerm()); - Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId( - cooldownConf.getCooldownReplicaId()); - 
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm( - cooldownConf.getCooldownTerm()); - } - Env.getCurrentEnv().getEditLog().logCooldownJob(this); - LOG.info("send cooldown job {} state to {}", jobId, this.jobState); - } - - protected void runSendJob() throws CooldownException { - Preconditions.checkState(jobState == JobState.SEND_CONF, jobState); - LOG.info("begin to send cooldown conf tasks. job: {}", jobId); - if (!FeConstants.runningUnitTest) { - Map> cooldownMap = new HashMap<>(); - for (CooldownConf cooldownConf : cooldownConfList) { - Database db = Env.getCurrentInternalCatalog() - .getDbOrException(cooldownConf.getDbId(), s -> new CooldownException( - "Database " + s + " does not exist")); - OlapTable tbl; - try { - tbl = (OlapTable) db.getTableOrMetaException(cooldownConf.getTableId(), TableIf.TableType.OLAP); - } catch (MetaNotFoundException e) { - throw new CooldownException(e.getMessage()); - } - if (tbl == null) { - throw new CooldownException(String.format("No table: %d", cooldownConf.getTableId())); - } - tbl.readLock(); - try { - Partition partition = tbl.getPartition(cooldownConf.getPartitionId()); - if (partition == null) { - throw new CooldownException(String.format("No partition: %d", cooldownConf.getPartitionId())); - } - MaterializedIndex index = partition.getIndex(cooldownConf.getIndexId()); - if (index == null) { - throw new CooldownException(String.format("No index: %d", cooldownConf.getIndexId())); - } - Tablet tablet = index.getTablet(cooldownConf.getTabletId()); - if (tablet == null) { - throw new CooldownException(String.format("No tablet: %d", cooldownConf.getTabletId())); - } - for (Replica replica : tablet.getReplicas()) { - if (!cooldownMap.containsKey(replica.getBackendId())) { - cooldownMap.put(replica.getBackendId(), new LinkedList<>()); - } - cooldownMap.get(replica.getBackendId()).add(cooldownConf); - } - } finally { - tbl.readUnlock(); - } - } - for (Map.Entry> entry : 
cooldownMap.entrySet()) { - PushCooldownConfTask pushCooldownConfTask = new PushCooldownConfTask(entry.getKey(), entry.getValue()); - cooldownBatchTask.addTask(pushCooldownConfTask); - } - AgentTaskQueue.addBatchTask(cooldownBatchTask); - AgentTaskExecutor.submit(cooldownBatchTask); - } - - this.jobState = JobState.RUNNING; - // write edit log - Env.getCurrentEnv().getEditLog().logCooldownJob(this); - LOG.info("send cooldown job {} state to {}", jobId, this.jobState); - } - - protected void runRunningJob() throws CooldownException { - if (!cooldownBatchTask.isFinished()) { - LOG.info("cooldown tasks not finished. job: {}", jobId); - List tasks = cooldownBatchTask.getUnfinishedTasks(2000); - for (AgentTask task : tasks) { - if (task.getFailedTimes() >= 3) { - task.setFinished(true); - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUSH_COOLDOWN_CONF, task.getSignature()); - LOG.warn("push cooldown conf task failed after try three times: " + task.getErrorMsg()); - throw new CooldownException("cooldown tasks failed on backend: " + task.getBackendId()); - } - } - return; - } - this.jobState = CooldownJob.JobState.FINISHED; - this.finishedTimeMs = System.currentTimeMillis(); - - Env.getCurrentEnv().getEditLog().logCooldownJob(this); - LOG.info("push cooldown conf job finished: {}", jobId); - } - - public boolean isTimeout() { - return System.currentTimeMillis() - createTimeMs > timeoutMs; - } - - public boolean isDone() { - return jobState.isFinalState(); - } - - /* - * cancelImpl() can be called any time any place. - * We need to clean any possible residual of this job. 
- */ - protected synchronized void cancelImpl(String errMsg) { - if (jobState.isFinalState()) { - return; - } - - cancelInternal(); - - this.errMsg = errMsg; - this.finishedTimeMs = System.currentTimeMillis(); - LOG.info("cancel cooldown job {}, err: {}", jobId, errMsg); - Env.getCurrentEnv().getEditLog().logCooldownJob(this); - } - - /** - * The keyword 'synchronized' only protects 2 methods: - * run() and cancel() - * Only these 2 methods can be visited by different thread(internal working thread and user connection thread) - * So using 'synchronized' to make sure only one thread can run the job at one time. - * - * lock order: - * synchronized - * db lock - */ - public synchronized void run() { - if (isTimeout()) { - cancelImpl("Timeout"); - return; - } - - try { - switch (jobState) { - case PENDING: - runPendingJob(); - break; - case SEND_CONF: - runSendJob(); - break; - case RUNNING: - runRunningJob(); - break; - default: - break; - } - } catch (CooldownException e) { - cancelImpl(e.getMessage()); - } - } - - public void replay(CooldownJob replayedJob) { - try { - switch (replayedJob.jobState) { - case PENDING: - replayCreateJob(replayedJob); - break; - case SEND_CONF: - replayPendingJob(); - break; - case FINISHED: - replayRunningJob(replayedJob); - break; - case CANCELLED: - replayCancelled(replayedJob); - break; - default: - break; - } - } catch (CooldownException e) { - LOG.warn("[INCONSISTENT META] replay cooldown job failed {}", replayedJob.jobId, e); - } - } - - @Override - public void write(DataOutput out) throws IOException { - String json = GsonUtils.GSON.toJson(this); - Text.writeString(out, json); - } - - public static CooldownJob read(DataInput in) throws IOException { - if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, CooldownJob.class); - } - return null; - } - - /** - * Replay job in PENDING state. 
- * Should replay all changes before this job's state transfer to PENDING. - * These changes should be same as changes in CooldownHandler.createJob() - */ - private void replayCreateJob(CooldownJob replayedJob) { - jobId = replayedJob.jobId; - for (CooldownConf replayedConf : replayedJob.cooldownConfList) { - CooldownConf cooldownConf = new CooldownConf(replayedConf.getDbId(), replayedConf.getTableId(), - replayedConf.getPartitionId(), replayedConf.getIndexId(), replayedConf.getTabletId(), - replayedConf.getCooldownReplicaId(), replayedConf.getCooldownTerm()); - cooldownConfList.add(cooldownConf); - } - createTimeMs = replayedJob.createTimeMs; - timeoutMs = replayedJob.timeoutMs; - jobState = JobState.PENDING; - LOG.info("replay create cooldown job: {}, conf size: {}", jobId, cooldownConfList.size()); - } - - /** - * Replay job in PENDING state. set cooldown type in Replica - */ - private void replayPendingJob() throws CooldownException { - for (CooldownConf cooldownConf : cooldownConfList) { - setCooldownReplica(cooldownConf.getDbId(), cooldownConf.getTableId(), cooldownConf.getPartitionId(), - cooldownConf.getIndexId(), cooldownConf.getTabletId(), cooldownConf.getCooldownReplicaId(), - cooldownConf.getCooldownTerm()); - if (Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()) != null) { - Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId( - cooldownConf.getCooldownReplicaId()); - Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm( - cooldownConf.getCooldownTerm()); - } - } - jobState = JobState.SEND_CONF; - LOG.info("replay send cooldown conf, job: {}", jobId); - } - - /** - * Replay job in FINISHED state. 
- * Should replay all changes in runRunningJob() - */ - private void replayRunningJob(CooldownJob replayedJob) throws CooldownException { - jobState = CooldownJob.JobState.FINISHED; - this.finishedTimeMs = replayedJob.finishedTimeMs; - LOG.info("replay finished cooldown job: {}", jobId); - } - - private void setCooldownReplica(long dbId, long tableId, long partitionId, long indexId, long tabletId, - long cooldownReplicaId, long cooldownTerm) throws CooldownException { - Database db = Env.getCurrentInternalCatalog() - .getDbOrException(dbId, s -> new CooldownException("Database " + s + " does not exist")); - OlapTable tbl; - try { - tbl = (OlapTable) db.getTableOrMetaException(tableId, TableIf.TableType.OLAP); - } catch (MetaNotFoundException e) { - throw new CooldownException(e.getMessage()); - } - if (tbl != null) { - tbl.writeLock(); - try { - Partition partition = tbl.getPartition(partitionId); - if (partition != null) { - MaterializedIndex index = partition.getIndex(indexId); - if (index != null) { - Tablet tablet = index.getTablet(tabletId); - if (tablet != null) { - tablet.setCooldownReplicaId(cooldownReplicaId); - tablet.setCooldownTerm(cooldownTerm); - LOG.info("setCooldownReplicaId to {} when cancel job: {}:{}", cooldownReplicaId, - tablet.getId(), jobId); - return; - } - } - } - throw new CooldownException("set cooldown type failed."); - } finally { - tbl.writeUnlock(); - } - } - } - - private void cancelInternal() { - // clear tasks if has - AgentTaskQueue.removeBatchTask(cooldownBatchTask, TTaskType.PUSH_COOLDOWN_CONF); - jobState = CooldownJob.JobState.CANCELLED; - } - - /** - * Replay job in CANCELLED state. 
- */ - private void replayCancelled(CooldownJob replayedJob) { - cancelInternal(); - this.jobState = CooldownJob.JobState.CANCELLED; - this.finishedTimeMs = replayedJob.finishedTimeMs; - this.errMsg = replayedJob.errMsg; - LOG.info("replay cancelled cooldown job: {}", jobId); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6a9392ffee..10562b9a37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -361,7 +361,7 @@ public class InternalCatalog implements CatalogIf { int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : index.getTablets()) { TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, - medium, tablet.getCooldownReplicaId(), tablet.getCooldownTerm()); + medium); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { @@ -1290,7 +1290,7 @@ public class InternalCatalog implements CatalogIf { int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : mIndex.getTablets()) { TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, - medium, tablet.getCooldownReplicaId(), tablet.getCooldownTerm()); + medium); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { @@ -1557,8 +1557,7 @@ public class InternalCatalog implements CatalogIf { int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : index.getTablets()) { TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(), - index.getId(), schemaHash, info.getDataProperty().getStorageMedium(), - tablet.getCooldownReplicaId(), tablet.getCooldownTerm()); + 
index.getId(), schemaHash, info.getDataProperty().getStorageMedium()); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { @@ -1709,8 +1708,7 @@ public class InternalCatalog implements CatalogIf { // create tablets int schemaHash = indexMeta.getSchemaHash(); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium, -1, - -1); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium); createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta, tabletIdSet, idGeneratorBuffer); @@ -2710,7 +2708,7 @@ public class InternalCatalog implements CatalogIf { int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : mIndex.getTablets()) { TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId, - schemaHash, medium, tablet.getCooldownReplicaId(), tablet.getCooldownTerm()); + schemaHash, medium); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 0c0e49f617..2eb0b8dbe3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -36,7 +36,7 @@ import org.apache.doris.cluster.Cluster; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SmallFileMgr.SmallFile; -import org.apache.doris.cooldown.CooldownJob; +import org.apache.doris.cooldown.CooldownConfList; import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; @@ -580,8 
+580,8 @@ public class JournalEntity implements Writable { isRead = true; break; } - case OperationType.OP_PUSH_COOLDOWN_CONF: { - data = CooldownJob.read(in); + case OperationType.OP_UPDATE_COOLDOWN_CONF: { + data = CooldownConfList.read(in); isRead = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index e14f87d375..d45d787ed0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -41,6 +41,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.cooldown.CooldownConf; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricRepo; @@ -62,6 +63,7 @@ import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTask; import org.apache.doris.task.PublishVersionTask; +import org.apache.doris.task.PushCooldownConfTask; import org.apache.doris.task.PushStoragePolicyTask; import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.task.UpdateTabletMetaInfoTask; @@ -94,6 +96,7 @@ import org.apache.thrift.TException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -263,6 +266,17 @@ public class ReportHandler extends Daemon { } } + private static void handlePushCooldownConf(long backendId, List cooldownConfToPush) { + final int PUSH_BATCH_SIZE = 1024; + AgentBatchTask batchTask = new AgentBatchTask(); + for (int start = 0; start < cooldownConfToPush.size(); start += PUSH_BATCH_SIZE) { + PushCooldownConfTask task = new 
PushCooldownConfTask(backendId, + cooldownConfToPush.subList(start, Math.min(start + PUSH_BATCH_SIZE, cooldownConfToPush.size()))); + batchTask.addTask(task); + } + AgentTaskExecutor.submit(batchTask); + } + private static void handlePushStoragePolicy(long backendId, List policyToPush, List resourceToPush, List policyToDrop) { AgentBatchTask batchTask = new AgentBatchTask(); @@ -396,8 +410,6 @@ public class ReportHandler extends Daemon { Set tabletFoundInMeta = Sets.newConcurrentHashSet(); // storage medium -> tablet id ListMultimap tabletMigrationMap = LinkedListMultimap.create(); - // the cooldown type of replicas which need to be sync. tabletId -> TabletMeta - Map syncCooldownTabletMap = new HashMap<>(); // dbid -> txn id -> [partition info] Map> transactionsToPublish = Maps.newHashMap(); @@ -408,6 +420,9 @@ public class ReportHandler extends Daemon { List> tabletToInMemory = Lists.newArrayList(); + List cooldownConfToPush = new LinkedList<>(); + List cooldownConfToUpdate = new LinkedList<>(); + // 1. do the diff. find out (intersection) / (be - meta) / (meta - be) Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap, tabletSyncMap, @@ -418,7 +433,8 @@ public class ReportHandler extends Daemon { transactionsToClear, tabletRecoveryMap, tabletToInMemory, - syncCooldownTabletMap); + cooldownConfToPush, + cooldownConfToUpdate); // 2. sync if (!tabletSyncMap.isEmpty()) { @@ -461,9 +477,12 @@ public class ReportHandler extends Daemon { handleSetTabletInMemory(backendId, tabletToInMemory); } - // 10. 
send cooldownType which need sync to CooldownHandler - if (!syncCooldownTabletMap.isEmpty()) { - Env.getCurrentEnv().getCooldownHandler().handleCooldownConf(syncCooldownTabletMap); + // handle cooldown conf + if (!cooldownConfToPush.isEmpty()) { + handlePushCooldownConf(backendId, cooldownConfToPush); + } + if (!cooldownConfToUpdate.isEmpty()) { + Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate); } final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index f6a71b5377..7a1f9a967e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -41,7 +41,8 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SmallFileMgr.SmallFile; -import org.apache.doris.cooldown.CooldownJob; +import org.apache.doris.cooldown.CooldownConfHandler; +import org.apache.doris.cooldown.CooldownConfList; import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; @@ -720,11 +721,9 @@ public class EditLog { } break; } - case OperationType.OP_PUSH_COOLDOWN_CONF: - if (Config.cooldown_single_remote_file) { - CooldownJob cooldownJob = (CooldownJob) journal.getData(); - env.getCooldownHandler().replayCooldownJob(cooldownJob); - } + case OperationType.OP_UPDATE_COOLDOWN_CONF: + CooldownConfList cooldownConfList = (CooldownConfList) journal.getData(); + CooldownConfHandler.replayUpdateCooldownConf(cooldownConfList); break; case OperationType.OP_BATCH_ADD_ROLLUP: { BatchAlterJobPersistInfo batchAlterJobV2 = (BatchAlterJobPersistInfo) journal.getData(); @@ -1517,8 +1516,8 @@ public class EditLog { 
logEdit(OperationType.OP_ALTER_JOB_V2, alterJob); } - public void logCooldownJob(CooldownJob cooldownJob) { - logEdit(OperationType.OP_PUSH_COOLDOWN_CONF, cooldownJob); + public void logUpdateCooldownConf(CooldownConfList cooldownConf) { + logEdit(OperationType.OP_UPDATE_COOLDOWN_CONF, cooldownConf); } public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 73ead272e4..256331444f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -76,8 +76,6 @@ public class OperationType { //schema change for add and drop columns public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 128; - // set cooldown conf in replica - public static final short OP_PUSH_COOLDOWN_CONF = 129; // 30~39 130~139 230~239 ... // load job for only hadoop load @@ -266,6 +264,8 @@ public class OperationType { public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356; public static final short OP_ALTER_USER = 400; + // cooldown conf + public static final short OP_UPDATE_COOLDOWN_CONF = 401; /** * Get opcode name by op code. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index 0198a6b20f..c01b868f02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -203,12 +203,6 @@ public class MetaPersistMethod { metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveMTMVJobManager", CountingDataOutputStream.class, long.class); break; - case "cooldownJob": - metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadCooldownJob", DataInputStream.class, - long.class); - metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveCooldownJob", - CountingDataOutputStream.class, long.class); - break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index 2074ffe539..2737c052f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -38,7 +38,7 @@ public class PersistMetaModules { "masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin", "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", - "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "cooldownJob"); + "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager"); static { MODULES_MAP = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java index 42e788b537..1c8a0130cc 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java @@ -249,7 +249,7 @@ public class CatalogMocker { Tablet tablet0 = new Tablet(TEST_TABLET0_ID); TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID, TEST_SINGLE_PARTITION_ID, - TEST_TBL_ID, SCHEMA_HASH, TStorageMedium.HDD, -1, -1); + TEST_TBL_ID, SCHEMA_HASH, TStorageMedium.HDD); baseIndex.addTablet(tablet0, tabletMeta); Replica replica0 = new Replica(TEST_REPLICA0_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica1 = new Replica(TEST_REPLICA1_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -323,7 +323,7 @@ public class CatalogMocker { Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID); TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, - TEST_TBL2_ID, SCHEMA_HASH, TStorageMedium.HDD, -1, -1); + TEST_TBL2_ID, SCHEMA_HASH, TStorageMedium.HDD); baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1); Replica replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -335,7 +335,7 @@ public class CatalogMocker { Tablet baseTabletP2 = new Tablet(TEST_BASE_TABLET_P2_ID); TabletMeta tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION2_ID, - TEST_TBL2_ID, SCHEMA_HASH, TStorageMedium.HDD, -1, -1); + TEST_TBL2_ID, SCHEMA_HASH, TStorageMedium.HDD); baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2); Replica replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -356,7 +356,7 @@ public class CatalogMocker { Tablet rollupTabletP1 = new Tablet(TEST_ROLLUP_TABLET_P1_ID); TabletMeta tabletMetaRollupTabletP1 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, TEST_ROLLUP_TABLET_P1_ID, 
ROLLUP_SCHEMA_HASH, - TStorageMedium.HDD, -1, -1); + TStorageMedium.HDD); rollupIndexP1.addTablet(rollupTabletP1, tabletMetaRollupTabletP1); Replica replica9 = new Replica(TEST_REPLICA9_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica10 = new Replica(TEST_REPLICA10_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); @@ -373,7 +373,7 @@ public class CatalogMocker { Tablet rollupTabletP2 = new Tablet(TEST_ROLLUP_TABLET_P2_ID); TabletMeta tabletMetaRollupTabletP2 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH, - TStorageMedium.HDD, -1, -1); + TStorageMedium.HDD); rollupIndexP2.addTablet(rollupTabletP2, tabletMetaRollupTabletP2); Replica replica12 = new Replica(TEST_REPLICA12_ID, BACKEND1_ID, 0, ReplicaState.NORMAL); Replica replica13 = new Replica(TEST_REPLICA13_ID, BACKEND2_ID, 0, ReplicaState.NORMAL); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index d79855ce67..5e022a56e2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -193,7 +193,7 @@ public class CatalogTestUtil { // index MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD, -1, -1); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD); index.addTablet(tablet, tabletMeta); tablet.addReplica(replica1); @@ -261,7 +261,7 @@ public class CatalogTestUtil { // index MaterializedIndex index = new MaterializedIndex(testIndexId2, IndexState.NORMAL); TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2, testPartitionId2, testIndexId2, 0, - TStorageMedium.HDD, -1, -1); + TStorageMedium.HDD); index.addTablet(tablet, tabletMeta); tablet.addReplica(replica); diff 
--git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index 14e20a1bd7..d7fdb2694a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -66,7 +66,7 @@ public class TabletTest { }; tablet = new Tablet(1); - TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD, -1, -1); + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); invertedIndex.addTablet(1, tabletMeta); replica1 = new Replica(1L, 1L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0); replica2 = new Replica(2L, 2L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java index 1ac66444ef..c81c3839c4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java @@ -145,16 +145,16 @@ public class ClusterLoadStatisticsTest { // tablet invertedIndex = new TabletInvertedIndex(); - invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD, -1, -1)); + invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(50000, new Replica(50001, be1.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(50000, new Replica(50002, be2.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(50000, new Replica(50003, be3.getId(), 0, ReplicaState.NORMAL)); - invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD, -1, -1)); + invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(60000, new Replica(60002, be2.getId(), 0, ReplicaState.NORMAL)); 
invertedIndex.addReplica(60000, new Replica(60003, be3.getId(), 0, ReplicaState.NORMAL)); - invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD, -1, -1)); + invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, TStorageMedium.HDD)); invertedIndex.addReplica(70000, new Replica(70002, be2.getId(), 0, ReplicaState.NORMAL)); invertedIndex.addReplica(70000, new Replica(70003, be3.getId(), 0, ReplicaState.NORMAL)); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index 756510de77..88f02df4cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -81,7 +81,7 @@ public class RebalancerTestUtil { int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId()); TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), - partition.getId(), baseIndex.getId(), schemaHash, medium, -1, -1); + partition.getId(), baseIndex.getId(), schemaHash, medium); Tablet tablet = new Tablet(tabletId); // add tablet to olapTable diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java index d933b80982..4460fcb31f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java @@ -78,7 +78,7 @@ public class UnitTestUtil { // index MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD, -1, -1); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 0, TStorageMedium.HDD); index.addTablet(tablet, tabletMeta); tablet.addReplica(replica1); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java deleted file mode 100644 index 6e16e9e945..0000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java +++ /dev/null @@ -1,130 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.cooldown; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.KeysType; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.cluster.Cluster; -import org.apache.doris.persist.EditLog; -import org.apache.doris.thrift.TStorageMedium; - -import mockit.Mocked; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -public class CooldownJobTest { - - private static long jobId = 100L; - private static long dbId = 101L; - private static long tableId = 102L; - private static long partitionId = 103L; - private static long indexId = 104L; - private static long tabletId = 105L; - private static long replicaId = 106L; - private static long backendId = 107L; - private static long cooldownReplicaId = 106L; - private static long cooldownTerm = 109L; - private static long timeoutMs = 10000L; - private static Tablet tablet = new Tablet(tabletId); - private static Replica replica = new Replica(replicaId, backendId, 1, Replica.ReplicaState.NORMAL); - - private static CooldownConf cooldownConf = new CooldownConf(dbId, tableId, partitionId, indexId, tabletId, - cooldownReplicaId, cooldownTerm); - - private static List cooldownConfList = new LinkedList<>(); - - @Mocked - private EditLog editLog; - - public static CooldownJob createCooldownJob() { - tablet.setCooldownReplicaId(cooldownReplicaId); - tablet.setCooldownTerm(cooldownTerm); - cooldownConfList.add(cooldownConf); - return new CooldownJob(jobId, cooldownConfList, timeoutMs); - } - - @Before - public void setUp() { - Cluster testCluster = new 
Cluster("test_cluster", 0); - Database db = new Database(dbId, "db1"); - db.setClusterName("test_cluster"); - Env.getCurrentEnv().addCluster(testCluster); - Env.getCurrentEnv().unprotectCreateDb(db); - OlapTable table = new OlapTable(tableId, "testTable", new ArrayList<>(), KeysType.DUP_KEYS, - new PartitionInfo(), null); - table.setId(tableId); - db.createTable(table); - MaterializedIndex baseIndex = new MaterializedIndex(); - baseIndex.setIdForRestore(indexId); - Partition partition = new Partition(partitionId, "part1", baseIndex, null); - table.addPartition(partition); - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD, - cooldownReplicaId, cooldownTerm); - baseIndex.addTablet(tablet, tabletMeta); - tablet.addReplica(replica); - Env.getCurrentEnv().setEditLog(editLog); - } - - @Test - public void testPending() throws Exception { - CooldownJob cooldownJob = createCooldownJob(); - cooldownJob.runPendingJob(); - Assert.assertEquals(CooldownJob.JobState.SEND_CONF, cooldownJob.jobState); - for (CooldownConf conf : cooldownJob.getCooldownConfList()) { - Assert.assertEquals(conf.getCooldownReplicaId(), replica.getId()); - } - CooldownJob job1 = createCooldownJob(); - job1.replay(cooldownJob); - Assert.assertEquals(CooldownJob.JobState.SEND_CONF, job1.jobState); - // run send job - cooldownJob.runSendJob(); - Assert.assertEquals(CooldownJob.JobState.RUNNING, cooldownJob.jobState); - // run replay finish job - cooldownJob.jobState = CooldownJob.JobState.FINISHED; - job1.replay(cooldownJob); - Assert.assertEquals(CooldownJob.JobState.FINISHED, job1.jobState); - } - - @Test - public void testCancelJob() throws Exception { - CooldownJob cooldownJob = createCooldownJob(); - cooldownJob.runPendingJob(); - Assert.assertEquals(CooldownJob.JobState.SEND_CONF, cooldownJob.jobState); - Assert.assertEquals(cooldownReplicaId, replica.getId()); - // run send job - cooldownJob.runSendJob(); - 
Assert.assertEquals(CooldownJob.JobState.RUNNING, cooldownJob.jobState); - // run cancel job - cooldownJob.cancelImpl("test cancel"); - Assert.assertEquals(CooldownJob.JobState.CANCELLED, cooldownJob.jobState); - } - -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index df8ed5a316..2871ff3596 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -157,7 +157,7 @@ public abstract class DorisHttpTestCase { // index MaterializedIndex baseIndex = new MaterializedIndex(testIndexId, MaterializedIndex.IndexState.NORMAL); TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId, testPartitionId, testIndexId, testSchemaHash, - TStorageMedium.HDD, -1, -1); + TStorageMedium.HDD); baseIndex.addTablet(tablet, tabletMeta); tablet.addReplica(replica1); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java index e464517d7f..5c2c005972 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java @@ -116,7 +116,7 @@ public class DeleteHandlerTest { auth = AccessTestUtil.fetchAdminAccess(); analyzer = AccessTestUtil.fetchAdminAnalyzer(false); db = CatalogMocker.mockDb(); - TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, TBL_ID, 0, null, -1, -1); + TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, TBL_ID, 0, null); invertedIndex.addTablet(TABLET_ID, tabletMeta); invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL)); invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL)); diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift index 70112f4a5c..fe98c7ef4e 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -43,8 +43,8 @@ struct TTabletInfo { // data size on remote storage 16: optional Types.TSize remote_data_size 17: optional Types.TReplicaId cooldown_replica_id - 18: optional bool is_cooldown = false - 19: optional i64 cooldown_term = -1 + // 18: optional bool is_cooldown + 19: optional i64 cooldown_term } struct TFinishTaskRequest { diff --git a/regression-test/suites/cold_heat_separation/policy/alter.groovy b/regression-test/suites/cold_heat_separation/policy/alter.groovy index e56c235bb6..781a619ad9 100644 --- a/regression-test/suites/cold_heat_separation/policy/alter.groovy +++ b/regression-test/suites/cold_heat_separation/policy/alter.groovy @@ -16,8 +16,6 @@ // under the License. suite("alter_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def has_resouce_policy_alter = sql """ SHOW RESOURCES WHERE NAME = "has_resouce_policy_alter"; """ diff --git a/regression-test/suites/cold_heat_separation/policy/create.groovy b/regression-test/suites/cold_heat_separation/policy/create.groovy index 19f259065e..111dc00f27 100644 --- a/regression-test/suites/cold_heat_separation/policy/create.groovy +++ b/regression-test/suites/cold_heat_separation/policy/create.groovy @@ -16,8 +16,6 @@ // under the License. suite("create_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def has_created_1 = sql """ SHOW RESOURCES WHERE NAME = "crete_policy_1"; """ diff --git a/regression-test/suites/cold_heat_separation/policy/drop.groovy b/regression-test/suites/cold_heat_separation/policy/drop.groovy index c04b1afdcb..f29d27c530 100644 --- a/regression-test/suites/cold_heat_separation/policy/drop.groovy +++ b/regression-test/suites/cold_heat_separation/policy/drop.groovy @@ -16,8 +16,6 @@ // under the License. 
suite("drop_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def storage_exist = { name -> def show_storage_policy = sql """ SHOW STORAGE POLICY; diff --git a/regression-test/suites/cold_heat_separation/policy/show.groovy b/regression-test/suites/cold_heat_separation/policy/show.groovy index c18f3cab8b..5d750e0f3b 100644 --- a/regression-test/suites/cold_heat_separation/policy/show.groovy +++ b/regression-test/suites/cold_heat_separation/policy/show.groovy @@ -16,8 +16,6 @@ // under the License. suite("show_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def get_storage_policy = { name -> def show_storage_policy = sql """ SHOW STORAGE POLICY; diff --git a/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy b/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy index 0c02dc2888..b793465f46 100644 --- a/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy +++ b/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy @@ -16,8 +16,6 @@ // under the License. suite("add_table_policy_by_alter_table") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def create_table_not_have_policy_result = try_sql """ CREATE TABLE IF NOT EXISTS create_table_not_have_policy ( diff --git a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy index 8f1fcba080..37adbbd2ef 100644 --- a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy +++ b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy @@ -16,8 +16,6 @@ // under the License. 
suite("create_table_use_partition_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def cooldown_ttl = "10" def create_table_partition_use_not_create_policy = try_sql """ diff --git a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy index 021ffd33e1..c94065f5ae 100644 --- a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy +++ b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy @@ -16,8 +16,6 @@ // under the License. suite("create_table_use_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def cooldown_ttl = "10" def create_table_use_not_create_policy = try_sql """ diff --git a/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy b/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy index 0998cc4b1d..cb9c807729 100644 --- a/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy +++ b/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy @@ -19,8 +19,6 @@ import java.text.SimpleDateFormat; import java.util.Date; suite("add_table_policy_by_modify_partition") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") Date date = new Date(System.currentTimeMillis() + 3600000) def cooldownTime = format.format(date) diff --git a/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy b/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy index 45bd4a77d3..b11344198e 100644 --- a/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy +++ 
b/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy @@ -16,8 +16,6 @@ // under the License. suite("use_default_storage_policy") { - sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");""" - def storage_exist = { name -> def show_storage_policy = sql """ SHOW STORAGE POLICY;