diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index ceb2ad9270..b6a632cb24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -81,7 +81,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** * insert into select command implementation @@ -400,8 +399,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(), physicalOlapTableSink.getTargetTable(), null, ctx.queryId()); - Future future = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); - PGroupCommitInsertResponse response = future.get(); + PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); if (code == TStatusCode.DATA_QUALITY_ERROR) { LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, " diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 15aa639abe..5dcfdc1ba4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -17,10 +17,12 @@ package org.apache.doris.planner; - import org.apache.doris.analysis.CastExpr; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.NativeInsertStmt; import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -33,6 +35,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; @@ -55,10 +58,12 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -78,6 +83,11 @@ public class GroupCommitPlanner { throws UserException, TException { this.db = db; this.table = table; + if (Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) { + String msg = "insert table " + this.table.getId() + " is blocked on schema change"; + LOG.info(msg); + throw new DdlException(msg); + } TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest(); if (targetColumnNames != null) { streamLoadPutRequest.setColumns(String.join(",", targetColumnNames)); @@ -91,7 +101,7 @@ public class GroupCommitPlanner { .setTbl(table.getName()) .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId) - .setGroupCommit(true); + .setGroupCommit(true).setTrimDoubleQuotes(true); StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest); StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); // Will using load id as query id in fragment @@ -114,17 +124,29 @@ public class GroupCommitPlanner { execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList)); } - public Future executeGroupCommitInsert(ConnectContext ctx, - List rows) throws TException, DdlException, RpcException { + public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, + List rows) + throws DdlException, RpcException, ExecutionException, InterruptedException { backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend == null || !backend.isAlive()) { + if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); if (allBackendIds.isEmpty()) { throw new DdlException("No alive backend"); } Collections.shuffle(allBackendIds); - backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0)); - ctx.setInsertGroupCommit(this.table.getId(), backend); + boolean find = false; + for (Long beId : allBackendIds) { + backend = Env.getCurrentSystemInfo().getBackend(beId); + if (!backend.isDecommissioned()) { + ctx.setInsertGroupCommit(this.table.getId(), backend); + find = true; + LOG.debug("choose new be {}", backend.getId()); + break; + } + } + if (!find) { + throw new DdlException("No suitable backend"); + } } PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() .setDbId(db.getId()) @@ -138,7 +160,7 @@ public class GroupCommitPlanner { .build(); Future future = BackendServiceProxy.getInstance() .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); - return future; + return future.get(); } // only for nereids use @@ -147,40 +169,30 @@ public class GroupCommitPlanner { return null; } InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder(); - try { - List exprs = cols.subList(0, cols.size() - filterSize); - for (Expr expr : exprs) { - if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) { - if (expr.getChildren().get(0) instanceof NullLiteral) { - row.addColBuilder().setValue("\\N"); - continue; - } - throw new UserException( + List exprs = cols.subList(0, cols.size() - filterSize); + for (Expr expr : exprs) { + if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) { + if (expr.getChildren().get(0) instanceof NullLiteral) { + row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD); + continue; + } + throw new UserException( "do not support non-literal expr in transactional insert operation: " + expr.toSql()); - } - if (expr instanceof NullLiteral) { - row.addColBuilder().setValue("\\N"); - } else if (expr.getType() instanceof ArrayType) { - row.addColBuilder().setValue(expr.getStringValueForArray()); - } else if (!expr.getChildren().isEmpty()) { - expr.getChildren().forEach(child -> processExprVal(child, row)); - } else { - row.addColBuilder().setValue(expr.getStringValue()); - } } - } catch (UserException e) { - throw new RuntimeException(e); + processExprVal(expr, row); } return row.build(); } private static void processExprVal(Expr expr, InternalService.PDataRow.Builder row) { - if (expr.getChildren().isEmpty()) { - row.addColBuilder().setValue(expr.getStringValue()); - return; - } - for (Expr child : expr.getChildren()) { - processExprVal(child, row); + if (expr instanceof NullLiteral) { + row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD); + } else if (expr.getType() instanceof ArrayType) { + row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueForArray())); + } else if (!expr.getChildren().isEmpty()) { + expr.getChildren().forEach(child -> processExprVal(child, row)); + } else { + row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValue())); } } @@ -192,5 +204,27 @@ public class GroupCommitPlanner { return paramsList; } + public List getRows(NativeInsertStmt stmt) throws UserException { + List rows = new ArrayList<>(); + SelectStmt selectStmt = (SelectStmt) (stmt.getQueryStmt()); + if (selectStmt.getValueList() != null) { + for (List row : selectStmt.getValueList().getRows()) { + InternalService.PDataRow data = StmtExecutor.getRowStringValue(row); + rows.add(data); + } + } else { + List exprList = new ArrayList<>(); + for (Expr resultExpr : selectStmt.getResultExprs()) { + if (resultExpr instanceof SlotRef) { + exprList.add(((SlotRef) resultExpr).getDesc().getSourceExprs().get(0)); + } else { + exprList.add(resultExpr); + } + } + InternalService.PDataRow data = StmtExecutor.getRowStringValue(exprList); + rows.add(data); + } + return rows; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8340e3b13e..237708183c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -59,7 +59,6 @@ import org.apache.doris.analysis.SetStmt; import org.apache.doris.analysis.SetVar; import org.apache.doris.analysis.SetVar.SetVarType; import org.apache.doris.analysis.ShowStmt; -import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.StatementBase; @@ -151,7 +150,6 @@ import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; -import org.apache.doris.system.Backend; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; @@ -194,7 +192,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -279,7 +276,7 @@ public class StmtExecutor { this.profile = new Profile("Query", context.getSessionVariable().enableProfile()); } - private static InternalService.PDataRow getRowStringValue(List cols) throws UserException { + public static InternalService.PDataRow getRowStringValue(List cols) throws UserException { if (cols.isEmpty()) { return null; } @@ -292,9 +289,9 @@ public class StmtExecutor { if (expr instanceof NullLiteral) { row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD); } else if (expr instanceof ArrayLiteral) { - row.addColBuilder().setValue(expr.getStringValueForArray()); + row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValueForArray())); } else { - row.addColBuilder().setValue(expr.getStringValue()); + row.addColBuilder().setValue(String.format("\"%s\"", expr.getStringValue())); } } return row.build(); @@ -1830,7 +1827,7 @@ public class StmtExecutor { .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId()) .setExecMemLimit(maxExecMemByte).setTimeout((int) timeoutSecond) - .setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism); + .setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism).setTrimDoubleQuotes(true); if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) { List targetColumnNames = ((NativeInsertStmt) parsedStmt).getTargetColumnNames(); if (targetColumnNames.contains(Column.SEQUENCE_COL) || targetColumnNames.contains(Column.DELETE_SIGN)) { @@ -1896,59 +1893,12 @@ public class StmtExecutor { txnId = context.getTxnEntry().getTxnConf().getTxnId(); } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) { isGroupCommit = true; - if (Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId())) { - String msg = "insert table " + insertStmt.getTargetTable().getId() + " is blocked on schema change"; - LOG.info(msg); - throw new DdlException(msg); - } NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt; - Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId()); - if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { - List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (allBackendIds.isEmpty()) { - throw new DdlException("No alive backend"); - } - Collections.shuffle(allBackendIds); - boolean find = false; - for (Long beId : allBackendIds) { - backend = Env.getCurrentSystemInfo().getBackend(beId); - if (!backend.isDecommissioned()) { - context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend); - find = true; - LOG.debug("choose new be {}", backend.getId()); - break; - } - } - if (!find) { - throw new DdlException("No suitable backend"); - } - } int maxRetry = 3; for (int i = 0; i < maxRetry; i++) { GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId); - // handle rows - List rows = new ArrayList<>(); - SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt(); - if (selectStmt.getValueList() != null) { - for (List row : selectStmt.getValueList().getRows()) { - InternalService.PDataRow data = getRowStringValue(row); - rows.add(data); - } - } else { - List exprList = new ArrayList<>(); - for (Expr resultExpr : selectStmt.getResultExprs()) { - if (resultExpr instanceof SlotRef) { - exprList.add(((SlotRef) resultExpr).getDesc().getSourceExprs().get(0)); - } else { - exprList.add(resultExpr); - } - } - InternalService.PDataRow data = getRowStringValue(exprList); - rows.add(data); - } - Future future = groupCommitPlanner - .executeGroupCommitInsert(context, rows); - PGroupCommitInsertResponse response = future.get(); + List rows = groupCommitPlanner.getRows(nativeInsertStmt); + PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows); TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); if (code == TStatusCode.DATA_QUALITY_ERROR) { LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, " diff --git a/regression-test/data/insert_p0/insert_with_null.out b/regression-test/data/insert_p0/insert_with_null.out new file mode 100644 index 0000000000..58a7e42319 --- /dev/null +++ b/regression-test/data/insert_p0/insert_with_null.out @@ -0,0 +1,53 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 "b" ["k1=v1, k2=v2"] +2 N ["k3=v3, k4=v4"] +4 null [] +5 NULL ["k5, k6"] +6 \N ["k7", "k8"] +7 abc [] + +-- !sql -- +6 \N ["k7", "k8"] + +-- !sql -- + +-- !sql -- +1 "b" ["k1=v1, k2=v2"] +2 N ["k3=v3, k4=v4"] +4 null [] +5 NULL ["k5, k6"] +6 \N ["k7", "k8"] + +-- !sql -- +6 \N ["k7", "k8"] + +-- !sql -- + +-- !sql -- +1 "b" ["k1=v1, k2=v2"] +2 N ["k3=v3, k4=v4"] +4 null [] +5 NULL ["k5, k6"] +6 \N ["k7", "k8"] +7 abc [] + +-- !sql -- +6 \N ["k7", "k8"] + +-- !sql -- + +-- !sql -- +1 "b" ["k1=v1, k2=v2"] +2 N ["k3=v3, k4=v4"] +4 null [] +5 NULL ["k5, k6"] +6 \N ["k7", "k8"] +7 abc \N + +-- !sql -- +6 \N ["k7", "k8"] + +-- !sql -- +7 abc \N + diff --git a/regression-test/suites/insert_p0/insert_with_null.groovy b/regression-test/suites/insert_p0/insert_with_null.groovy new file mode 100644 index 0000000000..1ac8f46b8c --- /dev/null +++ b/regression-test/suites/insert_p0/insert_with_null.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. + +suite("insert_with_null") { + def table = "insert_with_null" + sql """ DROP TABLE IF EXISTS $table """ + sql """ + CREATE TABLE ${table} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + def getRowCount = { expectedRowCount -> + def retry = 0 + while (retry < 30) { + sleep(2000) + def rowCount = sql "select count(*) from ${table}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + retry++ + } + } + + def write_modes = ["insert", "txn_insert", "group_commit_legacy", "group_commit_nereids"] + + for (def write_mode : write_modes) { + sql """ DROP TABLE IF EXISTS ${table} """ + sql """ + CREATE TABLE ${table} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `desc` array NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + if (write_mode == "txn_insert") { + sql "begin" + } else if (write_mode == "group_commit_legacy") { + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = false; """ + } else if (write_mode == "group_commit_nereids") { + sql """ set enable_insert_group_commit = true; """ + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } + + sql """ insert into ${table} values(1, '"b"', ["k1=v1, k2=v2"]); """ + sql """ insert into ${table} values(2, "\\N", ['k3=v3, k4=v4']); """ + // sql """ insert into ${table} values(3, "\\\\N", []); """ + sql """ insert into ${table} values(4, 'null', []); """ + sql """ insert into ${table} values(5, 'NULL', ['k5, k6']); """ + sql """ insert into ${table} values(6, null, ["k7", "k8"]); """ + if (write_mode != "txn_insert") { + sql """ insert into ${table}(id, name) values(7, 'abc'); """ + getRowCount(6) + } else { + sql "commit" + getRowCount(5) + } + + qt_sql """ select * from ${table} order by id asc; """ + qt_sql """ select * from ${table} where name is null order by id asc; """ + qt_sql """ select * from ${table} where `desc` is null order by id asc; """ + } +}