(fix)[group-commit] Fix some group commit case (#30008)

This commit is contained in:
meiyi
2024-01-16 15:08:41 +08:00
committed by yiguolei
parent 4d6de1f181
commit e1bcdc35fd
8 changed files with 84 additions and 29 deletions

View File

@ -226,7 +226,8 @@ Status GroupCommitTable::get_first_block_load_queue(
}
}
}
return Status::InternalError("can not get a block queue");
return Status::InternalError("can not get a block queue for table_id: " +
std::to_string(_table_id));
}
Status GroupCommitTable::_create_group_commit_load(

View File

@ -148,6 +148,22 @@ public class ArrayLiteral extends LiteralExpr {
return "[" + StringUtils.join(list, ", ") + "]";
}
@Override
public String getStringValueForStreamLoad() {
List<String> list = new ArrayList<>(children.size());
children.forEach(v -> {
String stringLiteral;
if (v instanceof NullLiteral) {
stringLiteral = "null";
} else {
stringLiteral = getStringLiteralForStreamLoad(v);
}
// we should use type to decide we output array is suitable for json format
list.add(stringLiteral);
});
return "[" + StringUtils.join(list, ", ") + "]";
}
@Override
protected void toThrift(TExprNode msg) {
msg.node_type = TExprNodeType.ARRAY_LITERAL;

View File

@ -2243,6 +2243,10 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
return getStringValue();
}
public String getStringValueForStreamLoad() {
return getStringValue();
}
// A special method only for array literal, all primitive type in array
// will be wrapped by double quote. eg:
// ["1", "2", "3"]

View File

@ -123,6 +123,18 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
}
}
public static String getStringLiteralForStreamLoad(Expr v) {
if (!(v instanceof NullLiteral) && v.getType().isScalarType()
&& (Type.getNumericTypes().contains((ScalarType) v.getActualScalarType(v.getType()))
|| v.getType() == Type.BOOLEAN)) {
return v.getStringValueInFe();
} else if (v.getType().isComplexType()) {
// these type should also call getStringValueInFe which should handle special case for itself
return v.getStringValueForStreamLoad();
} else {
return v.getStringValueForArray();
}
}
/**
* Init LiteralExpr's Type information

View File

@ -39,6 +39,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@ -1142,6 +1143,7 @@ public class NativeInsertStmt extends InsertStmt {
&& ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()
&& !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt
&& ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null
@ -1187,24 +1189,29 @@ public class NativeInsertStmt extends InsertStmt {
public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException {
OlapTable olapTable = (OlapTable) getTargetTable();
if (groupCommitPlanner != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
LOG.debug("reuse group commit plan, table={}", olapTable);
reuseGroupCommitPlan = true;
olapTable.readLock();
try {
if (groupCommitPlanner != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
LOG.debug("reuse group commit plan, table={}", olapTable);
reuseGroupCommitPlan = true;
return groupCommitPlanner;
}
reuseGroupCommitPlan = false;
if (!targetColumns.isEmpty()) {
Analyzer analyzerTmp = analyzer;
reset();
this.analyzer = analyzerTmp;
}
analyzeSubquery(analyzer, true);
groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId,
ConnectContext.get().getSessionVariable().getGroupCommit());
// save plan message to be reused for prepare stmt
loadId = queryId;
baseSchemaVersion = olapTable.getBaseSchemaVersion();
return groupCommitPlanner;
} finally {
olapTable.readUnlock();
}
reuseGroupCommitPlan = false;
if (!targetColumns.isEmpty()) {
Analyzer analyzerTmp = analyzer;
reset();
this.analyzer = analyzerTmp;
}
analyzeSubquery(analyzer, true);
groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId,
ConnectContext.get().getSessionVariable().getGroupCommit());
// save plan message to be reused for prepare stmt
loadId = queryId;
baseSchemaVersion = olapTable.getBaseSchemaVersion();
return groupCommitPlanner;
}
public TUniqueId getLoadId() {

View File

@ -125,6 +125,13 @@ public class StructLiteral extends LiteralExpr {
return "{" + StringUtils.join(list, ", ") + "}";
}
@Override
public String getStringValueForStreamLoad() {
List<String> list = new ArrayList<>(children.size());
children.forEach(v -> list.add(getStringLiteralForComplexType(v)));
return "{" + StringUtils.join(list, ", ") + "}";
}
@Override
protected void toThrift(TExprNode msg) {
msg.node_type = TExprNodeType.STRUCT_LITERAL;

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.load.loadv2.LoadStatistic;
@ -251,10 +252,12 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|| ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
return false;
}
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable && !ctx.isTxnModel()
&& sink.getFragment().getPlanRoot() instanceof UnionNode && physicalOlapTableSink.getPartitionIds()
.isEmpty() && physicalOlapTableSink.getTargetTable().getTableProperty().getUseSchemaLightChange();
&& !ctx.isTxnModel() && sink.getFragment().getPlanRoot() instanceof UnionNode
&& physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty()
.getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
.equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
}
@Override

View File

@ -295,9 +295,9 @@ public class StmtExecutor {
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
} else if (expr instanceof ArrayLiteral) {
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueInFe()));
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueForStreamLoad()));
} else {
String stringValue = expr.getStringValueInFe();
String stringValue = expr.getStringValueForStreamLoad();
if (stringValue.equals(NULL_VALUE_FOR_LOAD) || stringValue.startsWith("\"") || stringValue.endsWith(
"\"")) {
row.addColBuilder().setValue(String.format("\"%s\"", stringValue));
@ -1944,6 +1944,8 @@ public class StmtExecutor {
} else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
isGroupCommit = true;
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
long dbId = nativeInsertStmt.getTargetTable().getDatabase().getId();
long tableId = nativeInsertStmt.getTargetTable().getId();
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
@ -1954,10 +1956,11 @@ public class StmtExecutor {
ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
.contains("schema version not match")) {
LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
+ "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
groupCommitPlanner.getBackend().getId(),
response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i);
LOG.info("group commit insert failed. stmt: {}, query_id: {}, db_id: {}, table_id: {}"
+ ", schema version: {}, backend_id: {}, status: {}, retry: {}",
insertStmt.getOrigStmt().originStmt, DebugUtil.printId(context.queryId()), dbId, tableId,
nativeInsertStmt.getBaseSchemaVersion(), groupCommitPlanner.getBackend().getId(),
response.getStatus(), i);
if (i < maxRetry) {
List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable());
MetaLockUtils.readLockTables(tables);
@ -1970,12 +1973,14 @@ public class StmtExecutor {
}
continue;
} else {
errMsg = "group commit insert failed. backend id: "
errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId
+ ", query_id: " + DebugUtil.printId(context.queryId()) + ", backend_id: "
+ groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus();
}
} else if (code != TStatusCode.OK) {
errMsg = "group commit insert failed. backend id: " + groupCommitPlanner.getBackend().getId()
+ ", status: " + response.getStatus();
errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId + ", query_id: "
+ DebugUtil.printId(context.queryId()) + ", backend_id: " + groupCommitPlanner.getBackend()
.getId() + ", status: " + response.getStatus();
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
label = response.getLabel();