[improvement](query) prefer to choose tablets on alive disks #39467 (#39654)

cherry pick from #39467
This commit is contained in:
yujun
2024-08-23 12:23:12 +08:00
committed by GitHub
parent 1f16daa5f6
commit 0934fbee7e
8 changed files with 190 additions and 21 deletions

View File

@ -919,13 +919,6 @@ void report_task_callback(const TMasterInfo& master_info) {
}
void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
@ -966,8 +959,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;
uint64_t report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
uint64_t report_version;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
if (report_version == s_report_version) {
break;
}
}
if (report_version < s_report_version) {
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this

View File

@ -595,6 +595,8 @@ int main(int argc, char** argv) {
exit(1);
}
exec_env->get_storage_engine()->notify_listeners();
while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();

View File

@ -151,6 +151,10 @@ public class DiskInfo implements Writable {
return pathHash != 0;
}
/**
 * Tells whether this disk is alive.
 *
 * @return true only when the disk state is {@code DiskState.ONLINE}
 */
public boolean isAlive() {
    return DiskState.ONLINE == state;
}
/**
 * Tells whether this disk uses the given storage medium.
 *
 * @param storageMedium the medium to compare against this disk's medium
 * @return true when both media are the same value
 */
public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
    return storageMedium == this.storageMedium;
}

View File

@ -281,9 +281,11 @@ public class Tablet extends MetaObject implements Writable {
}
// for query
public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) {
public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Long>> backendAlivePathHashs,
boolean allowFailedVersion) {
List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> deadPathReplica = Lists.newArrayList();
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
@ -294,21 +296,31 @@ public class Tablet extends MetaObject implements Writable {
continue;
}
if (!replica.checkVersionCatchUp(visibleVersion, false)) {
continue;
}
Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendId());
ReplicaState state = replica.getState();
if (state.canQuery()) {
if (replica.checkVersionCatchUp(visibleVersion, false)) {
allQueryableReplica.add(replica);
}
// If thisBeAlivePaths contains path hash 0, it means this BE hasn't reported its disk state yet;
// that case should be ignored, i.e. the replica is not treated as being on a dead path.
if (replica.getPathHash() != -1 && thisBeAlivePaths != null
&& !thisBeAlivePaths.contains(replica.getPathHash())
&& !thisBeAlivePaths.contains(0L)) {
deadPathReplica.add(replica);
} else if (state.canQuery()) {
allQueryableReplica.add(replica);
} else if (state == ReplicaState.DECOMMISSION) {
if (replica.checkVersionCatchUp(visibleVersion, false)) {
auxiliaryReplica.add(replica);
}
auxiliaryReplica.add(replica);
}
}
if (allQueryableReplica.isEmpty()) {
allQueryableReplica = auxiliaryReplica;
}
if (allQueryableReplica.isEmpty()) {
allQueryableReplica = deadPathReplica;
}
if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) {
long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
@ -808,6 +809,15 @@ public class ReportHandler extends Daemon {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
Set<Long> backendHealthPathHashs;
if (backend == null) {
backendHealthPathHashs = Sets.newHashSet();
} else {
backendHealthPathHashs = backend.getDisks().values().stream()
.filter(DiskInfo::isAlive)
.map(DiskInfo::getPathHash).collect(Collectors.toSet());
}
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
@ -863,7 +873,24 @@ public class ReportHandler extends Daemon {
long currentBackendReportVersion = Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId);
if (backendReportVersion < currentBackendReportVersion) {
continue;
// If backendHealthPathHashs contains path hash 0,
// it means this backend hasn't reported its disk state yet,
// so that case should be ignored.
boolean thisReplicaOnBadDisk = replica.getPathHash() != -1L
&& !backendHealthPathHashs.contains(replica.getPathHash())
&& !backendHealthPathHashs.contains(0L);
boolean existsOtherHealthReplica = tablet.getReplicas().stream()
.anyMatch(r -> r.getBackendId() != replica.getBackendId()
&& r.getVersion() >= replica.getVersion()
&& r.getLastFailedVersion() == -1L
&& !r.isBad());
// if replica is on bad disks and there are other health replicas, still delete it.
if (!(thisReplicaOnBadDisk && existsOtherHealthReplica)) {
continue;
}
}
BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());

View File

@ -40,6 +40,7 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
@ -732,7 +733,7 @@ public class OlapScanNode extends ScanNode {
}
private void addScanRangeLocations(Partition partition,
List<Tablet> tablets) throws UserException {
List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) throws UserException {
long visibleVersion = partition.getVisibleVersion();
String visibleVersionStr = String.valueOf(visibleVersion);
@ -776,7 +777,8 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion,
backendAlivePathHashs, skipMissingVersion);
if (replicas.isEmpty()) {
if (ConnectContext.get().getSessionVariable().skipBadTablet) {
continue;
@ -1125,6 +1127,12 @@ public class OlapScanNode extends ScanNode {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream()
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
}
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
@ -1166,7 +1174,7 @@ public class OlapScanNode extends ScanNode {
totalTabletsNum += selectedTable.getTablets().size();
selectedSplitNum += tablets.size();
addScanRangeLocations(partition, tablets);
addScanRangeLocations(partition, tablets, backendAlivePathHashs);
}
}

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.system.Backend;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Verifies that {@code Tablet.getQueryableReplicas} leaves out replicas whose path hash is not
 * among the alive disk path hashes of their backend, as long as another replica can be queried.
 */
public class QueryTabletTest extends TestWithFeService {

    // Three backends so that a table with replication_num = 3 can be created.
    @Override
    protected int backendNum() {
        return 3;
    }

    @Test
    public void testTabletOnBadDisks() throws Exception {
        createDatabase("db1");
        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1"
                + " properties('replication_num' = '3')");

        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
        Assertions.assertNotNull(tbl);

        // The table has a single bucket, so take the first tablet of the first index.
        Partition firstPartition = tbl.getPartitions().iterator().next();
        MaterializedIndex firstIndex = firstPartition
                .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next();
        Tablet tablet = firstIndex.getTablets().iterator().next();

        List<Replica> replicas = tablet.getReplicas();
        Assertions.assertEquals(3, replicas.size());
        // Every replica must have been assigned a real path hash.
        Assertions.assertTrue(replicas.stream().noneMatch(r -> r.getPathHash() == -1L));

        // While all disks are alive, every replica is queryable.
        Assertions.assertEquals(replicas,
                tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));

        Replica onOfflineDisk = replicas.get(0);
        Replica onLostDisk = replicas.get(1);
        Replica healthy = replicas.get(2);

        // disk mark as bad
        Backend badDiskBackend = Env.getCurrentSystemInfo().getBackend(onOfflineDisk.getBackendId());
        for (DiskInfo disk : badDiskBackend.getDisks().values()) {
            disk.setState(DiskInfo.DiskState.OFFLINE);
        }

        // lost disk
        onLostDisk.setPathHash(-123321L);

        // Only the replica on an alive, known disk is expected to remain.
        Assertions.assertEquals(Lists.newArrayList(healthy),
                tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
    }

    // Builds backend id -> path hashes of that backend's alive disks, for all backends.
    private Map<Long, Set<Long>> getAlivePathHashs() {
        Map<Long, Set<Long>> alivePathHashsByBackend = Maps.newHashMap();
        Env.getCurrentSystemInfo().getAllBackends().forEach(
                be -> alivePathHashsByBackend.put(be.getId(), alivePathHashsOf(be)));
        return alivePathHashsByBackend;
    }

    // Collects the path hashes of the disks of one backend that report isAlive().
    private static Set<Long> alivePathHashsOf(Backend backend) {
        return backend.getDisks().values().stream()
                .filter(DiskInfo::isAlive)
                .map(DiskInfo::getPathHash)
                .collect(Collectors.toSet());
    }
}

View File

@ -39,6 +39,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCloneReq;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@ -85,7 +86,9 @@ import org.apache.thrift.TException;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
/*
* This class is used to create mock backends.
@ -193,6 +196,9 @@ public class MockedBackendFactory {
TTaskType taskType = request.getTaskType();
switch (taskType) {
case CREATE:
++reportVersion;
handleCreateTablet(request, finishTaskRequest);
break;
case ALTER:
++reportVersion;
break;
@ -200,6 +206,7 @@ public class MockedBackendFactory {
handleDropTablet(request, finishTaskRequest);
break;
case CLONE:
++reportVersion;
handleCloneTablet(request, finishTaskRequest);
break;
case STORAGE_MEDIUM_MIGRATE:
@ -225,6 +232,30 @@ public class MockedBackendFactory {
}
}
/**
 * Mocks the handling of a CREATE tablet task: picks a random alive disk of the backend
 * (preferring disks whose storage medium matches the request) and fills the finish request
 * with one tablet info that carries the chosen disk's path hash.
 *
 * @param request the agent task request holding the create-tablet request
 * @param finishTaskRequest the finish request that receives the created tablet info
 */
private void handleCreateTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
    TCreateTabletReq createReq = request.getCreateTabletReq();

    List<DiskInfo> aliveDisks = backendInFe.getDisks().values().stream()
            .filter(DiskInfo::isAlive)
            .collect(Collectors.toList());
    // Prefer alive disks with the requested medium; otherwise fall back to any alive disk.
    List<DiskInfo> candidates = aliveDisks.stream()
            .filter(disk -> createReq.storage_medium == disk.getStorageMedium())
            .collect(Collectors.toList());
    if (candidates.isEmpty()) {
        candidates = aliveDisks;
    }

    // -1 is reported as the path hash when the backend has no alive disk at all.
    long pathHash = -1L;
    if (!candidates.isEmpty()) {
        pathHash = candidates.get(new Random().nextInt(candidates.size())).getPathHash();
    }

    TTabletInfo createdTablet = new TTabletInfo();
    createdTablet.setTabletId(createReq.tablet_id);
    createdTablet.setVersion(createReq.version);
    createdTablet.setPathHash(pathHash);
    createdTablet.setReplicaId(createReq.replica_id);
    createdTablet.setUsed(true);

    finishTaskRequest.setFinishTabletInfos(Lists.newArrayList(createdTablet));
}
private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
TDropTabletReq req = request.getDropTabletReq();
long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));