[fix](group commit) Fix some group commit problems (#28319)

This commit is contained in:
meiyi
2023-12-14 14:38:56 +08:00
committed by GitHub
parent e53cfa09da
commit ee24667b9f
15 changed files with 165 additions and 74 deletions

View File

@ -136,8 +136,14 @@ public class ArrayLiteral extends LiteralExpr {
public String getStringValueInFe() {
List<String> list = new ArrayList<>(children.size());
children.forEach(v -> {
String stringLiteral;
if (v instanceof NullLiteral) {
stringLiteral = "null";
} else {
stringLiteral = getStringLiteralForComplexType(v);
}
// we should use type to decide we output array is suitable for json format
list.add(getStringLiteralForComplexType(v));
list.add(stringLiteral);
});
return "[" + StringUtils.join(list, ", ") + "]";
}

View File

@ -155,11 +155,21 @@ public class MapLiteral extends LiteralExpr {
public String getStringValue() {
List<String> list = new ArrayList<>(children.size());
for (int i = 0; i < children.size() && i + 1 < children.size(); i += 2) {
list.add(children.get(i).getStringValue() + ":" + children.get(i + 1).getStringValue());
list.add(getStringValue(children.get(i)) + ":" + getStringValue(children.get(i + 1)));
}
return "{" + StringUtils.join(list, ", ") + "}";
}
private String getStringValue(Expr expr) {
if (expr instanceof NullLiteral) {
return "null";
}
if (expr instanceof StringLiteral) {
return "\"" + expr.getStringValue() + "\"";
}
return expr.getStringValue();
}
@Override
public String getStringValueForArray() {
List<String> list = new ArrayList<>(children.size());

View File

@ -54,6 +54,7 @@ import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
@ -1106,7 +1107,9 @@ public class NativeInsertStmt extends InsertStmt {
LOG.warn("analyze group commit failed", e);
return;
}
if (ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
boolean partialUpdate = ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& getQueryStmt() instanceof SelectStmt
@ -1117,18 +1120,22 @@ public class NativeInsertStmt extends InsertStmt {
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
for (Expr expr : row) {
if (!expr.isLiteralOrCastExpr()) {
if (!(expr instanceof LiteralExpr)) {
return;
}
}
}
// Does not support: insert into tbl values();
if (selectStmt.getValueList().getFirstRow().isEmpty() && CollectionUtils.isEmpty(targetColumnNames)) {
return;
}
} else {
SelectList selectList = selectStmt.getSelectList();
if (selectList != null) {
List<SelectListItem> items = selectList.getItems();
if (items != null) {
for (SelectListItem item : items) {
if (item.getExpr() != null && !item.getExpr().isLiteralOrCastExpr()) {
if (item.getExpr() != null && !(item.getExpr() instanceof LiteralExpr)) {
return;
}
}

View File

@ -71,10 +71,21 @@ public class StructLiteral extends LiteralExpr {
return "STRUCT(" + StringUtils.join(list, ", ") + ")";
}
private String getStringValue(Expr expr) {
String stringValue = expr.getStringValue();
if (stringValue.isEmpty()) {
return "''";
}
if (expr instanceof StringLiteral) {
return "\"" + stringValue + "\"";
}
return stringValue;
}
@Override
public String getStringValue() {
List<String> list = new ArrayList<>(children.size());
children.forEach(v -> list.add(v.getStringValue()));
children.forEach(v -> list.add(getStringValue(v)));
return "{" + StringUtils.join(list, ", ") + "}";
}

View File

@ -171,7 +171,8 @@ public class PropertyAnalyzer {
private static final double MIN_FPP = 0.0001;
public static final String PROPERTIES_GROUP_COMMIT_INTERVAL_MS = "group_commit_interval_ms";
public static final int PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 10000;
public static final int PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE
= Config.group_commit_interval_ms_default_value;
// compaction policy
public static final String SIZE_BASED_COMPACTION_POLICY = "size_based";

View File

@ -46,6 +46,7 @@ import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStatusCode;
@ -219,17 +220,14 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
private boolean analyzeGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink) {
if (!(sink instanceof OlapTableSink)) {
if (!(sink instanceof OlapTableSink) || !ctx.getSessionVariable().isEnableInsertGroupCommit()
|| ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
return false;
}
if (!ctx.getSessionVariable().isEnableInsertGroupCommit()) {
return false;
}
return ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
&& sink.getFragment().getPlanRoot() instanceof UnionNode
&& physicalOlapTableSink.getPartitionIds().isEmpty();
return ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable && !ConnectContext.get().isTxnModel()
&& sink.getFragment().getPlanRoot() instanceof UnionNode && physicalOlapTableSink.getPartitionIds()
.isEmpty();
}
@Override

View File

@ -212,6 +212,8 @@ public class GroupCommitPlanner {
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
InternalService.PDataRow data = StmtExecutor.getRowStringValue(row);
LOG.debug("add row: [{}]", data.getColList().stream().map(c -> c.getValue())
.collect(Collectors.joining(",")));
rows.add(data);
}
} else {
@ -224,6 +226,8 @@ public class GroupCommitPlanner {
}
}
InternalService.PDataRow data = StmtExecutor.getRowStringValue(exprList);
LOG.debug("add row: [{}]", data.getColList().stream().map(c -> c.getValue())
.collect(Collectors.joining(",")));
rows.add(data);
}
return rows;

View File

@ -387,6 +387,7 @@ public class OlapTableSink extends DataSink {
exprs.add(e.clone());
}
for (Expr e : exprs) {
e.reset();
e.analyze(funcAnalyzer);
}
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));

View File

@ -293,9 +293,15 @@ public class StmtExecutor {
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
} else if (expr instanceof ArrayLiteral) {
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueForArray()));
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueInFe()));
} else {
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValue()));
String stringValue = expr.getStringValueInFe();
if (stringValue.equals(NULL_VALUE_FOR_LOAD) || stringValue.startsWith("\"") || stringValue.endsWith(
"\"")) {
row.addColBuilder().setValue(String.format("\"%s\"", stringValue));
} else {
row.addColBuilder().setValue(String.format("%s", stringValue));
}
}
}
return row.build();
@ -452,8 +458,10 @@ public class StmtExecutor {
// try to fall back to legacy planner
LOG.debug("nereids cannot process statement\n" + originStmt.originStmt
+ "\n because of " + e.getMessage(), e);
boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter
&& ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
if (e instanceof NereidsException
&& !context.getSessionVariable().enableFallbackToOriginalPlanner) {
&& !context.getSessionVariable().enableFallbackToOriginalPlanner && !isInsertIntoCommand) {
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
throw ((NereidsException) e).getException();
}

View File

@ -1994,7 +1994,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
if (!StringUtils.isEmpty(request.getGroupCommitMode())) {
if (parsedStmt.getLabel() != null) {
if (!Config.wait_internal_group_commit_finish && parsedStmt.getLabel() != null) {
throw new AnalysisException("label and group_commit can't be set at the same time");
}
ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();