[feature](new-scan) support transactional insert in new scan framework (#13858)

Support running transactional insert operation with new scan framework. eg:

admin set frontend config("enable_new_load_scan_node" = "true");
begin;
insert into tbl1 values(1,2);
insert into tbl1 values(3,4);
insert into tbl1 values(5,6);
commit;
Add some limitation to transactional insert

Do not support non-literal value in insert stmt
Fix some issue about array type:

Forbid cast other non-array type to NESTED array type, it may cause BE crash.
Add getStringValueForArray() method for Expr, to get valid string-formatted array type value.
Add useLocalSessionState=true in regression-test jdbc url
without this config, the jdbc driver will send some init cmd each time it connect to server, such as
select @@session.tx_read_only.
But when we use transactional insert, after begin command, Doris do not support any other type of
stmt except for insert, commit or rollback.
So adding this config to let the jdbc NOT send cmd when connecting.
This commit is contained in:
Mingyu Chen
2022-11-03 08:36:07 +08:00
committed by GitHub
parent 228e5afad8
commit 7b4c2cabb4
29 changed files with 389 additions and 50 deletions

View File

@ -61,7 +61,7 @@ public class ArrayLiteral extends LiteralExpr {
children = new ArrayList<>();
for (LiteralExpr expr : exprs) {
if (expr.getType() == itemType) {
if (expr.getType().equals(itemType)) {
children.add(expr);
} else {
children.add(expr.castTo(itemType));
@ -102,11 +102,17 @@ public class ArrayLiteral extends LiteralExpr {
@Override
public String getStringValue() {
List<String> list = new ArrayList<>(children.size());
children.forEach(v -> list.add(((LiteralExpr) v).getStringValue()));
children.forEach(v -> list.add(v.getStringValue()));
return "ARRAY[" + StringUtils.join(list, ", ") + "]";
}
@Override
public String getStringValueForArray() {
List<String> list = new ArrayList<>(children.size());
children.forEach(v -> list.add(v.getStringValueForArray()));
return "[" + StringUtils.join(list, ", ") + "]";
}
@Override
protected void toThrift(TExprNode msg) {
msg.node_type = TExprNodeType.ARRAY_LITERAL;
@ -164,3 +170,4 @@ public class ArrayLiteral extends LiteralExpr {
}
}
}

View File

@ -105,6 +105,11 @@ public class BoolLiteral extends LiteralExpr {
return value ? "1" : "0";
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
@Override
public ByteBuffer getHashValue(PrimitiveType type) {
byte v = (byte) (value ? 1 : 0);

View File

@ -573,4 +573,10 @@ public class CastExpr extends Expr {
"doris::CastFunctions::cast_to_array_val", null, null, true);
}
}
@Override
public String getStringValueForArray() {
return children.get(0).getStringValueForArray();
}
}

View File

@ -557,6 +557,11 @@ public class DateLiteral extends LiteralExpr {
}
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
public void roundCeiling(int newScale) {
Preconditions.checkArgument(type.isDatetimeV2());
long remain = Double.valueOf(microsecond % (Math.pow(10, 6 - newScale))).longValue();

View File

@ -220,6 +220,11 @@ public class DecimalLiteral extends LiteralExpr {
return value.toString();
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
@Override
public long getLongValue() {
return value.longValue();

View File

@ -1373,8 +1373,8 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
// "cast %s to %s", this.type, targetType);
// TODO(zc): use implicit cast
if (!Type.canCastTo(this.type, targetType)) {
throw new AnalysisException("type not match, originType=" + this.type
+ ", targeType=" + targetType);
throw new AnalysisException("can not cast from origin type " + this.type
+ " to target type=" + targetType);
}
return uncheckedCastTo(targetType);
@ -1879,12 +1879,18 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
}
public String getStringValue() {
if (this instanceof LiteralExpr) {
return ((LiteralExpr) this).getStringValue();
}
return "";
}
// A special method only for array literal, all primitive type in array
// will be wrapped by double quote. eg:
// ["1", "2", "3"]
// ["a", "b", "c"]
// [["1", "2", "3"], ["1"], ["3"]]
public String getStringValueForArray() {
return null;
}
public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
for (Expr child : expr.getChildren()) {
if (child.isBoundByTupleIds(tids)) {
@ -2016,4 +2022,17 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
child.materializeSrcExpr();
}
}
// This is only for transactional insert operation,
// to check it the given value in insert stmt is LiteralExpr.
// And if we write "1" to a boolean column, there will be a cast(1 as boolean) expr,
// which is also accepted.
public boolean isLiteralOrCastExpr() {
if (this instanceof CastExpr) {
return children.get(0) instanceof LiteralExpr;
} else {
return this instanceof LiteralExpr;
}
}
}

View File

@ -136,6 +136,11 @@ public class FloatLiteral extends LiteralExpr {
return Double.toString(value);
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
public static Type getDefaultTimeType(Type type) throws AnalysisException {
switch (type.getPrimitiveType()) {
case TIME:

View File

@ -266,6 +266,11 @@ public class IntLiteral extends LiteralExpr {
return Long.toString(value);
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
@Override
public long getLongValue() {
return value;

View File

@ -96,6 +96,16 @@ public class JsonLiteral extends LiteralExpr {
msg.json_literal = new TJsonLiteral(getUnescapedValue());
}
@Override
public String getStringValue() {
return null;
}
@Override
public String getStringValueForArray() {
return null;
}
public String getUnescapedValue() {
// Unescape string exactly like Hive does. Hive's method assumes
// quotes so we add them here to reuse Hive's code.

View File

@ -190,6 +190,11 @@ public class LargeIntLiteral extends LiteralExpr {
return value.toString();
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
@Override
public long getLongValue() {
return value.longValue();

View File

@ -187,9 +187,11 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
// literal values to the metastore rather than to Palo backends. This is similar to
// the toSql() method, but does not perform any formatting of the string values. Neither
// method unescapes string values.
public String getStringValue() {
return null;
}
@Override
public abstract String getStringValue();
@Override
public abstract String getStringValueForArray();
public long getLongValue() {
return 0;

View File

@ -66,6 +66,16 @@ public final class MaxLiteral extends LiteralExpr {
public void write(DataOutput out) throws IOException {
}
@Override
public String getStringValue() {
return null;
}
@Override
public String getStringValueForArray() {
return null;
}
public void readFields(DataInput in) throws IOException {
}

View File

@ -101,6 +101,13 @@ public class NullLiteral extends LiteralExpr {
return "NULL";
}
// the null value inside an array is represented as "null", for exampe:
// [null, null]. Not same as other primitive type to represent as \N.
@Override
public String getStringValueForArray() {
return "null";
}
@Override
public long getLongValue() {
return 0;

View File

@ -151,6 +151,11 @@ public class StringLiteral extends LiteralExpr {
return value;
}
@Override
public String getStringValueForArray() {
return "\"" + getStringValue() + "\"";
}
@Override
public long getLongValue() {
return Long.valueOf(value);

View File

@ -1203,3 +1203,4 @@ public enum PrimitiveType {
}
}
}

View File

@ -483,13 +483,17 @@ public abstract class Type {
return false;
}
public static boolean canCastTo(Type t1, Type t2) {
if (t1.isScalarType() && t2.isScalarType()) {
return ScalarType.canCastTo((ScalarType) t1, (ScalarType) t2);
} else if (t1.isArrayType() && t2.isArrayType()) {
return ArrayType.canCastTo((ArrayType) t1, (ArrayType) t2);
public static boolean canCastTo(Type sourceType, Type targetType) {
if (sourceType.isScalarType() && targetType.isScalarType()) {
return ScalarType.canCastTo((ScalarType) sourceType, (ScalarType) targetType);
} else if (sourceType.isArrayType() && targetType.isArrayType()) {
return ArrayType.canCastTo((ArrayType) sourceType, (ArrayType) targetType);
} else if (targetType.isArrayType() && !((ArrayType) targetType).getItemType().isScalarType()
&& !sourceType.isNull()) {
// TODO: current not support cast any non-array type(except for null) to nested array type.
return false;
}
return t1.isNull() || t1.getPrimitiveType().isCharFamily();
return sourceType.isNull() || sourceType.getPrimitiveType().isCharFamily();
}
/**
@ -1718,3 +1722,4 @@ public abstract class Type {
return false;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.proto.InternalService;
@ -31,6 +32,7 @@ import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeParams;
@ -75,8 +77,15 @@ public class InsertStreamTxnExecutor {
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
for (TScanRangeParams scanRangeParams : entry.getValue()) {
for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) {
desc.setFormatType(TFileFormatType.FORMAT_PROTO);
if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) {
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
TFileFormatType.FORMAT_PROTO);
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
TFileCompressType.PLAIN);
} else {
for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) {
desc.setFormatType(TFileFormatType.FORMAT_PROTO);
}
}
}
}

View File

@ -220,14 +220,20 @@ public class StmtExecutor implements ProfileWriter {
this.context.setStatementContext(statementContext);
}
public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
if (cols.size() == 0) {
public static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException {
if (cols.isEmpty()) {
return null;
}
InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
for (Expr expr : cols) {
if (!expr.isLiteralOrCastExpr()) {
throw new UserException(
"do not support non-literal expr in transactional insert operation: " + expr.toSql());
}
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
} else if (expr instanceof ArrayLiteral) {
row.addColBuilder().setValue(expr.getStringValueForArray());
} else {
row.addColBuilder().setValue(expr.getStringValue());
}
@ -540,8 +546,8 @@ public class StmtExecutor implements ProfileWriter {
queryType = "Insert";
}
} catch (Throwable t) {
LOG.warn("handle insert stmt fail", t);
// the transaction of this insert may already begun, we will abort it at outer finally block.
LOG.warn("handle insert stmt fail: {}", t.getMessage());
// the transaction of this insert may already begin, we will abort it at outer finally block.
throw t;
}
} else if (parsedStmt instanceof DdlStmt) {
@ -1777,3 +1783,4 @@ public class StmtExecutor implements ProfileWriter {
return parsedStmt;
}
}

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
public class ArrayLiteralTest {
@Test
public void testGetStringValueForArray() throws AnalysisException {
IntLiteral intLiteral1 = new IntLiteral(1);
FloatLiteral floatLiteral = new FloatLiteral("2.15");
BoolLiteral boolLiteral = new BoolLiteral(true);
StringLiteral stringLiteral = new StringLiteral("shortstring");
LargeIntLiteral largeIntLiteral = new LargeIntLiteral("1000000000000000000000");
NullLiteral nullLiteral = new NullLiteral();
DateLiteral dateLiteral = new DateLiteral("2022-10-10", Type.DATE);
DateLiteral datetimeLiteral = new DateLiteral("2022-10-10 12:10:10", Type.DATETIME);
ArrayLiteral arrayLiteral1 = new ArrayLiteral(intLiteral1, floatLiteral);
Assert.assertEquals("[\"1.0\", \"2.15\"]", arrayLiteral1.getStringValueForArray());
ArrayLiteral arrayLiteral2 = new ArrayLiteral(boolLiteral, boolLiteral);
Assert.assertEquals("[\"1\", \"1\"]", arrayLiteral2.getStringValueForArray());
ArrayLiteral arrayLiteral3 = new ArrayLiteral(stringLiteral, stringLiteral);
Assert.assertEquals("[\"shortstring\", \"shortstring\"]", arrayLiteral3.getStringValueForArray());
ArrayLiteral arrayLiteral4 = new ArrayLiteral(largeIntLiteral, largeIntLiteral);
Assert.assertEquals("[\"1000000000000000000000\", \"1000000000000000000000\"]", arrayLiteral4.getStringValueForArray());
ArrayLiteral arrayLiteral5 = new ArrayLiteral(nullLiteral, nullLiteral);
Assert.assertEquals("[null, null]", arrayLiteral5.getStringValueForArray());
ArrayLiteral arrayLiteral6 = new ArrayLiteral(dateLiteral, dateLiteral);
Assert.assertEquals("[\"2022-10-10\", \"2022-10-10\"]", arrayLiteral6.getStringValueForArray());
ArrayLiteral arrayLiteral7 = new ArrayLiteral(datetimeLiteral, datetimeLiteral);
Assert.assertEquals("[\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"]", arrayLiteral7.getStringValueForArray());
ArrayLiteral arrayLiteral8 = new ArrayLiteral(arrayLiteral7, arrayLiteral7);
Assert.assertEquals("[[\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"], [\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"]]",
arrayLiteral8.getStringValueForArray());
ArrayLiteral arrayLiteral9 = new ArrayLiteral();
Assert.assertEquals("[]", arrayLiteral9.getStringValueForArray());
}
}

View File

@ -21,9 +21,13 @@ import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.utframe.UtFrameUtils;
@ -104,7 +108,47 @@ public class InsertArrayStmtTest {
Assert.assertSame(PrimitiveType.INT, ((ArrayType) arrayLiteral.getType()).getItemType().getPrimitiveType());
connectContext.setQueryId(new TUniqueId(3, 0));
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "type not match",
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "can not cast from origin type",
() -> parseAndAnalyze("insert into test.table1 values (1, [[1, 2], [3, 4]]);"));
}
@Test
public void testTransactionalInsert() throws Exception {
Config.enable_new_load_scan_node = true;
ExceptionChecker.expectThrowsNoException(
() -> createTable("CREATE TABLE test.`txn_insert_tbl` (\n"
+ " `k1` int(11) NULL,\n"
+ " `k2` double NULL,\n"
+ " `k3` varchar(100) NULL,\n"
+ " `k4` array<int(11)> NULL,\n"
+ " `k5` array<boolean> NULL\n"
+ ") ENGINE=OLAP\n"
+ "DUPLICATE KEY(`k1`)\n"
+ "COMMENT 'OLAP'\n"
+ "DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
+ "\"storage_format\" = \"V2\",\n"
+ "\"disable_auto_compaction\" = \"false\"\n"
+ ");"));
SqlParser parser = new SqlParser(new SqlScanner(
new StringReader("begin"), connectContext.getSessionVariable().getSqlMode()
));
TransactionBeginStmt beginStmt = (TransactionBeginStmt) SqlParserUtils.getFirstStmt(parser);
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, beginStmt);
stmtExecutor.execute();
parser = new SqlParser(new SqlScanner(
new StringReader("insert into test.txn_insert_tbl values(2, 3.3, \"xyz\", [1], [1, 0]);"),
connectContext.getSessionVariable().getSqlMode()
));
InsertStmt insertStmt = (InsertStmt) SqlParserUtils.getFirstStmt(parser);
stmtExecutor = new StmtExecutor(connectContext, insertStmt);
stmtExecutor.execute();
QueryState state = connectContext.getState();
Assert.assertEquals(MysqlStateType.OK, state.getStateType());
}
}