[fix](insert) txn insert and group commit should write \N string corr… (#27637)

This commit is contained in:
meiyi
2023-11-28 17:32:50 +08:00
committed by GitHub
parent f0dbce4cf5
commit 7087250b4a
5 changed files with 226 additions and 94 deletions

View File

@ -81,7 +81,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* insert into select command implementation
@ -400,8 +399,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
}
GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
Future<PGroupCommitInsertResponse> future = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
PGroupCommitInsertResponse response = future.get();
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, "

View File

@ -17,10 +17,12 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.NativeInsertStmt;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@ -33,6 +35,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
@ -55,10 +58,12 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@ -78,6 +83,11 @@ public class GroupCommitPlanner {
throws UserException, TException {
this.db = db;
this.table = table;
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
String msg = "insert table " + this.table.getId() + " is blocked on schema change";
LOG.info(msg);
throw new DdlException(msg);
}
TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
if (targetColumnNames != null) {
streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
@ -91,7 +101,7 @@ public class GroupCommitPlanner {
.setTbl(table.getName())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
.setGroupCommit(true);
.setGroupCommit(true).setTrimDoubleQuotes(true);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
// Will using load id as query id in fragment
@ -114,17 +124,29 @@ public class GroupCommitPlanner {
execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
}
public Future<PGroupCommitInsertResponse> executeGroupCommitInsert(ConnectContext ctx,
List<InternalService.PDataRow> rows) throws TException, DdlException, RpcException {
public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
List<InternalService.PDataRow> rows)
throws DdlException, RpcException, ExecutionException, InterruptedException {
backend = ctx.getInsertGroupCommit(this.table.getId());
if (backend == null || !backend.isAlive()) {
if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (allBackendIds.isEmpty()) {
throw new DdlException("No alive backend");
}
Collections.shuffle(allBackendIds);
backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
ctx.setInsertGroupCommit(this.table.getId(), backend);
boolean find = false;
for (Long beId : allBackendIds) {
backend = Env.getCurrentSystemInfo().getBackend(beId);
if (!backend.isDecommissioned()) {
ctx.setInsertGroupCommit(this.table.getId(), backend);
find = true;
LOG.debug("choose new be {}", backend.getId());
break;
}
}
if (!find) {
throw new DdlException("No suitable backend");
}
}
PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
.setDbId(db.getId())
@ -138,7 +160,7 @@ public class GroupCommitPlanner {
.build();
Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
.groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
return future;
return future.get();
}
// only for nereids use
@ -147,40 +169,30 @@ public class GroupCommitPlanner {
return null;
}
InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
try {
List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
for (Expr expr : exprs) {
if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) {
if (expr.getChildren().get(0) instanceof NullLiteral) {
row.addColBuilder().setValue("\\N");
continue;
}
throw new UserException(
List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
for (Expr expr : exprs) {
if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) {
if (expr.getChildren().get(0) instanceof NullLiteral) {
row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
continue;
}
throw new UserException(
"do not support non-literal expr in transactional insert operation: " + expr.toSql());
}
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue("\\N");
} else if (expr.getType() instanceof ArrayType) {
row.addColBuilder().setValue(expr.getStringValueForArray());
} else if (!expr.getChildren().isEmpty()) {
expr.getChildren().forEach(child -> processExprVal(child, row));
} else {
row.addColBuilder().setValue(expr.getStringValue());
}
}
} catch (UserException e) {
throw new RuntimeException(e);
processExprVal(expr, row);
}
return row.build();
}
private static void processExprVal(Expr expr, InternalService.PDataRow.Builder row) {
if (expr.getChildren().isEmpty()) {
row.addColBuilder().setValue(expr.getStringValue());
return;
}
for (Expr child : expr.getChildren()) {
processExprVal(child, row);
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
} else if (expr.getType() instanceof ArrayType) {
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueForArray()));
} else if (!expr.getChildren().isEmpty()) {
expr.getChildren().forEach(child -> processExprVal(child, row));
} else {
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValue()));
}
}
@ -192,5 +204,27 @@ public class GroupCommitPlanner {
return paramsList;
}
public List<InternalService.PDataRow> getRows(NativeInsertStmt stmt) throws UserException {
List<InternalService.PDataRow> rows = new ArrayList<>();
SelectStmt selectStmt = (SelectStmt) (stmt.getQueryStmt());
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
InternalService.PDataRow data = StmtExecutor.getRowStringValue(row);
rows.add(data);
}
} else {
List<Expr> exprList = new ArrayList<>();
for (Expr resultExpr : selectStmt.getResultExprs()) {
if (resultExpr instanceof SlotRef) {
exprList.add(((SlotRef) resultExpr).getDesc().getSourceExprs().get(0));
} else {
exprList.add(resultExpr);
}
}
InternalService.PDataRow data = StmtExecutor.getRowStringValue(exprList);
rows.add(data);
}
return rows;
}
}

View File

@ -59,7 +59,6 @@ import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.SetVar.SetVarType;
import org.apache.doris.analysis.ShowStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
@ -151,7 +150,6 @@ import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@ -194,7 +192,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -279,7 +276,7 @@ public class StmtExecutor {
this.profile = new Profile("Query", context.getSessionVariable().enableProfile());
}
private static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException {
public static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException {
if (cols.isEmpty()) {
return null;
}
@ -292,9 +289,9 @@ public class StmtExecutor {
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
} else if (expr instanceof ArrayLiteral) {
row.addColBuilder().setValue(expr.getStringValueForArray());
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueForArray()));
} else {
row.addColBuilder().setValue(expr.getStringValue());
row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValue()));
}
}
return row.build();
@ -1830,7 +1827,7 @@ public class StmtExecutor {
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId())
.setExecMemLimit(maxExecMemByte).setTimeout((int) timeoutSecond)
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism);
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism).setTrimDoubleQuotes(true);
if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
List<String> targetColumnNames = ((NativeInsertStmt) parsedStmt).getTargetColumnNames();
if (targetColumnNames.contains(Column.SEQUENCE_COL) || targetColumnNames.contains(Column.DELETE_SIGN)) {
@ -1896,59 +1893,12 @@ public class StmtExecutor {
txnId = context.getTxnEntry().getTxnConf().getTxnId();
} else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
isGroupCommit = true;
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId())) {
String msg = "insert table " + insertStmt.getTargetTable().getId() + " is blocked on schema change";
LOG.info(msg);
throw new DdlException(msg);
}
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (allBackendIds.isEmpty()) {
throw new DdlException("No alive backend");
}
Collections.shuffle(allBackendIds);
boolean find = false;
for (Long beId : allBackendIds) {
backend = Env.getCurrentSystemInfo().getBackend(beId);
if (!backend.isDecommissioned()) {
context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
find = true;
LOG.debug("choose new be {}", backend.getId());
break;
}
}
if (!find) {
throw new DdlException("No suitable backend");
}
}
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
// handle rows
List<InternalService.PDataRow> rows = new ArrayList<>();
SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt();
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
InternalService.PDataRow data = getRowStringValue(row);
rows.add(data);
}
} else {
List<Expr> exprList = new ArrayList<>();
for (Expr resultExpr : selectStmt.getResultExprs()) {
if (resultExpr instanceof SlotRef) {
exprList.add(((SlotRef) resultExpr).getDesc().getSourceExprs().get(0));
} else {
exprList.add(resultExpr);
}
}
InternalService.PDataRow data = getRowStringValue(exprList);
rows.add(data);
}
Future<PGroupCommitInsertResponse> future = groupCommitPlanner
.executeGroupCommitInsert(context, rows);
PGroupCommitInsertResponse response = future.get();
List<InternalService.PDataRow> rows = groupCommitPlanner.getRows(nativeInsertStmt);
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "

View File

@ -0,0 +1,53 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 "b" ["k1=v1, k2=v2"]
2 N ["k3=v3, k4=v4"]
4 null []
5 NULL ["k5, k6"]
6 \N ["k7", "k8"]
7 abc []
-- !sql --
6 \N ["k7", "k8"]
-- !sql --
-- !sql --
1 "b" ["k1=v1, k2=v2"]
2 N ["k3=v3, k4=v4"]
4 null []
5 NULL ["k5, k6"]
6 \N ["k7", "k8"]
-- !sql --
6 \N ["k7", "k8"]
-- !sql --
-- !sql --
1 "b" ["k1=v1, k2=v2"]
2 N ["k3=v3, k4=v4"]
4 null []
5 NULL ["k5, k6"]
6 \N ["k7", "k8"]
7 abc []
-- !sql --
6 \N ["k7", "k8"]
-- !sql --
-- !sql --
1 "b" ["k1=v1, k2=v2"]
2 N ["k3=v3, k4=v4"]
4 null []
5 NULL ["k5, k6"]
6 \N ["k7", "k8"]
7 abc \N
-- !sql --
6 \N ["k7", "k8"]
-- !sql --
7 abc \N

View File

@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// The cases is copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.
suite("insert_with_null") {
def table = "insert_with_null"
sql """ DROP TABLE IF EXISTS $table """
sql """
CREATE TABLE ${table} (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
def getRowCount = { expectedRowCount ->
def retry = 0
while (retry < 30) {
sleep(2000)
def rowCount = sql "select count(*) from ${table}"
logger.info("rowCount: " + rowCount + ", retry: " + retry)
if (rowCount[0][0] >= expectedRowCount) {
break
}
retry++
}
}
def write_modes = ["insert", "txn_insert", "group_commit_legacy", "group_commit_nereids"]
for (def write_mode : write_modes) {
sql """ DROP TABLE IF EXISTS ${table} """
sql """
CREATE TABLE ${table} (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`desc` array<String> NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
if (write_mode == "txn_insert") {
sql "begin"
} else if (write_mode == "group_commit_legacy") {
sql """ set enable_insert_group_commit = true; """
sql """ set enable_nereids_dml = false; """
} else if (write_mode == "group_commit_nereids") {
sql """ set enable_insert_group_commit = true; """
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
sql """ set enable_fallback_to_original_planner=false; """
}
sql """ insert into ${table} values(1, '"b"', ["k1=v1, k2=v2"]); """
sql """ insert into ${table} values(2, "\\N", ['k3=v3, k4=v4']); """
// sql """ insert into ${table} values(3, "\\\\N", []); """
sql """ insert into ${table} values(4, 'null', []); """
sql """ insert into ${table} values(5, 'NULL', ['k5, k6']); """
sql """ insert into ${table} values(6, null, ["k7", "k8"]); """
if (write_mode != "txn_insert") {
sql """ insert into ${table}(id, name) values(7, 'abc'); """
getRowCount(6)
} else {
sql "commit"
getRowCount(5)
}
qt_sql """ select * from ${table} order by id asc; """
qt_sql """ select * from ${table} where name is null order by id asc; """
qt_sql """ select * from ${table} where `desc` is null order by id asc; """
}
}