diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7296ae4628..46b8aa44ba 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -517,32 +517,32 @@ void FragmentMgr::_exec_actual(std::shared_ptr exec_state, Fi Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { if (params.txn_conf.need_txn) { - StreamLoadContext* stream_load_cxt = new StreamLoadContext(_exec_env); - stream_load_cxt->db = params.txn_conf.db; - stream_load_cxt->db_id = params.txn_conf.db_id; - stream_load_cxt->table = params.txn_conf.tbl; - stream_load_cxt->txn_id = params.txn_conf.txn_id; - stream_load_cxt->id = UniqueId(params.params.query_id); - stream_load_cxt->put_result.params = params; - stream_load_cxt->use_streaming = true; - stream_load_cxt->load_type = TLoadType::MANUL_LOAD; - stream_load_cxt->load_src_type = TLoadSourceType::RAW; - stream_load_cxt->label = params.import_label; - stream_load_cxt->format = TFileFormatType::FORMAT_CSV_PLAIN; - stream_load_cxt->timeout_second = 3600; - stream_load_cxt->auth.auth_code_uuid = params.txn_conf.auth_code_uuid; - stream_load_cxt->need_commit_self = true; - stream_load_cxt->need_rollback = true; + StreamLoadContext* stream_load_ctx = new StreamLoadContext(_exec_env); + stream_load_ctx->db = params.txn_conf.db; + stream_load_ctx->db_id = params.txn_conf.db_id; + stream_load_ctx->table = params.txn_conf.tbl; + stream_load_ctx->txn_id = params.txn_conf.txn_id; + stream_load_ctx->id = UniqueId(params.params.query_id); + stream_load_ctx->put_result.params = params; + stream_load_ctx->use_streaming = true; + stream_load_ctx->load_type = TLoadType::MANUL_LOAD; + stream_load_ctx->load_src_type = TLoadSourceType::RAW; + stream_load_ctx->label = params.import_label; + stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; + stream_load_ctx->timeout_second = 3600; + stream_load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid; + stream_load_ctx->need_commit_self = true; + stream_load_ctx->need_rollback = true; // total_length == -1 means read one message from pipe in once time, don't care the length. auto pipe = std::make_shared(kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, -1 /* total_length */, true /* use_proto */); - stream_load_cxt->body_sink = pipe; - stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio; + stream_load_ctx->body_sink = pipe; + stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; - RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe)); + RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_ctx->id, pipe)); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt)); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); set_pipe(params.params.fragment_instance_id, pipe); return Status::OK(); } else { diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 20c8d71a96..b7661e4109 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -51,6 +51,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte _decompressor(nullptr), _skip_lines(0) { _file_format_type = _params.format_type; + _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO; _file_compress_type = _params.compress_type; _size = _range.size; @@ -119,7 +120,9 @@ Status CsvReader::init_reader(bool is_load) { case TFileFormatType::FORMAT_CSV_DEFLATE: _line_reader.reset(new PlainTextLineReader(_profile, real_reader, _decompressor.get(), _size, _line_delimiter, _line_delimiter_length)); - + break; + case TFileFormatType::FORMAT_PROTO: + _line_reader.reset(new PlainBinaryLineReader(real_reader)); break; default: return Status::InternalError( @@ -209,6 +212,7 @@ Status CsvReader::_create_decompressor() { } } else { switch (_file_format_type) { + case TFileFormatType::FORMAT_PROTO: case TFileFormatType::FORMAT_CSV_PLAIN: compress_type = CompressType::UNCOMPRESSED; break; @@ -265,7 +269,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* ro } Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { - if (!validate_utf8(line.data, line.size)) { + if (!_is_proto_format && !validate_utf8(line.data, line.size)) { if (!_is_load) { return Status::InternalError("Only support csv data in utf8 codec"); } else { diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index d1ab1ebb26..cbb1b2c882 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -76,6 +76,7 @@ private: std::unique_ptr _decompressor; TFileFormatType::type _file_format_type; + bool _is_proto_format; TFileCompressType::type _file_compress_type; int64_t _size; // When we fetch range start from 0, header_type="csv_with_names" skip first line diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 9df3722b95..2ee83b014a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -489,7 +489,8 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_CSV_BZ2: case TFileFormatType::FORMAT_CSV_LZ4FRAME: case TFileFormatType::FORMAT_CSV_LZOP: - case TFileFormatType::FORMAT_CSV_DEFLATE: { + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_PROTO: { _cur_reader.reset( new CsvReader(_state, _profile, &_counter, _params, range, _file_slot_descs)); init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load); diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index 25b14ba3a9..68f3b05cc1 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -46,7 +46,6 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes argument_template.reserve(2); argument_template.emplace_back(std::move(child_column), child->data_type(), child_name); argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name); - _function = SimpleFunctionFactory::instance().get_function(function_name, argument_template, _data_type); diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index cc7fca5008..3ffe5107d6 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -148,7 +148,7 @@ public: return iter->second()->build(arguments, return_type); } - LOG(WARNING) << fmt::format("Function signature {} is not founded", key_str); + LOG(WARNING) << fmt::format("Function signature {} is not found", key_str); return nullptr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java index 989488610d..c0331c0318 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ArrayLiteral.java @@ -61,7 +61,7 @@ public class ArrayLiteral extends LiteralExpr { children = new ArrayList<>(); for (LiteralExpr expr : exprs) { - if (expr.getType() == itemType) { + if (expr.getType().equals(itemType)) { children.add(expr); } else { children.add(expr.castTo(itemType)); @@ -102,11 +102,17 @@ public class ArrayLiteral extends LiteralExpr { @Override public String getStringValue() { List list = new ArrayList<>(children.size()); - children.forEach(v -> list.add(((LiteralExpr) v).getStringValue())); - + children.forEach(v -> list.add(v.getStringValue())); return "ARRAY[" + StringUtils.join(list, ", ") + "]"; } + @Override + public String getStringValueForArray() { + List list = new ArrayList<>(children.size()); + children.forEach(v -> list.add(v.getStringValueForArray())); + return "[" + StringUtils.join(list, ", ") + "]"; + } + @Override protected void toThrift(TExprNode msg) { msg.node_type = TExprNodeType.ARRAY_LITERAL; @@ -164,3 +170,4 @@ public class ArrayLiteral extends LiteralExpr { } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java index e434668286..c37a620235 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BoolLiteral.java @@ -105,6 +105,11 @@ public class BoolLiteral extends LiteralExpr { return value ? "1" : "0"; } + @Override + public String getStringValueForArray() { + return "\"" + getStringValue() + "\""; + } + @Override public ByteBuffer getHashValue(PrimitiveType type) { byte v = (byte) (value ? 1 : 0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java index 87bfff145a..f72c3cce8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java @@ -573,4 +573,10 @@ public class CastExpr extends Expr { "doris::CastFunctions::cast_to_array_val", null, null, true); } } + + @Override + public String getStringValueForArray() { + return children.get(0).getStringValueForArray(); + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java index 9cd48b53ae..83f4473114 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DateLiteral.java @@ -557,6 +557,11 @@ public class DateLiteral extends LiteralExpr { } } + @Override + public String getStringValueForArray() { + return "\"" + getStringValue() + "\""; + } + public void roundCeiling(int newScale) { Preconditions.checkArgument(type.isDatetimeV2()); long remain = Double.valueOf(microsecond % (Math.pow(10, 6 - newScale))).longValue(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java index 53ca2ba9ca..1969c4bbed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DecimalLiteral.java @@ -220,6 +220,11 @@ public class DecimalLiteral extends LiteralExpr { return value.toString(); } + @Override + public String getStringValueForArray() { + return "\"" + getStringValue() + "\""; + } + @Override public long getLongValue() { return value.longValue(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 7753472864..fa60ac7e7d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1373,8 +1373,8 @@ public abstract class Expr extends TreeNode implements ParseNode, Cloneabl // "cast %s to %s", this.type, targetType); // TODO(zc): use implicit cast if (!Type.canCastTo(this.type, targetType)) { - throw new AnalysisException("type not match, originType=" + this.type - + ", targeType=" + targetType); + throw new AnalysisException("can not cast from origin type " + this.type + + " to target type=" + targetType); } return uncheckedCastTo(targetType); @@ -1879,12 +1879,18 @@ public abstract class Expr extends TreeNode implements ParseNode, Cloneabl } public String getStringValue() { - if (this instanceof LiteralExpr) { - return ((LiteralExpr) this).getStringValue(); - } return ""; } + // A special method only for array literal, all primitive type in array + // will be wrapped by double quote. eg: + // ["1", "2", "3"] + // ["a", "b", "c"] + // [["1", "2", "3"], ["1"], ["3"]] + public String getStringValueForArray() { + return null; + } + public static Expr getFirstBoundChild(Expr expr, List tids) { for (Expr child : expr.getChildren()) { if (child.isBoundByTupleIds(tids)) { @@ -2016,4 +2022,17 @@ public abstract class Expr extends TreeNode implements ParseNode, Cloneabl child.materializeSrcExpr(); } } + + // This is only for transactional insert operation, + // to check it the given value in insert stmt is LiteralExpr. + // And if we write "1" to a boolean column, there will be a cast(1 as boolean) expr, + // which is also accepted. + public boolean isLiteralOrCastExpr() { + if (this instanceof CastExpr) { + return children.get(0) instanceof LiteralExpr; + } else { + return this instanceof LiteralExpr; + } + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java index 75d5961d43..9a382c7cf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FloatLiteral.java @@ -136,6 +136,11 @@ public class FloatLiteral extends LiteralExpr { return Double.toString(value); } + @Override + public String getStringValueForArray() { + return "\"" + getStringValue() + "\""; + } + public static Type getDefaultTimeType(Type type) throws AnalysisException { switch (type.getPrimitiveType()) { case TIME: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java index 6a66f9d2c4..248a08e5e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java @@ -266,6 +266,11 @@ public class IntLiteral extends LiteralExpr { return Long.toString(value); } + @Override + public String getStringValueForArray() { + return "\"" + getStringValue() + "\""; + } + @Override public long getLongValue() { return value; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java index 459a126bad..d573d07985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JsonLiteral.java @@ -96,6 +96,16 @@ public class JsonLiteral extends LiteralExpr { msg.json_literal = new TJsonLiteral(getUnescapedValue()); } + @Override + public String getStringValue() { + return null; + } + + @Override + public String getStringValueForArray() { + return null; + } + public String getUnescapedValue() { // Unescape string exactly like Hive does. Hive's method assumes // quotes so we add them here to reuse Hive's code. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java index b570b4e0b3..7fb5518a40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java @@ -190,6 +190,11 @@ public class LargeIntLiteral extends LiteralExpr { return value.toString(); } + @Override + public String getStringValueForArray() { + return "\"" + getStringValue() + "\""; + } + @Override public long getLongValue() { return value.longValue(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java index 831773b539..0528e33eed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java @@ -187,9 +187,11 @@ public abstract class LiteralExpr extends Expr implements Comparable> entry : tRequest.params.per_node_scan_ranges.entrySet()) { for (TScanRangeParams scanRangeParams : entry.getValue()) { - for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) { - desc.setFormatType(TFileFormatType.FORMAT_PROTO); + if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) { + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( + TFileFormatType.FORMAT_PROTO); + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType( + TFileCompressType.PLAIN); + } else { + for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) { + desc.setFormatType(TFileFormatType.FORMAT_PROTO); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 52764e84bb..d60b7fb97d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -220,14 +220,20 @@ public class StmtExecutor implements ProfileWriter { this.context.setStatementContext(statementContext); } - public static InternalService.PDataRow getRowStringValue(List cols) { - if (cols.size() == 0) { + public static InternalService.PDataRow getRowStringValue(List cols) throws UserException { + if (cols.isEmpty()) { return null; } InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder(); for (Expr expr : cols) { + if (!expr.isLiteralOrCastExpr()) { + throw new UserException( + "do not support non-literal expr in transactional insert operation: " + expr.toSql()); + } if (expr instanceof NullLiteral) { row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD); + } else if (expr instanceof ArrayLiteral) { + row.addColBuilder().setValue(expr.getStringValueForArray()); } else { row.addColBuilder().setValue(expr.getStringValue()); } @@ -540,8 +546,8 @@ public class StmtExecutor implements ProfileWriter { queryType = "Insert"; } } catch (Throwable t) { - LOG.warn("handle insert stmt fail", t); - // the transaction of this insert may already begun, we will abort it at outer finally block. + LOG.warn("handle insert stmt fail: {}", t.getMessage()); + // the transaction of this insert may already begin, we will abort it at outer finally block. throw t; } } else if (parsedStmt instanceof DdlStmt) { @@ -1777,3 +1783,4 @@ public class StmtExecutor implements ProfileWriter { return parsedStmt; } } + diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ArrayLiteralTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ArrayLiteralTest.java new file mode 100644 index 0000000000..73e4f8f843 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ArrayLiteralTest.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +public class ArrayLiteralTest { + @Test + public void testGetStringValueForArray() throws AnalysisException { + IntLiteral intLiteral1 = new IntLiteral(1); + FloatLiteral floatLiteral = new FloatLiteral("2.15"); + BoolLiteral boolLiteral = new BoolLiteral(true); + StringLiteral stringLiteral = new StringLiteral("shortstring"); + LargeIntLiteral largeIntLiteral = new LargeIntLiteral("1000000000000000000000"); + NullLiteral nullLiteral = new NullLiteral(); + DateLiteral dateLiteral = new DateLiteral("2022-10-10", Type.DATE); + DateLiteral datetimeLiteral = new DateLiteral("2022-10-10 12:10:10", Type.DATETIME); + ArrayLiteral arrayLiteral1 = new ArrayLiteral(intLiteral1, floatLiteral); + Assert.assertEquals("[\"1.0\", \"2.15\"]", arrayLiteral1.getStringValueForArray()); + + ArrayLiteral arrayLiteral2 = new ArrayLiteral(boolLiteral, boolLiteral); + Assert.assertEquals("[\"1\", \"1\"]", arrayLiteral2.getStringValueForArray()); + + ArrayLiteral arrayLiteral3 = new ArrayLiteral(stringLiteral, stringLiteral); + Assert.assertEquals("[\"shortstring\", \"shortstring\"]", arrayLiteral3.getStringValueForArray()); + + ArrayLiteral arrayLiteral4 = new ArrayLiteral(largeIntLiteral, largeIntLiteral); + Assert.assertEquals("[\"1000000000000000000000\", \"1000000000000000000000\"]", arrayLiteral4.getStringValueForArray()); + + ArrayLiteral arrayLiteral5 = new ArrayLiteral(nullLiteral, nullLiteral); + Assert.assertEquals("[null, null]", arrayLiteral5.getStringValueForArray()); + + ArrayLiteral arrayLiteral6 = new ArrayLiteral(dateLiteral, dateLiteral); + Assert.assertEquals("[\"2022-10-10\", \"2022-10-10\"]", arrayLiteral6.getStringValueForArray()); + + ArrayLiteral arrayLiteral7 = new ArrayLiteral(datetimeLiteral, datetimeLiteral); + Assert.assertEquals("[\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"]", arrayLiteral7.getStringValueForArray()); + + ArrayLiteral arrayLiteral8 = new ArrayLiteral(arrayLiteral7, arrayLiteral7); + Assert.assertEquals("[[\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"], [\"2022-10-10 12:10:10\", \"2022-10-10 12:10:10\"]]", + arrayLiteral8.getStringValueForArray()); + + ArrayLiteral arrayLiteral9 = new ArrayLiteral(); + Assert.assertEquals("[]", arrayLiteral9.getStringValueForArray()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java index 5c69308331..c82e1b0844 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertArrayStmtTest.java @@ -21,9 +21,13 @@ import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.utframe.UtFrameUtils; @@ -104,7 +108,47 @@ public class InsertArrayStmtTest { Assert.assertSame(PrimitiveType.INT, ((ArrayType) arrayLiteral.getType()).getItemType().getPrimitiveType()); connectContext.setQueryId(new TUniqueId(3, 0)); - ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "type not match", + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "can not cast from origin type", () -> parseAndAnalyze("insert into test.table1 values (1, [[1, 2], [3, 4]]);")); } + + @Test + public void testTransactionalInsert() throws Exception { + Config.enable_new_load_scan_node = true; + ExceptionChecker.expectThrowsNoException( + () -> createTable("CREATE TABLE test.`txn_insert_tbl` (\n" + + " `k1` int(11) NULL,\n" + + " `k2` double NULL,\n" + + " `k3` varchar(100) NULL,\n" + + " `k4` array NULL,\n" + + " `k5` array NULL\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`k1`)\n" + + "COMMENT 'OLAP'\n" + + "DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ");")); + + SqlParser parser = new SqlParser(new SqlScanner( + new StringReader("begin"), connectContext.getSessionVariable().getSqlMode() + )); + TransactionBeginStmt beginStmt = (TransactionBeginStmt) SqlParserUtils.getFirstStmt(parser); + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, beginStmt); + stmtExecutor.execute(); + + parser = new SqlParser(new SqlScanner( + new StringReader("insert into test.txn_insert_tbl values(2, 3.3, \"xyz\", [1], [1, 0]);"), + connectContext.getSessionVariable().getSqlMode() + )); + InsertStmt insertStmt = (InsertStmt) SqlParserUtils.getFirstStmt(parser); + stmtExecutor = new StmtExecutor(connectContext, insertStmt); + stmtExecutor.execute(); + QueryState state = connectContext.getState(); + Assert.assertEquals(MysqlStateType.OK, state.getStateType()); + } } + diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 6791f7cacd..49d20902c1 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -20,7 +20,10 @@ // **Note**: default db will be create if not exist defaultDb = "regression_test" -jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?" +// add useLocalSessionState so that the jdbc will not send +// init cmd like: select @@session.tx_read_only +// at each time we connect. +jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true" jdbcUser = "root" jdbcPassword = "" diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out new file mode 100644 index 0000000000..5d66d45a39 --- /dev/null +++ b/regression-test/data/insert_p0/txn_insert.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +\N \N \N [NULL] [NULL, 0] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] + +-- !select2 -- +\N \N \N [NULL] [NULL, 0] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] + +-- !select3 -- +\N \N \N [NULL] [NULL, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + +-- !select4 -- +\N \N \N [NULL] [NULL, 0] +1 2.2 abc [] [] +1 2.2 abc [] [] +1 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] +2 3.3 xyz [1] [1, 0] + diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy new file mode 100644 index 0000000000..64477ee8c3 --- /dev/null +++ b/regression-test/suites/insert_p0/txn_insert.groovy @@ -0,0 +1,75 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. + +suite("txn_insert") { + def table = "txn_insert_tbl" + sql """ DROP TABLE IF EXISTS $table """ + sql """ + create table $table ( + k1 int, + k2 double, + k3 varchar(100), + k4 array, + k5 array + ) distributed by hash(k1) buckets 1 + properties("replication_num" = "1"); + """ + + // begin and commit + sql """begin""" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql """insert into $table values(null, null, null, [null], [null, 0])""" + sql "commit" + sql "sync" + order_qt_select1 """select * from $table""" + + // begin and rollback + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "rollback" + sql "sync" + order_qt_select2 """select * from $table""" + + // begin 2 times and commit + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "commit" + sql "sync" + order_qt_select3 """select * from $table""" + + // begin 2 times and rollback + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "begin" + sql """insert into $table values(1, 2.2, "abc", [], [])""" + sql """insert into $table values(2, 3.3, "xyz", [1], [1, 0])""" + sql "rollback" + sql "sync" + order_qt_select4 """select * from $table""" +}