[enhancement](random_sink) change tablet search algorithm from random to round-robin for random distribution table (#26611)
1. Fix a race condition when getting the tablet load index. 2. Change the tablet search algorithm from random to round-robin for random-distribution tables when load_to_single_tablet is set to false.
This commit is contained in:
@ -207,7 +207,7 @@ import org.apache.doris.persist.TruncateTableInfo;
|
||||
import org.apache.doris.persist.meta.MetaHeader;
|
||||
import org.apache.doris.persist.meta.MetaReader;
|
||||
import org.apache.doris.persist.meta.MetaWriter;
|
||||
import org.apache.doris.planner.SingleTabletLoadRecorderMgr;
|
||||
import org.apache.doris.planner.TabletLoadIndexRecorderMgr;
|
||||
import org.apache.doris.plugin.PluginInfo;
|
||||
import org.apache.doris.plugin.PluginMgr;
|
||||
import org.apache.doris.policy.PolicyMgr;
|
||||
@ -331,7 +331,7 @@ public class Env {
|
||||
private LoadManager loadManager;
|
||||
private ProgressManager progressManager;
|
||||
private StreamLoadRecordMgr streamLoadRecordMgr;
|
||||
private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr;
|
||||
private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr;
|
||||
private RoutineLoadManager routineLoadManager;
|
||||
private SqlBlockRuleMgr sqlBlockRuleMgr;
|
||||
private ExportMgr exportMgr;
|
||||
@ -682,7 +682,7 @@ public class Env {
|
||||
this.progressManager = new ProgressManager();
|
||||
this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager",
|
||||
Config.fetch_stream_load_record_interval_second * 1000L);
|
||||
this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr();
|
||||
this.tabletLoadIndexRecorderMgr = new TabletLoadIndexRecorderMgr();
|
||||
this.loadEtlChecker = new LoadEtlChecker(loadManager);
|
||||
this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
|
||||
this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
|
||||
@ -1549,7 +1549,7 @@ public class Env {
|
||||
cooldownConfHandler.start();
|
||||
}
|
||||
streamLoadRecordMgr.start();
|
||||
singleTabletLoadRecorderMgr.start();
|
||||
tabletLoadIndexRecorderMgr.start();
|
||||
getInternalCatalog().getIcebergTableCreationRecordMgr().start();
|
||||
new InternalSchemaInitializer().start();
|
||||
if (Config.enable_hms_events_incremental_sync) {
|
||||
@ -3758,8 +3758,8 @@ public class Env {
|
||||
return streamLoadRecordMgr;
|
||||
}
|
||||
|
||||
public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() {
|
||||
return singleTabletLoadRecorderMgr;
|
||||
public TabletLoadIndexRecorderMgr getTabletLoadIndexRecorderMgr() {
|
||||
return tabletLoadIndexRecorderMgr;
|
||||
}
|
||||
|
||||
public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DistributionInfo;
|
||||
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.HashDistributionInfo;
|
||||
import org.apache.doris.catalog.Index;
|
||||
@ -109,8 +110,6 @@ public class OlapTableSink extends DataSink {
|
||||
|
||||
private boolean isStrictMode = false;
|
||||
|
||||
private boolean loadToSingleTablet;
|
||||
|
||||
public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
|
||||
boolean singleReplicaLoad) {
|
||||
this.dstTable = dstTable;
|
||||
@ -134,7 +133,6 @@ public class OlapTableSink extends DataSink {
|
||||
"if load_to_single_tablet set to true," + " the olap table must be with random distribution");
|
||||
}
|
||||
tSink.setLoadToSingleTablet(loadToSingleTablet);
|
||||
this.loadToSingleTablet = loadToSingleTablet;
|
||||
tDataSink = new TDataSink(getDataSinkType());
|
||||
tDataSink.setOlapTableSink(tSink);
|
||||
|
||||
@ -344,11 +342,12 @@ public class OlapTableSink extends DataSink {
|
||||
tPartition.setNumBuckets(index.getTablets().size());
|
||||
}
|
||||
tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId));
|
||||
if (loadToSingleTablet) {
|
||||
int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
|
||||
.getCurrentLoadTabletIndex(dbId, table.getId(), partitionId);
|
||||
if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
|
||||
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
|
||||
.getCurrentTabletLoadIndex(dbId, table.getId(), partition);
|
||||
tPartition.setLoadTabletIdx(tabletIndex);
|
||||
}
|
||||
|
||||
partitionParam.addToPartitions(tPartition);
|
||||
|
||||
DistributionInfo distInfo = partition.getDistributionInfo();
|
||||
@ -403,9 +402,10 @@ public class OlapTableSink extends DataSink {
|
||||
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
|
||||
tPartition.setNumBuckets(index.getTablets().size());
|
||||
}
|
||||
if (loadToSingleTablet) {
|
||||
int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr()
|
||||
.getCurrentLoadTabletIndex(dbId, table.getId(), partition.getId());
|
||||
|
||||
if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
|
||||
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
|
||||
.getCurrentTabletLoadIndex(dbId, table.getId(), partition);
|
||||
tPartition.setLoadTabletIdx(tabletIndex);
|
||||
}
|
||||
partitionParam.addToPartitions(tPartition);
|
||||
|
||||
@ -1,112 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SingleTabletLoadRecorderMgr extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(SingleTabletLoadRecorderMgr.class);
|
||||
private static final long EXPIRY_TIME_INTERVAL_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days
|
||||
|
||||
// <<db_id, table_id, partition_id> -> load_tablet_record>
|
||||
// 0 =< load_tablet_index < number_buckets
|
||||
private final ConcurrentHashMap<Triple<Long, Long, Long>, TabletUpdateRecord> loadTabletRecordMap =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public SingleTabletLoadRecorderMgr() {
|
||||
super("single_tablet_load_recorder", EXPIRY_TIME_INTERVAL_MS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
long expiryTime = System.currentTimeMillis() - EXPIRY_TIME_INTERVAL_MS;
|
||||
loadTabletRecordMap.entrySet().removeIf(entry ->
|
||||
entry.getValue().getUpdateTimestamp() < expiryTime
|
||||
);
|
||||
LOG.info("Remove expired load tablet record successfully.");
|
||||
}
|
||||
|
||||
public int getCurrentLoadTabletIndex(long dbId, long tableId, long partitionId) throws UserException {
|
||||
Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partitionId);
|
||||
TabletUpdateRecord record = loadTabletRecordMap.get(key);
|
||||
int numBuckets = -1;
|
||||
if (record == null) {
|
||||
numBuckets = getNumBuckets(dbId, tableId, partitionId);
|
||||
}
|
||||
return createOrUpdateLoadTabletRecord(key, numBuckets);
|
||||
}
|
||||
|
||||
private int getNumBuckets(long dbId, long tableId, long partitionId) throws UserException {
|
||||
OlapTable olapTable = (OlapTable) Env.getCurrentInternalCatalog().getDb(dbId)
|
||||
.flatMap(db -> db.getTable(tableId)).filter(t -> t.getType() == TableIf.TableType.OLAP)
|
||||
.orElse(null);
|
||||
if (olapTable == null) {
|
||||
throw new UserException("Olap table[" + dbId + "." + tableId + "] is not exist.");
|
||||
}
|
||||
return olapTable.getPartition(partitionId)
|
||||
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
|
||||
.get(0).getTablets().size();
|
||||
}
|
||||
|
||||
private int createOrUpdateLoadTabletRecord(Triple<Long, Long, Long> key, int numBuckets) {
|
||||
TabletUpdateRecord record = loadTabletRecordMap.compute(key, (k, existingRecord) -> {
|
||||
if (existingRecord == null) {
|
||||
return new TabletUpdateRecord(0, numBuckets);
|
||||
} else {
|
||||
existingRecord.updateRecord();
|
||||
return existingRecord;
|
||||
}
|
||||
});
|
||||
return record.getTabletIndex();
|
||||
}
|
||||
|
||||
static class TabletUpdateRecord {
|
||||
@Getter
|
||||
// 0 =< load_tablet_index < number_buckets
|
||||
int tabletIndex;
|
||||
int numBuckets;
|
||||
@Getter
|
||||
long updateTimestamp = System.currentTimeMillis();
|
||||
|
||||
TabletUpdateRecord(int tabletIndex, int numBuckets) {
|
||||
this.tabletIndex = tabletIndex;
|
||||
this.numBuckets = numBuckets;
|
||||
}
|
||||
|
||||
public synchronized void updateRecord() {
|
||||
this.tabletIndex = this.tabletIndex + 1 >= numBuckets ? 0 : this.tabletIndex + 1;
|
||||
// To reduce the compute time cost, only update timestamp when index is 0
|
||||
if (this.tabletIndex == 0) {
|
||||
this.updateTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,89 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class TabletLoadIndexRecorderMgr extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(TabletLoadIndexRecorderMgr.class);
|
||||
private static final long TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days
|
||||
private static final long TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS = 3600000; // 1 hour
|
||||
private static final int TIMES_FOR_UPDATE_TIMESTAMP = 1000;
|
||||
|
||||
// <<db_id, table_id, partition_id> -> load_tablet_record>
|
||||
// 0 =< load_tablet_index < number_buckets
|
||||
private final ConcurrentHashMap<Triple<Long, Long, Long>, TabletLoadIndexRecord> loadTabletRecordMap =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public TabletLoadIndexRecorderMgr() {
|
||||
super("tablet_load_index_recorder", TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
int originRecordSize = loadTabletRecordMap.size();
|
||||
long expireTime = System.currentTimeMillis() - TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS;
|
||||
loadTabletRecordMap.entrySet().removeIf(entry ->
|
||||
entry.getValue().getUpdateTimestamp() < expireTime
|
||||
);
|
||||
int currentRecordSize = loadTabletRecordMap.size();
|
||||
LOG.info("Remove expired load tablet index record successfully, before {}, current {}",
|
||||
originRecordSize, currentRecordSize);
|
||||
}
|
||||
|
||||
public int getCurrentTabletLoadIndex(long dbId, long tableId, Partition partition) throws UserException {
|
||||
Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partition.getId());
|
||||
return loadTabletRecordMap.compute(key, (k, existingRecord) ->
|
||||
existingRecord == null ? new TabletLoadIndexRecord(partition.getVisibleVersion() - 1,
|
||||
partition.getDistributionInfo().getBucketNum()) : existingRecord).getAndIncrement();
|
||||
}
|
||||
|
||||
static class TabletLoadIndexRecord {
|
||||
int loadIndex;
|
||||
int numBuckets;
|
||||
@Getter
|
||||
long updateTimestamp = System.currentTimeMillis();
|
||||
|
||||
TabletLoadIndexRecord(long initialIndex, int numBuckets) {
|
||||
this.loadIndex = (int) (initialIndex % numBuckets);
|
||||
this.numBuckets = numBuckets;
|
||||
}
|
||||
|
||||
public synchronized int getAndIncrement() {
|
||||
int tabletLoadIndex = loadIndex % numBuckets;
|
||||
loadIndex++;
|
||||
// To reduce the compute time cost, only update timestamp when load index is
|
||||
// greater than or equal to both TIMES_FOR_UPDATE_TIMESTAMP and numBuckets
|
||||
if (loadIndex >= Math.max(TIMES_FOR_UPDATE_TIMESTAMP, numBuckets)) {
|
||||
loadIndex = loadIndex % numBuckets;
|
||||
this.updateTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
return tabletLoadIndex;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user