[Feature](x-load) support config min replica num for loading data (#21118)

This commit is contained in:
yujun
2023-10-11 21:07:35 +08:00
committed by GitHub
parent ba87f7d3a3
commit 73c3e3ab55
52 changed files with 1563 additions and 223 deletions

View File

@ -210,6 +210,10 @@ public class Alter {
} else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkMinLoadReplicaNum(alterClauses)) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
processModifyMinLoadReplicaNum(db, olapTable, alterClause);
} else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
if (!Config.enable_feature_binlog) {
throw new DdlException("Binlog feature is not enabled");
@ -886,6 +890,37 @@ public class Alter {
}
}
// Handles ALTER TABLE ... SET ("min_load_replica_num" = "...") for an OLAP table.
// Validates the requested value against the table's default replica allocation and,
// when dynamic partitioning is configured, against the dynamic-partition replica
// allocation; then persists the property through Env.modifyTableProperties.
// Throws DdlException when analysis fails or the value exceeds either allocation.
private void processModifyMinLoadReplicaNum(Database db, OlapTable olapTable, AlterClause alterClause)
throws DdlException {
Map<String, String> properties = alterClause.getProperties();
short minLoadReplicaNum = -1;
try {
// -1 means "unset"; the analyzer also removes the key from 'properties'.
minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties);
} catch (AnalysisException e) {
// Re-wrap so ALTER callers receive a DDL-level error.
throw new DdlException(e.getMessage());
}
ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
// min_load_replica_num may never exceed the table's default replica count.
if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= "
+ "default replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
if (olapTable.dynamicPartitionExists()) {
// Dynamic partitions may use their own replica allocation; check that too.
replicaAlloc = olapTable.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation();
if (!replicaAlloc.isNotSet() && minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= "
+ "dynamic partition replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
}
// Re-insert the key (removed by the analyzer) so it is persisted with the table.
properties.put(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, Short.toString(minLoadReplicaNum));
// NOTE(review): the in-memory table property is updated before the table write
// lock is taken below; confirm this ordering is intentional (a concurrent failed
// modifyTableProperties would leave the in-memory value already changed).
olapTable.setMinLoadReplicaNum(minLoadReplicaNum);
olapTable.writeLockOrDdlException();
try {
Env.getCurrentEnv().modifyTableProperties(db, olapTable, properties);
} finally {
olapTable.writeUnlock();
}
}
// Returns the handler responsible for schema-change alter jobs.
public AlterHandler getSchemaChangeHandler() {
return schemaChangeHandler;
}

View File

@ -84,6 +84,12 @@ public class AlterOperations {
).anyMatch(clause -> clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED));
}
// Returns true if any ModifyTablePropertiesClause in the list carries the
// "min_load_replica_num" property; mirrors the other check* helpers in this class.
public boolean checkMinLoadReplicaNum(List<AlterClause> alterClauses) {
    for (AlterClause clause : alterClauses) {
        if (clause instanceof ModifyTablePropertiesClause
                && clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM)) {
            return true;
        }
    }
    return false;
}
public boolean checkBinlogConfigChange(List<AlterClause> alterClauses) {
return alterClauses.stream().filter(clause ->
clause instanceof ModifyTablePropertiesClause

View File

@ -46,6 +46,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
private boolean isBeingSynced = false;
private short minLoadReplicaNum = -1;
public void setIsBeingSynced(boolean isBeingSynced) {
this.isBeingSynced = isBeingSynced;
}
@ -117,6 +119,9 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TABLET_TYPE)) {
throw new AnalysisException("Alter tablet type not supported");
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM)) {
// do nothing here; the property will be altered in Alter.processAlterOlapTable
this.needTableStable = false;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) {
this.needTableStable = false;
String storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, "");

View File

@ -3111,6 +3111,10 @@ public class Env {
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\" = \"");
sb.append(replicaAlloc.toCreateStmt()).append("\"");
// min load replica num
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\" = \"");
sb.append(olapTable.getMinLoadReplicaNum()).append("\"");
// bloom filter
Set<String> bfColumnNames = olapTable.getCopiedBfColumns();
if (bfColumnNames != null) {
@ -4646,6 +4650,7 @@ public class Env {
tableProperty.modifyTableProperties(properties);
}
tableProperty.buildInMemory()
.buildMinLoadReplicaNum()
.buildStoragePolicy()
.buildIsBeingSynced()
.buildCompactionPolicy()

View File

@ -1854,6 +1854,36 @@ public class OlapTable extends Table {
tableProperty.buildEnableLightSchemaChange();
}
// Returns the table-level "min_load_replica_num" property, or -1 (unset)
// when the table has no TableProperty yet.
public short getMinLoadReplicaNum() {
    return tableProperty == null ? -1 : tableProperty.getMinLoadReplicaNum();
}
// Stores "min_load_replica_num" into the table property map and rebuilds the
// cached value. Uses Short.toString(...) directly instead of boxing through
// Short.valueOf(...).toString(), matching how Alter writes the same property.
public void setMinLoadReplicaNum(short minLoadReplicaNum) {
    TableProperty tableProperty = getOrCreatTableProperty();
    tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM,
            Short.toString(minLoadReplicaNum));
    tableProperty.buildMinLoadReplicaNum();
}
// Number of replicas that must succeed for a load into the given partition.
// Priority: table-level min_load_replica_num (capped by the partition's total
// replica count); otherwise majority quorum, optionally capped by the FE
// config min_load_replica_num when that config is positive.
public int getLoadRequiredReplicaNum(long partitionId) {
    int totalReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
    int tableMin = getMinLoadReplicaNum();
    if (tableMin > 0) {
        return Math.min(tableMin, totalReplicaNum);
    }
    int quorum = totalReplicaNum / 2 + 1;
    return Config.min_load_replica_num > 0 ? Math.min(quorum, Config.min_load_replica_num) : quorum;
}
public void setStoragePolicy(String storagePolicy) throws UserException {
if (!Config.enable_storage_policy && !Strings.isNullOrEmpty(storagePolicy)) {
throw new UserException("storage policy feature is disabled by default. "

View File

@ -58,6 +58,7 @@ public class TableProperty implements Writable {
private DynamicPartitionProperty dynamicPartitionProperty = new DynamicPartitionProperty(Maps.newHashMap());
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
private boolean isInMemory = false;
private short minLoadReplicaNum = -1;
private String storagePolicy = "";
private Boolean isBeingSynced = null;
@ -121,6 +122,7 @@ public class TableProperty implements Writable {
break;
case OperationType.OP_MODIFY_IN_MEMORY:
buildInMemory();
buildMinLoadReplicaNum();
buildStoragePolicy();
buildIsBeingSynced();
buildCompactionPolicy();
@ -275,6 +277,16 @@ public class TableProperty implements Writable {
return timeSeriesCompactionTimeThresholdSeconds;
}
// Re-parses the cached min_load_replica_num from the raw property map;
// defaults to -1 (unset) when the property is absent. Returns this for chaining.
public TableProperty buildMinLoadReplicaNum() {
    String raw = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, "-1");
    minLoadReplicaNum = Short.parseShort(raw);
    return this;
}
// Returns the cached min_load_replica_num (-1 when unset); refreshed by buildMinLoadReplicaNum().
public short getMinLoadReplicaNum() {
return minLoadReplicaNum;
}
public TableProperty buildStoragePolicy() {
storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, "");
return this;
@ -487,6 +499,7 @@ public class TableProperty implements Writable {
TableProperty tableProperty = GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class)
.executeBuildDynamicProperty()
.buildInMemory()
.buildMinLoadReplicaNum()
.buildStorageFormat()
.buildDataSortInfo()
.buildCompressionType()

View File

@ -34,7 +34,6 @@ import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.ColocatePersistInfo;
@ -73,7 +72,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
if (INSTANCE == null) {
synchronized (ColocateTableCheckerAndBalancer.class) {
if (INSTANCE == null) {
INSTANCE = new ColocateTableCheckerAndBalancer(FeConstants.tablet_checker_interval_ms);
INSTANCE = new ColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
}
}
}

View File

@ -32,7 +32,6 @@ import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.GaugeMetric;
@ -127,7 +126,7 @@ public class TabletChecker extends MasterDaemon {
public TabletChecker(Env env, SystemInfoService infoService, TabletScheduler tabletScheduler,
TabletSchedulerStat stat) {
super("tablet checker", FeConstants.tablet_checker_interval_ms);
super("tablet checker", Config.tablet_checker_interval_ms);
this.env = env;
this.infoService = infoService;
this.tabletScheduler = tabletScheduler;

View File

@ -309,7 +309,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
} else {
decommissionTime = -1;
if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
return failedSchedCounter > 30 * 1000 / FeConstants.tablet_schedule_interval_ms;
return failedSchedCounter > 30 * 1000 / Config.tablet_schedule_interval_ms;
} else {
return failedSchedCounter > 10;
}

View File

@ -151,7 +151,7 @@ public class TabletScheduler extends MasterDaemon {
public TabletScheduler(Env env, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
TabletSchedulerStat stat, String rebalancerType) {
super("tablet scheduler", FeConstants.tablet_schedule_interval_ms);
super("tablet scheduler", Config.tablet_schedule_interval_ms);
this.env = env;
this.infoService = infoService;
this.invertedIndex = invertedIndex;

View File

@ -64,9 +64,6 @@ public class FeConstants {
// use \N to indicate NULL
public static String null_string = "\\N";
public static long tablet_checker_interval_ms = 20 * 1000L;
public static long tablet_schedule_interval_ms = 1000L;
public static String FS_PREFIX_S3 = "s3";
public static String FS_PREFIX_S3A = "s3a";
public static String FS_PREFIX_S3N = "s3n";

View File

@ -19,6 +19,8 @@ package org.apache.doris.common.util;
import org.apache.doris.common.Config;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -34,39 +36,82 @@ public class DebugPointUtil {
private static final Map<String, DebugPoint> debugPoints = new ConcurrentHashMap<>();
private static class DebugPoint {
public static class DebugPoint {
public AtomicInteger executeNum = new AtomicInteger(0);
public int executeLimit = -1;
public long expireTime = -1;
// params
public Map<String, String> params = Maps.newHashMap();
// Returns the debug-point parameter registered under 'key', converted to the
// runtime type of 'defaultValue'; returns 'defaultValue' when the key is absent.
// Supported types: Boolean, Byte, Character, Short, Integer, Long, Float,
// Double, String. Any other type raises IllegalStateException — the same
// exception Preconditions.checkState(false, ...) threw before; the unreachable
// trailing 'return defaultValue' has been removed.
@SuppressWarnings("unchecked") // each cast is guarded by the matching instanceof check
public <E> E param(String key, E defaultValue) {
    Preconditions.checkState(defaultValue != null);
    String value = params.get(key);
    if (value == null) {
        return defaultValue;
    }
    if (defaultValue instanceof Boolean) {
        return (E) Boolean.valueOf(value);
    }
    if (defaultValue instanceof Byte) {
        return (E) Byte.valueOf(value);
    }
    if (defaultValue instanceof Character) {
        Preconditions.checkState(value.length() == 1);
        return (E) Character.valueOf(value.charAt(0));
    }
    if (defaultValue instanceof Short) {
        return (E) Short.valueOf(value);
    }
    if (defaultValue instanceof Integer) {
        return (E) Integer.valueOf(value);
    }
    if (defaultValue instanceof Long) {
        return (E) Long.valueOf(value);
    }
    if (defaultValue instanceof Float) {
        return (E) Float.valueOf(value);
    }
    if (defaultValue instanceof Double) {
        return (E) Double.valueOf(value);
    }
    if (defaultValue instanceof String) {
        return (E) value;
    }
    throw new IllegalStateException("Can not convert with default value=" + defaultValue);
}
}
// True when the named debug point currently exists (globally enabled, present,
// not expired and not past its execute limit — see getDebugPoint).
public static boolean isEnable(String debugPointName) {
    DebugPoint point = getDebugPoint(debugPointName);
    return point != null;
}
public static DebugPoint getDebugPoint(String debugPointName) {
if (!Config.enable_debug_points) {
return false;
return null;
}
DebugPoint debugPoint = debugPoints.get(debugPointName);
if (debugPoint == null) {
return false;
return null;
}
if ((debugPoint.expireTime > 0 && System.currentTimeMillis() >= debugPoint.expireTime)
|| (debugPoint.executeLimit > 0 && debugPoint.executeNum.incrementAndGet() > debugPoint.executeLimit)) {
debugPoints.remove(debugPointName);
return false;
return null;
}
return true;
return debugPoint;
}
public static void addDebugPoint(String name, int executeLimit, long timeoutSecond) {
DebugPoint debugPoint = new DebugPoint();
debugPoint.executeLimit = executeLimit;
if (timeoutSecond > 0) {
debugPoint.expireTime = System.currentTimeMillis() + timeoutSecond * 1000;
}
public static void addDebugPoint(String name, DebugPoint debugPoint) {
debugPoints.put(name, debugPoint);
LOG.info("add debug point: name={}, execute={}, timeout seconds={}", name, executeLimit, timeoutSecond);
LOG.info("add debug point: name={}, params={}", name, debugPoint.params);
}
public static void removeDebugPoint(String name) {

View File

@ -639,6 +639,10 @@ public class DynamicPartitionUtil {
} else {
replicaAlloc = olapTable.getDefaultReplicaAllocation();
}
if (olapTable.getMinLoadReplicaNum() > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + olapTable.getMinLoadReplicaNum()
+ "] <= dynamic partition replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
checkReplicaAllocation(replicaAlloc, hotPartitionNum, db);
if (properties.containsKey(DynamicPartitionProperty.RESERVED_HISTORY_PERIODS)) {

View File

@ -61,6 +61,7 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_SHORT_KEY = "short_key";
public static final String PROPERTIES_REPLICATION_NUM = "replication_num";
public static final String PROPERTIES_REPLICATION_ALLOCATION = "replication_allocation";
public static final String PROPERTIES_MIN_LOAD_REPLICA_NUM = "min_load_replica_num";
public static final String PROPERTIES_STORAGE_TYPE = "storage_type";
public static final String PROPERTIES_STORAGE_MEDIUM = "storage_medium";
public static final String PROPERTIES_STORAGE_COOLDOWN_TIME = "storage_cooldown_time";
@ -345,6 +346,24 @@ public class PropertyAnalyzer {
return replicationNum;
}
/**
 * Parses and validates the "min_load_replica_num" table property.
 * When present, the key is removed from {@code properties} after validation.
 * Returns -1 (meaning "unset") when the property is absent or properties is null.
 * Valid values are positive shorts, or -1 to explicitly unset.
 *
 * @throws AnalysisException if the value is not parseable as a short, or is a
 *         non-positive value other than -1
 */
public static short analyzeMinLoadReplicaNum(Map<String, String> properties) throws AnalysisException {
    short minLoadReplicaNum = -1;
    if (properties != null && properties.containsKey(PROPERTIES_MIN_LOAD_REPLICA_NUM)) {
        try {
            minLoadReplicaNum = Short.parseShort(properties.get(PROPERTIES_MIN_LOAD_REPLICA_NUM));
        } catch (NumberFormatException e) {
            // Catch only the parse failure rather than a blanket Exception,
            // so unrelated runtime errors are not masked as analysis errors.
            throw new AnalysisException(e.getMessage());
        }
        if (minLoadReplicaNum <= 0 && minLoadReplicaNum != -1) {
            throw new AnalysisException("min_load_replica_num should > 0 or =-1");
        }
        properties.remove(PROPERTIES_MIN_LOAD_REPLICA_NUM);
    }
    return minLoadReplicaNum;
}
public static String analyzeColumnSeparator(Map<String, String> properties, String oldColumnSeparator) {
String columnSeparator = oldColumnSeparator;
if (properties != null && properties.containsKey(PROPERTIES_COLUMN_SEPARATOR)) {

View File

@ -2062,6 +2062,18 @@ public class InternalCatalog implements CatalogIf<Database> {
// this should be done before create partition.
Map<String, String> properties = stmt.getProperties();
short minLoadReplicaNum = -1;
try {
minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= "
+ "default replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
olapTable.setMinLoadReplicaNum(minLoadReplicaNum);
// get use light schema change
Boolean enableLightSchemaChange;
try {

View File

@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@ -40,7 +41,7 @@ import javax.servlet.http.HttpServletResponse;
public class DebugPointAction extends RestBaseController {
@RequestMapping(path = "/api/debug_point/add/{debugPoint}", method = RequestMethod.POST)
protected Object addDebugPoint(@PathVariable("debugPoint") String debugPoint,
protected Object addDebugPoint(@PathVariable("debugPoint") String name,
@RequestParam(name = "execute", required = false, defaultValue = "") String execute,
@RequestParam(name = "timeout", required = false, defaultValue = "") String timeout,
HttpServletRequest request, HttpServletResponse response) {
@ -50,28 +51,38 @@ public class DebugPointAction extends RestBaseController {
}
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
if (Strings.isNullOrEmpty(debugPoint)) {
if (Strings.isNullOrEmpty(name)) {
return ResponseEntityBuilder.badRequest("Empty debug point name.");
}
int executeLimit = -1;
DebugPoint debugPoint = new DebugPoint();
if (!Strings.isNullOrEmpty(execute)) {
try {
executeLimit = Integer.valueOf(execute);
debugPoint.executeLimit = Integer.valueOf(execute);
} catch (Exception e) {
return ResponseEntityBuilder.badRequest(
"Invalid execute format: " + execute + ", err " + e.getMessage());
}
}
long timeoutSeconds = -1;
if (!Strings.isNullOrEmpty(timeout)) {
try {
timeoutSeconds = Long.valueOf(timeout);
long timeoutSeconds = Long.valueOf(timeout);
if (timeoutSeconds > 0) {
debugPoint.expireTime = System.currentTimeMillis() + timeoutSeconds * 1000;
}
} catch (Exception e) {
return ResponseEntityBuilder.badRequest(
"Invalid timeout format: " + timeout + ", err " + e.getMessage());
}
}
DebugPointUtil.addDebugPoint(debugPoint, executeLimit, timeoutSeconds);
request.getParameterMap().forEach((key, values) -> {
if (values != null && values.length > 0) {
debugPoint.params.put(key, values[0]);
}
});
DebugPointUtil.addDebugPoint(name, debugPoint);
return ResponseEntityBuilder.ok();
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@ -46,6 +47,8 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
@ -80,6 +83,7 @@ import org.apache.logging.log4j.Logger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -429,19 +433,25 @@ public class OlapTableSink extends DataSink {
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
for (Long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
int quorum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId());
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
// we should ensure the replica backend is alive
// otherwise, there will be a 'unknown node id, id=xxx' error for stream load
for (Tablet tablet : index.getTablets()) {
Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap();
if (bePathsMap.keySet().size() < quorum) {
if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
"tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size()
+ " < quorum replica num " + quorum
+ " < load required replica num " + loadRequiredReplicaNum
+ ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]");
}
debugWriteRandomChooseSink(tablet, partition.getVisibleVersion(), bePathsMap);
if (bePathsMap.keySet().isEmpty()) {
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
"tablet " + tablet.getId() + " no available replica");
}
if (singleReplicaLoad) {
Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
Random random = new SecureRandom();
@ -474,6 +484,39 @@ public class OlapTableSink extends DataSink {
return Arrays.asList(locationParam, slaveLocationParam);
}
// Debug-point hook "OlapTableSink.write_random_choose_sink": when the debug
// point is active, trims bePathsMap (backend id -> path hash) in place to a
// random subset of replica paths, to simulate loads that reach only some sinks.
// Debug-point params: "needCatchUp" (boolean) restricts candidates to replicas
// whose version has caught up to 'version'; "sinkNum" (int) fixes the subset
// size (0 = choose a random size in 1..bePathsMap.size()).
private void debugWriteRandomChooseSink(Tablet tablet, long version, Multimap<Long, Long> bePathsMap) {
    DebugPoint debugPoint = DebugPointUtil.getDebugPoint("OlapTableSink.write_random_choose_sink");
    if (debugPoint == null) {
        return;
    }
    boolean needCatchup = debugPoint.param("needCatchUp", false);
    int sinkNum = debugPoint.param("sinkNum", 0);
    if (sinkNum == 0) {
        // BUGFIX: 'nextInt() % size + 1' could be zero or negative (signed
        // modulo), which silently skipped the trimming below. nextInt(bound)
        // yields a uniform value, giving sinkNum in 1..size as intended.
        // bePathsMap is non-empty here: the caller has already verified the
        // alive-replica count against the load-required replica num.
        sinkNum = new SecureRandom().nextInt(bePathsMap.size()) + 1;
    }
    List<Long> candidatePaths = tablet.getReplicas().stream()
            .filter(replica -> !needCatchup || replica.getVersion() >= version)
            .map(Replica::getPathHash)
            .collect(Collectors.toList());
    if (sinkNum > 0 && sinkNum < candidatePaths.size()) {
        // Keep a random subset of exactly sinkNum candidate paths.
        Collections.shuffle(candidatePaths);
        while (candidatePaths.size() > sinkNum) {
            candidatePaths.remove(candidatePaths.size() - 1);
        }
    }
    Multimap<Long, Long> result = HashMultimap.create();
    bePathsMap.forEach((beId, pathHash) -> {
        if (candidatePaths.contains(pathHash)) {
            result.put(beId, pathHash);
        }
    });
    // Mutate the caller's map in place.
    bePathsMap.clear();
    bePathsMap.putAll(result);
}
private TPaloNodesInfo createPaloNodesInfo() {
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();

View File

@ -535,8 +535,7 @@ public class DatabaseTransactionMgr {
transactionState.prolongPublishTimeout();
}
int quorumReplicaNum = table.getPartitionInfo()
.getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId());
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
tabletSuccReplicas.clear();
@ -574,13 +573,13 @@ public class DatabaseTransactionMgr {
}
int successReplicaNum = tabletSuccReplicas.size();
if (successReplicaNum < quorumReplicaNum) {
if (successReplicaNum < loadRequiredReplicaNum) {
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica "
+ "num %s < quorum replica num %s. table %s, partition %s, this tablet detail: %s",
transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId,
String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s"
+ " < load required replica num %s. table %s, partition %s, this tablet detail: %s",
transactionId, tablet.getId(), successReplicaNum, loadRequiredReplicaNum, tableId,
partition.getId(), writeDetail);
LOG.info(errMsg);
@ -957,7 +956,7 @@ public class DatabaseTransactionMgr {
transactionState);
continue;
}
PartitionInfo partitionInfo = table.getPartitionInfo();
Iterator<PartitionCommitInfo> partitionCommitInfoIterator
= tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
while (partitionCommitInfoIterator.hasNext()) {
@ -983,8 +982,8 @@ public class DatabaseTransactionMgr {
transactionState.setErrorMsg(errMsg);
return;
}
int quorumReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
List<MaterializedIndex> allIndices;
if (transactionState.getLoadedTblIndexes().isEmpty()) {
allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
@ -1017,15 +1016,15 @@ public class DatabaseTransactionMgr {
}
int healthReplicaNum = tabletSuccReplicas.size();
if (healthReplicaNum >= quorumReplicaNum) {
if (healthReplicaNum >= loadRequiredReplicaNum) {
if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) {
String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
tabletWriteFailedReplicas, tabletVersionFailedReplicas);
LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
+ " {}, and has failed replicas, load require replica num {}. table {}, "
+ "partition {}, tablet detail: {}",
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId, partitionId, writeDetail);
loadRequiredReplicaNum, tableId, partitionId, writeDetail);
}
continue;
}
@ -1050,23 +1049,23 @@ public class DatabaseTransactionMgr {
// ahead, otherwise data may be lost and the
// publish task hangs forever.
LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
+ " {}, and has failed replicas, load require replica num {}. table {}, "
+ "partition {}, tablet detail: {}",
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId, partitionId, writeDetail);
loadRequiredReplicaNum, tableId, partitionId, writeDetail);
} else {
publishResult = PublishResult.FAILED;
String errMsg = String.format("publish on tablet %d failed."
+ " succeed replica num %d less than quorum %d."
+ " succeed replica num %d < load required replica num %d."
+ " table: %d, partition: %d, publish version: %d",
tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId,
tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
partitionId, partition.getVisibleVersion() + 1);
transactionState.setErrorMsg(errMsg);
LOG.info("publish version failed for transaction {} on tablet {} with version"
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
+ " {}, and has failed replicas, load required replica num {}. table {}, "
+ "partition {}, tablet detail: {}",
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId, partitionId, writeDetail);
loadRequiredReplicaNum, tableId, partitionId, writeDetail);
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.transaction;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.SystemInfoService;
@ -59,6 +60,9 @@ public class PublishVersionDaemon extends MasterDaemon {
}
private void publishVersion() {
if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) {
return;
}
GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
List<TransactionState> readyTransactionStates = globalTransactionMgr.getReadyToPublishTransactions();
if (readyTransactionStates.isEmpty()) {
@ -147,7 +151,8 @@ public class PublishVersionDaemon extends MasterDaemon {
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows);
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout();
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout()
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction