[Performance] Optimize the performance of tabletReport (#6200)

1. Use parallelStream to speed up tabletReport (a minimal sketch of the pattern follows the commit metadata below).
2. Add partitionIdInMemorySet to speed up the tabletToInMemory check.
3. Add disable_storage_medium_check to disable the storage medium check when the user doesn't care what the tablet's storage medium is, and remove the enable_strict_storage_medium_check config to fix some potential migration task failures.

Co-authored-by: caiconghui <caiconghui@xiaomi.com>
caiconghui committed (via GitHub) on 2021-07-19 20:26:14 +08:00
commit 94c50012b2, parent a1a37c8cba
16 changed files with 321 additions and 251 deletions
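Below is a minimal, self-contained sketch of the parallelStream pattern that item 1 refers to. The class name and data here are hypothetical, not the actual Doris code: the idea is to iterate a backend's replica map in parallel and serialize writes to each shared, non-thread-safe result collection with a synchronized block, as the TabletInvertedIndex diff further down does for tabletSyncMap, tabletRecoveryMap, tabletMigrationMap and transactionsToClear.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParallelReportSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for replicaMetaWithBackend: tablet id -> some replica attribute
        Map<Long, Integer> replicaMetaWithBackend = new HashMap<>();
        for (long tabletId = 0; tabletId < 10_000; tabletId++) {
            replicaMetaWithBackend.put(tabletId, (int) (tabletId % 3));
        }

        // Shared result collection; ArrayList is not thread-safe, so writes are synchronized.
        List<Long> tabletSyncList = new ArrayList<>();
        replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
            boolean needSync = entry.getValue() == 0; // stand-in for the real needSync() check
            if (needSync) {
                synchronized (tabletSyncList) {
                    tabletSyncList.add(entry.getKey());
                }
            }
        });
        System.out.println("tablets to sync: " + tabletSyncList.size());
    }
}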


@ -366,7 +366,7 @@ MasterOnly:true
Whether to enable spark load temporarily; it is not enabled by default
### enable_strict_storage_medium_check
### disable_storage_medium_check
Default:false
@ -374,12 +374,8 @@ IsMutable:true
MasterOnly:true
This configuration indicates that when the table is being built, it checks for the presence of the appropriate storage medium in the cluster. For example, when the user specifies that the storage medium is 'SSD' when the table is built, but only 'HDD' disks exist in the cluster:
If this parameter is 'True', the error 'Failed to find enough host in all Backends with storage medium is SSD, need 3' is reported.
If this parameter is 'False', no error is reported when the table is built. Instead, the table is built on a disk with 'HDD' as the storage medium.
If disable_storage_medium_check is true, ReportHandler will not check the tablet's storage medium and the storage cool-down function will be disabled; the default value is false. You can set the value to true when you don't care what the storage medium of the tablet is.
### drop_backend_after_decommission
Default:false


@ -352,7 +352,7 @@ show data (other usage: HELP SHOW DATA)
Whether to enable spark load temporarily; it is not enabled by default
### enable_strict_storage_medium_check
### disable_storage_medium_check
Default:false
@ -360,9 +360,7 @@ show data (other usage: HELP SHOW DATA)
Whether this is a configuration item exclusive to the Master FE node: true
If set to true, the FE will check backend available capacity by storage medium when creating a table
When the parameter `enable_strict_storage_medium_check` is `False`, this parameter is only a best-effort setting. Even if no SSD storage medium is configured in the cluster, no error is reported and the data is automatically stored in an available data directory. Likewise, if the SSD medium is unreachable or out of space, the data may initially be stored directly on another available medium. When the data expires and is migrated to HDD, the migration may also fail if the HDD medium is unreachable or out of space (but it will keep retrying). If the FE parameter `enable_strict_storage_medium_check` is `True`, the error `Failed to find enough host in all backends with storage medium is SSD` is reported when no SSD storage medium is configured in the cluster.
If disable_storage_medium_check is true, ReportHandler will not check the tablet's storage medium and the storage cool-down function will be disabled; the default value is false. You can set the value to true when you don't care what the storage medium of the tablet is.
### drop_backend_after_decommission


@ -395,7 +395,7 @@ public class RestoreJob extends AbstractJob {
* A. Table already exist
* A1. Partition already exist, generate file mapping
* A2. Partition does not exist, add restored partition to the table.
* Reset all index/tablet/replica id, and create replica on BE outside the db lock.
* Reset all index/tablet/replica id, and create replica on BE outside the table lock.
* B. Table does not exist
* B1. Add table to the db, reset all table/index/tablet/replica id,
* and create replica on BE outside the db lock.
@ -1566,7 +1566,6 @@ public class RestoreJob extends AbstractJob {
} finally {
restoreTbl.writeUnlock();
}
}
// remove restored resource


@ -159,6 +159,7 @@ import org.apache.doris.load.routineload.RoutineLoadScheduler;
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.master.PartitionInMemoryInfoCollector;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PaloAuth;
@ -214,9 +215,9 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.PublishVersionDaemon;
import org.apache.doris.transaction.UpdateDbUsedDataQuotaDaemon;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -306,7 +307,8 @@ public class Catalog {
private BackupHandler backupHandler;
private PublishVersionDaemon publishVersionDaemon;
private DeleteHandler deleteHandler;
private UpdateDbUsedDataQuotaDaemon updateDbUsedDataQuotaDaemon;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
@ -492,7 +494,8 @@ public class Catalog {
this.metaDir = Config.meta_dir;
this.publishVersionDaemon = new PublishVersionDaemon();
this.deleteHandler = new DeleteHandler();
this.updateDbUsedDataQuotaDaemon = new UpdateDbUsedDataQuotaDaemon();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector();
this.replayedJournalId = new AtomicLong(0L);
this.isElectable = false;
@ -1304,8 +1307,10 @@ public class Catalog {
routineLoadTaskScheduler.start();
// start dynamic partition task
dynamicPartitionScheduler.start();
// start daemon thread to update db used data quota for db txn manager periodly
updateDbUsedDataQuotaDaemon.start();
// start daemon thread to update db used data quota for db txn manager periodically
dbUsedDataQuotaInfoCollector.start();
// start daemon thread to update global partition in memory information periodically
partitionInMemoryInfoCollector.start();
streamLoadRecordMgr.start();
}
@ -3407,7 +3412,7 @@ public class Catalog {
}
}
public void replayErasePartition(long partitionId) throws DdlException {
public void replayErasePartition(long partitionId) {
Catalog.getCurrentRecycleBin().replayErasePartition(partitionId);
}
@ -3449,7 +3454,7 @@ public class Catalog {
Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
// add to index map
Map<Long, MaterializedIndex> indexMap = new HashMap<Long, MaterializedIndex>();
Map<Long, MaterializedIndex> indexMap = new HashMap<>();
indexMap.put(baseIndexId, baseIndex);
// create rollup index if has
@ -3771,8 +3776,7 @@ public class Catalog {
// a set to record every new tablet created when create table
// if failed in any step, use this set to do clear things
Set<Long> tabletIdSet = new HashSet<Long>();
Set<Long> tabletIdSet = new HashSet<>();
// create partition
try {
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
@ -3866,7 +3870,6 @@ public class Catalog {
for (Long tabletId : tabletIdSet) {
Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
}
// only remove from memory, because we have not persist it
if (getColocateTableIndex().isColocateTable(tableId)) {
getColocateTableIndex().removeTable(tableId);
@ -4355,7 +4358,7 @@ public class Catalog {
if (chooseBackendsArbitrary) {
// This is the first colocate table in the group, or just a normal table,
// randomly choose backends
if (Config.enable_strict_storage_medium_check) {
if (!Config.disable_storage_medium_check) {
chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName, tabletMeta.getStorageMedium());
} else {
chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName);
@ -5543,7 +5546,7 @@ public class Catalog {
// need to update partition info meta
for(Partition partition: table.getPartitions()) {
table.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.IsInMemory());
table.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.isInMemory());
}
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(), properties);
@ -5575,7 +5578,7 @@ public class Catalog {
// need to replay partition info meta
if (opCode == OperationType.OP_MODIFY_IN_MEMORY) {
for(Partition partition: olapTable.getPartitions()) {
olapTable.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.IsInMemory());
olapTable.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.isInMemory());
}
}
} finally {


@ -442,23 +442,14 @@ public class OlapTable extends Table {
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
for (Map.Entry<String, Long> entry : origPartNameToId.entrySet()) {
long newPartId = catalog.getNextId();
partitionInfo.idToDataProperty.put(newPartId,
partitionInfo.idToDataProperty.remove(entry.getValue()));
partitionInfo.idToReplicationNum.remove(entry.getValue());
partitionInfo.idToReplicationNum.put(newPartId, (short) restoreReplicationNum);
partitionInfo.getIdToItem(false).put(newPartId,
partitionInfo.getIdToItem(false).remove(entry.getValue()));
partitionInfo.idToInMemory.put(newPartId, partitionInfo.idToInMemory.remove(entry.getValue()));
partitionInfo.resetPartitionIdForRestore(newPartId, entry.getValue(), (short) restoreReplicationNum, false);
idToPartition.put(newPartId, idToPartition.remove(entry.getValue()));
}
} else {
// Single partitioned
long newPartId = catalog.getNextId();
for (Map.Entry<String, Long> entry : origPartNameToId.entrySet()) {
partitionInfo.idToDataProperty.put(newPartId, partitionInfo.idToDataProperty.remove(entry.getValue()));
partitionInfo.idToReplicationNum.remove(entry.getValue());
partitionInfo.idToReplicationNum.put(newPartId, (short) restoreReplicationNum);
partitionInfo.idToInMemory.put(newPartId, partitionInfo.idToInMemory.remove(entry.getValue()));
partitionInfo.resetPartitionIdForRestore(newPartId, entry.getValue(), (short) restoreReplicationNum, true);
idToPartition.put(newPartId, idToPartition.remove(entry.getValue()));
}
}
@ -1529,7 +1520,7 @@ public class OlapTable extends Table {
public Boolean isInMemory() {
if (tableProperty != null) {
return tableProperty.IsInMemory();
return tableProperty.isInMemory();
}
return false;
}


@ -282,6 +282,16 @@ public class PartitionInfo implements Writable {
}
}
public void resetPartitionIdForRestore(long newPartitionId, long oldPartitionId, short restoreReplicationNum, boolean isSinglePartitioned) {
idToDataProperty.put(newPartitionId, idToDataProperty.remove(oldPartitionId));
idToReplicationNum.remove(oldPartitionId);
idToReplicationNum.put(newPartitionId, restoreReplicationNum);
if (!isSinglePartitioned) {
idToItem.put(newPartitionId, idToItem.remove(oldPartitionId));
}
idToInMemory.put(newPartitionId, idToInMemory.remove(oldPartitionId));
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, type.name());


@ -163,7 +163,7 @@ public class TableProperty implements Writable {
return replicationNum;
}
public boolean IsInMemory() {
public boolean isInMemory() {
return isInMemory;
}


@ -17,8 +17,11 @@
package org.apache.doris.catalog;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Config;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTablet;
@ -89,6 +92,8 @@ public class TabletInvertedIndex {
// backend id -> (tablet id -> replica)
private Table<Long, Long, Replica> backingReplicaMetaTable = HashBasedTable.create();
private volatile ImmutableSet<Long> partitionIdInMemorySet = ImmutableSet.of();
public TabletInvertedIndex() {
}
@ -118,16 +123,7 @@ public class TabletInvertedIndex {
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
Set<Pair<Long, Integer>> tabletWithoutPartitionId) {
for (TTablet backendTablet : backendTablets.values()) {
for (TTabletInfo tabletInfo : backendTablet.tablet_infos) {
if (!tabletInfo.isSetPartitionId() || tabletInfo.getPartitionId() < 1) {
tabletWithoutPartitionId.add(new Pair<>(tabletInfo.getTabletId(), tabletInfo.getSchemaHash()));
}
}
}
List<Triple<Long, Integer, Boolean>> tabletToInMemory) {
readLock();
long start = System.currentTimeMillis();
try {
@ -135,7 +131,7 @@ public class TabletInvertedIndex {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
// traverse replicas in meta with this backend
for (Map.Entry<Long, Replica> entry : replicaMetaWithBackend.entrySet()) {
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
long tabletId = entry.getKey();
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
@ -146,10 +142,17 @@ public class TabletInvertedIndex {
for (TTabletInfo backendTabletInfo : backendTablet.getTabletInfos()) {
if (tabletMeta.containsSchemaHash(backendTabletInfo.getSchemaHash())) {
foundTabletsWithValidSchema.add(tabletId);
if (partitionIdInMemorySet.contains(backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) {
synchronized (tabletToInMemory) {
tabletToInMemory.add(new ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory()));
}
}
// 1. (intersection)
if (needSync(replica, backendTabletInfo)) {
// need sync
tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
synchronized (tabletSyncMap) {
tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
}
}
// check and set path
@ -175,20 +178,27 @@ public class TabletInvertedIndex {
backendTabletInfo.getSchemaHash(),
backendTabletInfo.isSetUsed() ? backendTabletInfo.isUsed() : "unknown",
backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset");
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
synchronized (tabletRecoveryMap) {
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
}
}
// check if need migration
long partitionId = tabletMeta.getPartitionId();
TStorageMedium storageMedium = storageMediumMap.get(partitionId);
if (storageMedium != null && backendTabletInfo.isSetStorageMedium()) {
if (storageMedium != backendTabletInfo.getStorageMedium()) {
tabletMigrationMap.put(storageMedium, tabletId);
}
if (storageMedium != tabletMeta.getStorageMedium()) {
tabletMeta.setStorageMedium(storageMedium);
if (!Config.disable_storage_medium_check) {
// check if need migration
TStorageMedium storageMedium = storageMediumMap.get(partitionId);
if (storageMedium != null && backendTabletInfo.isSetStorageMedium()) {
if (storageMedium != backendTabletInfo.getStorageMedium()) {
synchronized (tabletMigrationMap) {
tabletMigrationMap.put(storageMedium, tabletId);
}
}
if (storageMedium != tabletMeta.getStorageMedium()) {
tabletMeta.setStorageMedium(storageMedium);
}
}
}
// check if should clear transactions
if (backendTabletInfo.isSetTransactionIds()) {
List<Long> transactionIds = backendTabletInfo.getTransactionIds();
@ -196,7 +206,9 @@ public class TabletInvertedIndex {
for (Long transactionId : transactionIds) {
TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
synchronized (transactionsToClear) {
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
}
LOG.debug("transaction id [{}] is not valid any more, "
+ "clear it from backend [{}]", transactionId, backendId);
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
@ -218,12 +230,14 @@ public class TabletInvertedIndex {
TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(),
partitionCommitInfo.getVersion(),
partitionCommitInfo.getVersionHash());
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(transactionState.getDbId(), map);
synchronized (transactionsToPublish) {
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(transactionState.getDbId());
if (map == null) {
map = ArrayListMultimap.create();
transactionsToPublish.put(transactionState.getDbId(), map);
}
map.put(transactionId, versionInfo);
}
map.put(transactionId, versionInfo);
}
}
}
@ -243,9 +257,11 @@ public class TabletInvertedIndex {
// 2. (meta - be)
// may need delete from meta
LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta);
tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId);
synchronized (tabletDeleteFromMeta) {
tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId);
}
}
} // end for replicaMetaWithBackend
});
}
} finally {
readUnlock();
@ -253,10 +269,10 @@ public class TabletInvertedIndex {
long end = System.currentTimeMillis();
LOG.info("finished to do tablet diff with backend[{}]. sync: {}. metaDel: {}. foundValid: {}. foundInvalid: {}."
+ " migration: {}. found invalid transactions {}. found republish transactions {} "
+ " migration: {}. found invalid transactions {}. found republish transactions {}. tabletInMemorySync: {}."
+ " cost: {} ms", backendId, tabletSyncMap.size(),
tabletDeleteFromMeta.size(), foundTabletsWithValidSchema.size(), foundTabletsWithInvalidSchema.size(),
tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), (end - start));
tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), tabletToInMemory.size(), (end - start));
}
public Long getTabletIdByReplica(long replicaId) {
@ -484,12 +500,12 @@ public class TabletInvertedIndex {
if (Catalog.isCheckpointThread()) {
return;
}
writeLock();
readLock();
try {
Preconditions.checkState(tabletMetaTable.contains(partitionId, indexId));
tabletMetaTable.get(partitionId, indexId).setNewSchemaHash(newSchemaHash);
} finally {
writeUnlock();
readUnlock();
}
}
@ -497,12 +513,12 @@ public class TabletInvertedIndex {
if (Catalog.isCheckpointThread()) {
return;
}
writeLock();
readLock();
try {
Preconditions.checkState(tabletMetaTable.contains(partitionId, indexId));
tabletMetaTable.get(partitionId, indexId).updateToNewSchemaHash();
} finally {
writeUnlock();
readUnlock();
}
}
@ -510,14 +526,14 @@ public class TabletInvertedIndex {
if (Catalog.isCheckpointThread()) {
return;
}
writeLock();
readLock();
try {
TabletMeta tabletMeta = tabletMetaTable.get(partitionId, indexId);
if (tabletMeta != null) {
tabletMeta.deleteNewSchemaHash();
}
} finally {
writeUnlock();
readUnlock();
}
}
@ -601,6 +617,10 @@ public class TabletInvertedIndex {
}
}
public void setPartitionIdInMemorySet(ImmutableSet<Long> partitionIdInMemorySet) {
this.partitionIdInMemorySet = partitionIdInMemorySet;
}
public Map<Long, Long> getReplicaToTabletMap() {
return replicaToTabletMap;
}

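The partitionIdInMemorySet added above is a volatile, immutable snapshot that a daemon thread rebuilds and swaps in periodically, so the report path can do a lock-free membership check per tablet instead of resolving db/table/partition under locks. A minimal sketch of that publish-and-read pattern, using java.util.Set.copyOf instead of Guava's ImmutableSet so the example is self-contained (names are illustrative, not the actual Doris classes):

import java.util.HashSet;
import java.util.Set;

public class InMemoryPartitionSnapshot {
    // Readers see a consistent, immutable snapshot without taking any lock.
    private volatile Set<Long> partitionIdInMemorySet = Set.of();

    // Called by a background collector thread (e.g. every few minutes).
    public void refresh(Set<Long> freshlyCollectedIds) {
        // Publish an immutable copy; the volatile write makes it visible to reader threads.
        partitionIdInMemorySet = Set.copyOf(freshlyCollectedIds);
    }

    // Called from the (parallel) tablet report path.
    public boolean isInMemory(long partitionId) {
        return partitionIdInMemorySet.contains(partitionId);
    }

    public static void main(String[] args) {
        InMemoryPartitionSnapshot snapshot = new InMemoryPartitionSnapshot();
        Set<Long> collected = new HashSet<>();
        collected.add(1001L);
        collected.add(1002L);
        snapshot.refresh(collected);
        System.out.println(snapshot.isInMemory(1001L)); // true
        System.out.println(snapshot.isInMemory(9999L)); // false
    }
}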

@ -751,9 +751,17 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_backend_down_time_second = 3600; // 1h
/**
* If disable_storage_medium_check is true, ReportHandler would not check tablet's storage medium
* and disable storage cool down function, the default value is false.
* You can set the value true when you don't care what the storage medium of the tablet is.
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_storage_medium_check = false;
/**
* When create a table(or partition), you can specify its storage medium(HDD or SSD).
* If not set, this specifies the default medium when creat.
* If not set, this specifies the default medium when created.
*/
@ConfField public static String default_storage_medium = "HDD";
/**
@ -1020,7 +1028,7 @@ public class Config extends ConfigBase {
/*
* One master daemon thread will update database used data quota for db txn manager every db_used_data_quota_update_interval_secs
*/
@ConfField(mutable = true, masterOnly = true)
@ConfField(mutable = false, masterOnly = true)
public static int db_used_data_quota_update_interval_secs = 300;
/**
@ -1247,12 +1255,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean drop_backend_after_decommission = true;
/**
* If set to true, FE will check backend available capacity by storage medium when create table
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_strict_storage_medium_check = false;
/**
* enable spark load for temporary use
*/
@ -1435,4 +1437,10 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static int default_max_query_instances = -1;
/*
* One master daemon thread will update global partition in memory info every partition_in_memory_update_interval_secs
*/
@ConfField(mutable = false, masterOnly = true)
public static int partition_in_memory_update_interval_secs = 300;
}


@ -38,7 +38,6 @@ import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
@ -81,6 +80,9 @@ public class TimeUtils {
public static Date MIN_DATETIME = null;
public static Date MAX_DATETIME = null;
private static ThreadLocal<SimpleDateFormat> datetimeFormatThreadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
static {
TIME_ZONE = new SimpleTimeZone(8 * 3600 * 1000, "");
@ -148,11 +150,11 @@ public class TimeUtils {
return dateFormat.format(new Date(timeStamp));
}
public static synchronized String longToTimeString(long timeStamp) {
public static String longToTimeString(long timeStamp) {
SimpleDateFormat datetimeFormatTimeZone = datetimeFormatThreadLocal.get();
TimeZone timeZone = getTimeZone();
SimpleDateFormat dateFormatTimeZone = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
dateFormatTimeZone.setTimeZone(timeZone);
return longToTimeString(timeStamp, dateFormatTimeZone);
datetimeFormatTimeZone.setTimeZone(timeZone);
return longToTimeString(timeStamp, datetimeFormatTimeZone);
}
public static synchronized Date getTimeAsDate(String timeString) {
@ -208,32 +210,7 @@ public class TimeUtils {
return format(date, type.getPrimitiveType());
}
/*
* only used for ETL
*/
public static long dateTransform(long time, PrimitiveType type) {
Calendar cal = Calendar.getInstance(TIME_ZONE);
cal.setTimeInMillis(time);
int year = cal.get(Calendar.YEAR);
int month = cal.get(Calendar.MONTH) + 1;
int day = cal.get(Calendar.DAY_OF_MONTH);
if (type == PrimitiveType.DATE) {
return year * 16 * 32L + month * 32 + day;
} else if (type == PrimitiveType.DATETIME) {
// datetime
int hour = cal.get(Calendar.HOUR_OF_DAY);
int minute = cal.get(Calendar.MINUTE);
int second = cal.get(Calendar.SECOND);
return (year * 10000 + month * 100 + day) * 1000000L + hour * 10000 + minute * 100 + second;
} else {
Preconditions.checkState(false, "invalid date type: " + type);
return -1L;
}
}
public static long timeStringToLong(String timeStr) {
public static synchronized long timeStringToLong(String timeStr) {
Date d;
try {
d = DATETIME_FORMAT.parse(timeStr);
@ -244,7 +221,7 @@ public class TimeUtils {
}
public static long timeStringToLong(String timeStr, TimeZone timeZone) {
SimpleDateFormat dateFormatTimeZone = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat dateFormatTimeZone = datetimeFormatThreadLocal.get();
dateFormatTimeZone.setTimeZone(timeZone);
Date d;
try {

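The TimeUtils change above replaces a synchronized method around a shared SimpleDateFormat (which is not thread-safe) with a per-thread formatter obtained from ThreadLocal.withInitial, so concurrent report threads no longer serialize on time formatting. A minimal sketch of the idiom (illustrative only, not the Doris code):

import java.text.SimpleDateFormat;
import java.util.Date;

public class ThreadLocalFormatSketch {
    // Each thread lazily gets its own SimpleDateFormat instance, so no synchronization is needed.
    private static final ThreadLocal<SimpleDateFormat> DATETIME_FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static String longToTimeString(long timeStamp) {
        return DATETIME_FORMAT.get().format(new Date(timeStamp));
    }

    public static void main(String[] args) throws Exception {
        Runnable task = () -> System.out.println(
                Thread.currentThread().getName() + ": " + longToTimeString(System.currentTimeMillis()));
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}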

@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.master;
import com.google.common.collect.ImmutableSet;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class PartitionInMemoryInfoCollector extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(PartitionInMemoryInfoCollector.class);
public PartitionInMemoryInfoCollector() {
super("PartitionInMemoryInfoCollector", Config.partition_in_memory_update_interval_secs * 1000);
}
@Override
protected void runAfterCatalogReady() {
updatePartitionInMemoryInfo();
}
private void updatePartitionInMemoryInfo() {
Catalog catalog = Catalog.getCurrentCatalog();
TabletInvertedIndex tabletInvertedIndex = catalog.getTabletInvertedIndex();
ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
List<Long> dbIdList = catalog.getDbIds();
for (Long dbId : dbIdList) {
Database db = catalog.getDb(dbId);
if (db == null) {
LOG.warn("Database [" + dbId + "] does not exist, skip to update database used data quota");
continue;
}
if (db.isInfoSchemaDb()) {
continue;
}
try {
int partitionInMemoryCount = 0;
for (Table table : db.getTables()) {
if (table.getType() != Table.TableType.OLAP) {
continue;
}
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
for (Partition partition : olapTable.getAllPartitions()) {
if (olapTable.getPartitionInfo().getIsInMemory(partition.getId())) {
partitionInMemoryCount++;
builder.add(partition.getId());
}
}
} finally {
table.readUnlock();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Update database[{}] partition in memory info, partitionInMemoryCount : {}.", db.getFullName(), partitionInMemoryCount);
}
} catch (Exception e) {
LOG.warn("Update database[" + db.getFullName() + "] partition in memory info failed", e);
}
}
ImmutableSet<Long> partitionIdInMemorySet = builder.build();
tabletInvertedIndex.setPartitionIdInMemorySet(partitionIdInMemorySet);
}
}


@ -17,6 +17,7 @@
package org.apache.doris.master;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@ -69,7 +70,6 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTablet;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTabletMetaType;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.LinkedListMultimap;
@ -80,14 +80,12 @@ import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -98,6 +96,13 @@ public class ReportHandler extends Daemon {
private BlockingQueue<ReportTask> reportQueue = Queues.newLinkedBlockingQueue();
private enum ReportType {
UNKNOWN,
TASK,
DISK,
TABLET
}
public ReportHandler() {
GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>(
"report_queue_size", MetricUnit.NOUNIT, "report queue size") {
@ -133,26 +138,27 @@ public class ReportHandler extends Daemon {
Map<Long, TTablet> tablets = null;
long reportVersion = -1;
String reportType = "";
ReportType reportType = ReportType.UNKNOWN;
if (request.isSetTasks()) {
tasks = request.getTasks();
reportType += "task";
reportType = ReportType.TASK;
}
if (request.isSetDisks()) {
disks = request.getDisks();
reportType += "disk";
reportType = ReportType.DISK;
}
if (request.isSetTablets()) {
tablets = request.getTablets();
reportVersion = request.getReportVersion();
reportType += "tablet";
reportType = ReportType.TABLET;
} else if (request.isSetTabletList()) {
// the 'tablets' member will be deprecated in future.
tablets = buildTabletMap(request.getTabletList());
reportVersion = request.getReportVersion();
reportType += "tablet";
reportType = ReportType.TABLET;
}
if (request.isSetTabletMaxCompactionScore()) {
@ -242,16 +248,17 @@ public class ReportHandler extends Daemon {
backendId, backendTablets.size(), backendReportVersion);
// storage medium map
HashMap<Long, TStorageMedium> storageMediumMap = Catalog.getCurrentCatalog().getPartitionIdToStorageMediumMap();
HashMap<Long, TStorageMedium> storageMediumMap = Config.disable_storage_medium_check ?
Maps.newHashMap() : Catalog.getCurrentCatalog().getPartitionIdToStorageMediumMap();
// db id -> tablet id
ListMultimap<Long, Long> tabletSyncMap = LinkedListMultimap.create();
// db id -> tablet id
ListMultimap<Long, Long> tabletDeleteFromMeta = LinkedListMultimap.create();
// tablet ids which schema hash is valid
Set<Long> foundTabletsWithValidSchema = new HashSet<Long>();
Set<Long> foundTabletsWithValidSchema = Sets.newConcurrentHashSet();
// tablet ids which schema hash is invalid
Map<Long, TTabletInfo> foundTabletsWithInvalidSchema = new HashMap<Long, TTabletInfo>();
Map<Long, TTabletInfo> foundTabletsWithInvalidSchema = Maps.newConcurrentMap();
// storage medium -> tablet id
ListMultimap<TStorageMedium, Long> tabletMigrationMap = LinkedListMultimap.create();
@ -262,7 +269,7 @@ public class ReportHandler extends Daemon {
// db id -> tablet id
ListMultimap<Long, Long> tabletRecoveryMap = LinkedListMultimap.create();
Set<Pair<Long, Integer>> tabletWithoutPartitionId = Sets.newHashSet();
List<Triple<Long, Integer, Boolean>> tabletToInMemory = Lists.newArrayList();
// 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
Catalog.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap,
@ -274,35 +281,48 @@ public class ReportHandler extends Daemon {
transactionsToPublish,
transactionsToClear,
tabletRecoveryMap,
tabletWithoutPartitionId);
tabletToInMemory);
// 2. sync
sync(backendTablets, tabletSyncMap, backendId, backendReportVersion);
if (!tabletSyncMap.isEmpty()) {
sync(backendTablets, tabletSyncMap, backendId, backendReportVersion);
}
// 3. delete (meta - be)
// BE will automatically drop defective tablets. these tablets should also be dropped in catalog
deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion);
if (!tabletDeleteFromMeta.isEmpty()) {
deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion);
}
// 4. handle (be - meta)
deleteFromBackend(backendTablets, foundTabletsWithValidSchema, foundTabletsWithInvalidSchema, backendId);
if (foundTabletsWithValidSchema.size() != backendTablets.size()) {
deleteFromBackend(backendTablets, foundTabletsWithValidSchema, foundTabletsWithInvalidSchema, backendId);
}
// 5. migration (ssd <-> hdd)
handleMigration(tabletMigrationMap, backendId);
if (!Config.disable_storage_medium_check && !tabletMigrationMap.isEmpty()) {
handleMigration(tabletMigrationMap, backendId);
}
// 6. send clear transactions to be
handleClearTransactions(transactionsToClear, backendId);
if (!transactionsToClear.isEmpty()) {
handleClearTransactions(transactionsToClear, backendId);
}
// 7. send publish version request to be
handleRepublishVersionInfo(transactionsToPublish, backendId);
if (!transactionsToPublish.isEmpty()) {
handleRepublishVersionInfo(transactionsToPublish, backendId);
}
// 8. send recover request to be
handleRecoverTablet(tabletRecoveryMap, backendTablets, backendId);
if (!tabletRecoveryMap.isEmpty()) {
handleRecoverTablet(tabletRecoveryMap, backendTablets, backendId);
}
// 9. send set tablet partition info to be
handleSetTabletPartitionId(backendId, tabletWithoutPartitionId);
// 10. send set tablet in memory to be
handleSetTabletInMemory(backendId, backendTablets);
// 9. send set tablet in memory to be
if (!tabletToInMemory.isEmpty()) {
handleSetTabletInMemory(backendId, tabletToInMemory);
}
final SystemInfoService currentSystemInfo = Catalog.getCurrentSystemInfo();
Backend reportBackend = currentSystemInfo.getBackend(backendId);
@ -312,7 +332,7 @@ public class ReportHandler extends Daemon {
}
long end = System.currentTimeMillis();
LOG.info("tablet report from backend[{}] cost: {} ms", backendId, (end - start));
LOG.info("finished to handle tablet report from backend[{}] cost: {} ms", backendId, (end - start));
}
private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
@ -660,42 +680,10 @@ public class ReportHandler extends Daemon {
int deleteFromBackendCounter = 0;
int addToMetaCounter = 0;
AgentBatchTask batchTask = new AgentBatchTask();
for (Long tabletId : backendTablets.keySet()) {
TTablet backendTablet = backendTablets.get(tabletId);
for (TTabletInfo backendTabletInfo : backendTablet.getTabletInfos()) {
boolean needDelete = false;
if (!foundTabletsWithValidSchema.contains(tabletId)) {
if (isBackendReplicaHealthy(backendTabletInfo)) {
// if this tablet is not in meta. try adding it.
// if add failed. delete this tablet from backend.
try {
addReplica(tabletId, backendTabletInfo, backendId);
// update counter
needDelete = false;
++addToMetaCounter;
} catch (MetaNotFoundException e) {
LOG.warn("failed add to meta. tablet[{}], backend[{}]. {}",
tabletId, backendId, e.getMessage());
needDelete = true;
}
} else {
needDelete = true;
}
}
if (needDelete) {
// drop replica
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, backendTabletInfo.getSchemaHash());
batchTask.addTask(task);
LOG.warn("delete tablet[" + tabletId + " - " + backendTabletInfo.getSchemaHash()
+ "] from backend[" + backendId + "] because not found in meta");
++deleteFromBackendCounter;
}
} // end for tabletInfos
if (foundTabletsWithInvalidSchema.containsKey(tabletId)) {
// this tablet is found in meta but with invalid schema hash.
// delete it.
// This means that the meta of all backend tablets can be found in FE; we only need to process tablets with an invalid schema hash
if (foundTabletsWithValidSchema.size() + foundTabletsWithInvalidSchema.size() == backendTablets.size()) {
for (Long tabletId : foundTabletsWithInvalidSchema.keySet()) {
// this tablet is found in meta but with invalid schema hash. delete it.
int schemaHash = foundTabletsWithInvalidSchema.get(tabletId).getSchemaHash();
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, schemaHash);
batchTask.addTask(task);
@ -703,8 +691,54 @@ public class ReportHandler extends Daemon {
+ "] because invalid schema hash");
++deleteFromBackendCounter;
}
} // end for backendTabletIds
AgentTaskExecutor.submit(batchTask);
} else {
for (Long tabletId : backendTablets.keySet()) {
if (foundTabletsWithInvalidSchema.containsKey(tabletId)) {
int schemaHash = foundTabletsWithInvalidSchema.get(tabletId).getSchemaHash();
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, schemaHash);
batchTask.addTask(task);
LOG.warn("delete tablet[" + tabletId + " - " + schemaHash + "] from backend[" + backendId
+ "] because invalid schema hash");
++deleteFromBackendCounter;
continue;
}
TTablet backendTablet = backendTablets.get(tabletId);
for (TTabletInfo backendTabletInfo : backendTablet.getTabletInfos()) {
boolean needDelete = false;
if (!foundTabletsWithValidSchema.contains(tabletId)) {
if (isBackendReplicaHealthy(backendTabletInfo)) {
// if this tablet is not in meta. try adding it.
// if add failed. delete this tablet from backend.
try {
addReplica(tabletId, backendTabletInfo, backendId);
// update counter
needDelete = false;
++addToMetaCounter;
} catch (MetaNotFoundException e) {
LOG.warn("failed add to meta. tablet[{}], backend[{}]. {}",
tabletId, backendId, e.getMessage());
needDelete = true;
}
} else {
needDelete = true;
}
}
if (needDelete) {
// drop replica
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, backendTabletInfo.getSchemaHash());
batchTask.addTask(task);
LOG.warn("delete tablet[" + tabletId + " - " + backendTabletInfo.getSchemaHash()
+ "] from backend[" + backendId + "] because not found in meta");
++deleteFromBackendCounter;
}
} // end for tabletInfos
} // end for backendTabletIds
}
if (batchTask.getTaskNum() != 0) {
AgentTaskExecutor.submit(batchTask);
}
LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId);
LOG.info("add {} replica(s) to meta. backend[{}]", addToMetaCounter, backendId);
@ -760,10 +794,6 @@ public class ReportHandler extends Daemon {
private static void handleRecoverTablet(ListMultimap<Long, Long> tabletRecoveryMap,
Map<Long, TTablet> backendTablets, long backendId) {
if (tabletRecoveryMap.isEmpty()) {
return;
}
// print a warn log here to indicate the exceptions on the backend
LOG.warn("find {} tablets on backend {} which is bad or misses versions that need clone or force recovery",
tabletRecoveryMap.size(), backendId);
@ -868,70 +898,13 @@ public class ReportHandler extends Daemon {
}
}
private static void handleSetTabletPartitionId(long backendId, Set<Pair<Long, Integer>> tabletWithoutPartitionId) {
LOG.info("find [{}] tablets without partition id, try to set them", tabletWithoutPartitionId.size());
if (tabletWithoutPartitionId.size() < 1) {
return;
}
private static void handleSetTabletInMemory(long backendId, List<Triple<Long, Integer, Boolean>> tabletToInMemory) {
AgentBatchTask batchTask = new AgentBatchTask();
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(
backendId, tabletWithoutPartitionId, TTabletMetaType.PARTITIONID);
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId, tabletToInMemory);
batchTask.addTask(task);
AgentTaskExecutor.submit(batchTask);
}
private static void handleSetTabletInMemory(long backendId, Map<Long, TTablet> backendTablets) {
// <tablet id, tablet schema hash, tablet in memory>
List<Triple<Long, Integer, Boolean>> tabletToInMemory = Lists.newArrayList();
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
for (TTablet backendTablet : backendTablets.values()) {
for (TTabletInfo tabletInfo : backendTablet.tablet_infos) {
if (!tabletInfo.isSetIsInMemory()) {
continue;
}
long tabletId = tabletInfo.getTabletId();
boolean beIsInMemory = tabletInfo.is_in_memory;
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
long dbId = tabletMeta != null ? tabletMeta.getDbId() : TabletInvertedIndex.NOT_EXIST_VALUE;
long tableId = tabletMeta != null ? tabletMeta.getTableId() : TabletInvertedIndex.NOT_EXIST_VALUE;
long partitionId = tabletMeta != null ? tabletMeta.getPartitionId() : TabletInvertedIndex.NOT_EXIST_VALUE;
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
continue;
}
OlapTable olapTable = (OlapTable) db.getTable(tableId);
if (olapTable == null) {
continue;
}
olapTable.readLock();
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
continue;
}
boolean feIsInMemory = olapTable.getPartitionInfo().getIsInMemory(partitionId);
if (beIsInMemory != feIsInMemory) {
tabletToInMemory.add(new ImmutableTriple<>(tabletId, tabletInfo.getSchemaHash(), feIsInMemory));
}
} finally {
olapTable.readUnlock();
}
}
}
LOG.info("find [{}] tablets need set in memory meta", tabletToInMemory.size());
// When report, needn't synchronous
if (!tabletToInMemory.isEmpty()) {
AgentBatchTask batchTask = new AgentBatchTask();
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId, tabletToInMemory);
batchTask.addTask(task);
AgentTaskExecutor.submit(batchTask);
}
}
private static void handleClearTransactions(ListMultimap<Long, Long> transactionsToClear, long backendId) {
AgentBatchTask batchTask = new AgentBatchTask();
for (Long transactionId : transactionsToClear.keySet()) {
@ -939,7 +912,6 @@ public class ReportHandler extends Daemon {
transactionId, transactionsToClear.get(transactionId));
batchTask.addTask(clearTransactionTask);
}
AgentTaskExecutor.submit(batchTask);
}


@ -27,11 +27,11 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
public class UpdateDbUsedDataQuotaDaemon extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(UpdateDbUsedDataQuotaDaemon.class);
public class DbUsedDataQuotaInfoCollector extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(DbUsedDataQuotaInfoCollector.class);
public UpdateDbUsedDataQuotaDaemon() {
super("UpdateDbUsedDataQuota", Config.db_used_data_quota_update_interval_secs * 1000);
public DbUsedDataQuotaInfoCollector() {
super("DbUsedDataQuotaInfoCollector", Config.db_used_data_quota_update_interval_secs * 1000);
}
@Override


@ -64,6 +64,7 @@ public class AlterTest {
FeConstants.default_scheduler_interval_millisecond = 100;
Config.dynamic_partition_enable = true;
Config.dynamic_partition_check_interval_seconds = 1;
Config.disable_storage_medium_check = true;
UtFrameUtils.createMinDorisCluster(runningDir);
// create connect context


@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
@ -43,6 +44,7 @@ public class CreateTableTest {
@BeforeClass
public static void beforeClass() throws Exception {
Config.disable_storage_medium_check = true;
UtFrameUtils.createMinDorisCluster(runningDir);
// create connect context
@ -131,7 +133,7 @@ public class CreateTableTest {
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1');"));
ConfigBase.setMutableConfig("enable_strict_storage_medium_check", "false");
ConfigBase.setMutableConfig("disable_storage_medium_check", "true");
ExceptionChecker
.expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n"
+ "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
@ -254,7 +256,7 @@ public class CreateTableTest {
+ "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n"
+ "properties('replication_num' = '1');"));
ConfigBase.setMutableConfig("enable_strict_storage_medium_check", "true");
ConfigBase.setMutableConfig("disable_storage_medium_check", "false");
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium is SSD in all backends. need: 1",
() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"


@ -61,6 +61,7 @@ public class DynamicPartitionTableTest {
public static void beforeClass() throws Exception {
FeConstants.default_scheduler_interval_millisecond = 1000;
FeConstants.runningUnitTest = true;
Config.disable_storage_medium_check = true;
UtFrameUtils.createMinDorisCluster(runningDir);