[Optimize] Add session variable `max_fetch_remote_schema_tablet_count… (#37505)

pick from #37217
This commit is contained in:
lihangyu
2024-07-11 10:04:20 +08:00
committed by GitHub
parent 9f4e7346fb
commit e1cb568d11
7 changed files with 97 additions and 1 deletions

View File

@ -92,6 +92,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -2650,6 +2651,55 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return tablets;
}
// Get sample tablets for remote desc schema
// 1. Estimate tablets for a partition, 1 at least
// 2. Pick the partition sorted with id in desc order, greater id with the newest partition
// 3. Truncate to sampleSize
public List<Tablet> getSampleTablets(int sampleSize) {
List<Tablet> sampleTablets = new ArrayList<>();
// Filter partition with empty data
Collection<Partition> partitions = getPartitions()
.stream()
.filter(partition -> partition.getVisibleVersion() > Partition.PARTITION_INIT_VERSION)
.collect(Collectors.toList());
if (partitions.isEmpty()) {
return sampleTablets;
}
// 1. Estimate tablets for a partition, 1 at least
int estimatePartitionTablets = Math.max(sampleSize / partitions.size(), 1);
// 2. Sort the partitions by id in descending order (greater id means the newest partition)
List<Partition> sortedPartitions = partitions.stream().sorted(new Comparator<Partition>() {
@Override
public int compare(Partition p1, Partition p2) {
// compare with desc order
return Long.compare(p2.getId(), p1.getId());
}
}).collect(Collectors.toList());
// 3. Collect tablets from partitions
for (Partition partition : sortedPartitions) {
List<Tablet> targetTablets = new ArrayList<>(partition.getBaseIndex().getTablets());
Collections.shuffle(targetTablets);
if (!targetTablets.isEmpty()) {
// Ensure we do not exceed the available number of tablets
int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets);
sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch));
}
if (sampleTablets.size() >= sampleSize) {
break;
}
}
// 4. Truncate to sample size if needed
if (sampleTablets.size() > sampleSize) {
sampleTablets = sampleTablets.subList(0, sampleSize);
}
return sampleTablets;
}
// During `getNextVersion` and `updateVisibleVersionAndTime` period,
// the write lock on the table should be held continuously
public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) {

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -63,7 +64,8 @@ public class RemoteIndexSchemaProcDir implements ProcDirInterface {
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
tablets = olapTable.getAllTablets();
// Get sample tablets for remote desc schema
tablets = olapTable.getSampleTablets(ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount);
} finally {
table.readUnlock();
}

View File

@ -23,11 +23,13 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@ -62,6 +64,13 @@ public class RemoteIndexSchemaProcNode implements ProcNodeInterface {
tablets.add(tablet);
}
}
// Get the maximum number of Remote Tablets that can be fetched
int maxFetchCount = ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount;
// If the number of tablets is greater than the maximum fetch count, randomly select maxFetchCount tablets
if (tablets.size() > maxFetchCount) {
Collections.shuffle(tablets);
tablets = tablets.subList(0, maxFetchCount);
}
List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
this.schema.addAll(remoteSchema);
return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);

View File

@ -92,6 +92,8 @@ public class FetchRemoteTabletSchemaUtil {
Long backendId = entry.getKey();
Set<Long> tabletIds = entry.getValue();
Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
LOG.debug("fetch schema from coord backend {}, sample tablets count {}",
backend.getId(), tabletIds.size());
// only need alive be
if (!backend.isAlive()) {
continue;

View File

@ -588,6 +588,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS = "fetch_remote_schema_timeout_seconds";
public static final String MAX_FETCH_REMOTE_TABLET_COUNT = "max_fetch_remote_schema_tablet_count";
// CLOUD_VARIABLES_BEGIN
public static final String CLOUD_CLUSTER = "cloud_cluster";
public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
@ -1839,6 +1841,9 @@ public class SessionVariable implements Serializable, Writable {
// fetch remote schema rpc timeout
@VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = true)
public long fetchRemoteSchemaTimeoutSeconds = 120;
// max tablet count for fetch remote schema
@VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
public int maxFetchRemoteTabletCount = 512;
@VariableMgr.VarAttr(
name = ENABLE_JOIN_SPILL,