diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 27981526b6..61918a5e8e 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -43,6 +43,8 @@ std::string StreamLoadContext::to_json() const { break; case TStatusCode::LABEL_ALREADY_EXISTS: writer.String("Label Already Exists"); + writer.Key("ExistingJobStatus"); + writer.String(existing_job_status.c_str()); break; default: writer.String("Fail"); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index f2c5c519a4..ec4ce5a858 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -172,6 +172,9 @@ public: int64_t start_nanos = 0; int64_t load_cost_nanos = 0; std::string error_url = ""; + // if label already be used, set existing job's status here + // should be RUNNING or FINISHED + std::string existing_job_status = ""; KafkaLoadInfo* kafka_info = nullptr; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 2ad2fbbf7d..bd925410fd 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -144,6 +144,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { if (!status.ok()) { LOG(WARNING) << "begin transaction failed, errmsg=" << status.get_error_msg() << ctx->brief(); + if (result.__isset.job_status) { + ctx->existing_job_status = result.job_status; + } return status; } ctx->txn_id = result.txnId; diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index f1d83f306f..ca1a30c80a 100644 --- a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -18,13 +18,13 @@ Insert Into 命令需要通过 MySQL 协议提交,创建导入请求会同步 语法: ``` -INSERT INTO table_name [partition_info] [col_list] [query_stmt] [VALUES]; +INSERT INTO table_name [WITH LABEL label] [partition_info] [col_list] [query_stmt] [VALUES]; ``` 示例: ``` -INSERT INTO tbl2 SELECT * FROM tbl3; +INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3; INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a"); ``` @@ -50,6 +50,14 @@ INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a"); *注意:VALUES 方式仅适用于导入几条数据作为导入 DEMO 的情况,完全不适用于任何测试和生产环境。Doris 系统本身也不适合单条数据导入的场景。建议使用 INSERT INTO SELECT 的方式进行批量导入。* +* WITH LABEL + + INSERT 操作作为一个导入任务,也可以指定一个 label。如果不指定,则系统会自动指定一个 UUID 作为 label。 + + 该功能需要 0.11+ 版本。 + + *注意:建议指定 Label 而不是由系统自动分配。如果由系统自动分配,但在 Insert Into 语句执行过程中,因网络错误导致连接断开等,则无法得知 Insert Into 是否成功。而如果指定 Label,则可以再次通过 Label 查看任务结果。* + ### 导入结果 Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令的返回行为。 @@ -66,13 +74,24 @@ Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令 Query OK, 100 row affected, 0 warning (0.22 sec) ``` -导入可能部分成功,则还会附加一个 Label 字段。示例如下: +如果用户指定了 Label,则会也会返回 Label +``` +Query OK, 100 row affected, 0 warning (0.22 sec) +{'label':'user_specified_label'} +``` + +导入可能部分成功,则会附加 Label 字段。示例如下: ``` Query OK, 100 row affected, 1 warning (0.23 sec) {'label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'} ``` +``` +Query OK, 100 row affected, 1 warning (0.23 sec) +{'label':'user_specified_label'} +``` + 其中 affected 表示导入的行数。warning 表示失败的行数。用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。 如果没有任何数据,也会返回成功,且 affected 和 warning 都是 0。 @@ -152,7 +171,7 @@ bj_store_sales schema: 由于用户是希望将一张表中的数据做 ETL 并导入到目标表中,所以应该使用 Insert into query\_stmt 方式导入。 ``` - INSERT INTO bj_store_sales SELECT id, total, user_id, sale_timestamp FROM store_sales where region = "bj"; + INSERT INTO bj_store_sales WITH LABEL `label` SELECT id, total, user_id, sale_timestamp FROM store_sales where region = "bj"; ``` ## 常见问题 diff --git a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md index d0b7d649cf..662558edc3 100644 --- a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md @@ -164,6 +164,7 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的 "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", "Status": "Success", + "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, @@ -190,6 +191,10 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的 "Label Already Exists":Label 重复,需更换 Label。 "Fail":导入失败。 + ++ ExistingJobStatus:已存在的 Label 对应的导入作业的状态。 + + 这个字段只有在当 Status 为 "Label Already Exists" 是才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。"RUNNING" 表示作业还在执行,"FINISHED" 表示作业成功。 + Message:导入错误信息。 diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/insert.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/insert.md index ad9225e1b3..2ffba1bc75 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/insert.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/insert.md @@ -4,9 +4,10 @@ ``` INSERT INTO table_name + [ WITH LABEL label] [ PARTITION (, ...) ] [ (column [, ...]) ] - [ \[ hint [, ...] \] ] + [ [ hint [, ...] ] ] { VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query } ``` @@ -14,6 +15,8 @@ INSERT INTO table_name > tablet_name: 导入数据的目的表。可以是 `db_name.table_name` 形式 > +> label: 为 Insert 任务指定一个 label +> > partition_names: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔 > > column_name: 指定的目的列,必须是 `table_name` 中存在的列 @@ -50,27 +53,27 @@ INSERT INTO test (c1) VALUES (1); 2. 向`test`表中一次性导入多行数据 ``` -INSERT INTO test VALUES (1, 2), (3, 2 + 2) -INSERT INTO test (c1, c2) VALUES (1, 2), (3, 2 * 2) -INSERT INTO test (c1) VALUES (1), (3) -INSERT INTO test (c1, c2) VALUES (1, DEFAULT), (3, DEFAULT) +INSERT INTO test VALUES (1, 2), (3, 2 + 2); +INSERT INTO test (c1, c2) VALUES (1, 2), (3, 2 * 2); +INSERT INTO test (c1) VALUES (1), (3); +INSERT INTO test (c1, c2) VALUES (1, DEFAULT), (3, DEFAULT); ``` 其中第一条、第二条语句效果一样,向`test`表中一次性导入两条数据 第三条、第四条语句效果已知,使用`c2`列的默认值向`test`表中导入两条数据 -3. 向`test`表中同步的导入一个查询语句的返回结果 +3. 向 `test` 表中导入一个查询语句结果 ``` -INSERT INTO test [streaming] SELECT * FROM test2 -INSERT INTO test (c1, c2) [streaming] SELECT * from test2 +INSERT INTO test SELECT * FROM test2; +INSERT INTO test (c1, c2) SELECT * from test2; ``` -4. 向`test`表中异步的导入一个查询语句结果 +4. 向 `test` 表中导入一个查询语句结果,并指定 label ``` -INSERT INTO test SELECT * FROM test2 -INSERT INTO test (c1, c2) SELECT * from test2 +INSERT INTO test WITH LABEL `label1` SELECT * FROM test2; +INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2; ``` 异步的导入其实是,一个同步的导入封装成了异步。填写 streaming 和不填写的**执行效率是一样**的。 diff --git a/docs/documentation/en/administrator-guide/load-data/insert-into-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/insert-into-manual_EN.md index 8e8c46a40e..6d3024b2da 100644 --- a/docs/documentation/en/administrator-guide/load-data/insert-into-manual_EN.md +++ b/docs/documentation/en/administrator-guide/load-data/insert-into-manual_EN.md @@ -18,13 +18,13 @@ The Insert Into command needs to be submitted through MySQL protocol. Creating a Grammar: ``` -INSERT INTO table_name [partition_info] [col_list] [query_stmt] [VALUES]; +INSERT INTO table_name [WITH LABEL label] [partition_info] [col_list] [query_stmt] [VALUES]; ``` Examples: ``` -INSERT INTO tbl2 SELECT * FROM tbl3; +INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3; INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a"); ``` @@ -49,12 +49,56 @@ The following is a brief introduction to the parameters used in creating import Users can insert one or more data through VALUES grammar. *Note: VALUES is only suitable for importing several pieces of data as DEMO. It is totally unsuitable for any test and production environment. Doris system itself is not suitable for single data import scenarios. It is recommended to use INSERT INTO SELECT for batch import.* + +* WITH LABEL + + INSERT as a load job, it can also be with a label. If not with a label, Doris will use a UUID as label. + + This feature needs Doris version 0.11+. + + *Note: It is recommended that Label be specified rather than automatically allocated by the system. If the system allocates automatically, but during the execution of the Insert Into statement, the connection is disconnected due to network errors, etc., then it is impossible to know whether Insert Into is successful. If you specify Label, you can view the task results again through Label.* ### Load results Insert Into itself is an SQL command, so the return behavior is the same as the return behavior of the SQL command. -If the import fails, the return statement fails to execute. If the import succeeds, the return statement executes successfully and a Label field is appended. +If the load fails, the error will be returned. Examples are as follows: + + +``` +ERROR 1064 (HY000): All partitions have no load data. url: http://ip:port/api/_load_error_log?File=_shard_14/error_log_insert_stmt_f435264d82f342e4-a33764f5f0dfbf00_f4364d82f342e4_a764f344e4_a764f5f5f0df0df0dbf00 +``` + +Where URL can be used to query the wrong data, see the following **view error line** summary. + +If the load succeeds, the success will be returned. Examples are as follows: + +``` +Query OK, 100 row affected, 0 warning (0.22 sec) +``` + +If the user specifies Label, the label will be returned as well. + +``` +Query OK, 100 row affected, 0 warning (0.22 sec) +{label':'user_specified_label'} +``` + +If the load may be partially successful, the Label field is appended. Examples are as follows: + +``` +Query OK, 100 row affected, 1 warning (0.23 sec) +{label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'} +``` + +``` +Query OK, 100 row affected, 1 warning (0.23 sec) +{label':'user_specified_label'} +``` + +Where affected represents the number of rows loaded. Warning denotes the number of rows that failed. Users need to view the wrong line through `SHOW LOAD WHERE LABEL='xxx';` command, and get url to view the errors. + +If there is no data, it will return success, and both affected and warning are 0. Label is the identifier of the Insert Into import job. Each import job has a unique Label inside a single database. Insert Into's Label is generated by the system. Users can use the Label to asynchronously obtain the import status by querying the import command. @@ -70,7 +114,7 @@ Label is the identifier of the Insert Into import job. Each import job has a uni At the same time, the Insert Into statement receives the restriction of the Session variable `query_timeout`. You can increase the timeout time by `SET query_timeout = xxx;` in seconds. -### Session 变量 +### Session Variables + enable\_insert\_strict diff --git a/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md index ebd692adb9..687a473af6 100644 --- a/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md +++ b/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md @@ -125,6 +125,7 @@ Examples: "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee", "Status": "Success", + "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, @@ -151,6 +152,10 @@ The following main explanations are given for the Stream load import result para "Label Already Exists":Label 重复,需更换 Label。 "Fail": Import failed. + ++ ExistingJobStatus: The state of the load job corresponding to the existing Label. + + This field is displayed only when the status is "Label Already Exists". The user can know the status of the load job corresponding to Label through this state. "RUNNING" means that the job is still executing, and "FINISHED" means that the job is successful. + Message: Import error messages. diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/insert_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/insert_EN.md index fef0baf56a..4106f37fbe 100644 --- a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/insert_EN.md +++ b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/insert_EN.md @@ -1,37 +1,40 @@ -"35; INSERT -Description -'35;'35;' 35; Syntax +# INSERT +## Description +### Syntax ``` INSERT INTO table_name -[PARTICIPATION [...] +[ WITH LABEL label] +[ PARTICIPATION [...] [ (column [, ...]) ] -[ \[ hint [, ...] \] ] +[ [ hint [, ...] ] ] { VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query } ``` ### Parameters -> tablet_name: Target table for importing data. It can be in the form of `db_name. table_name'. +> tablet_name: Target table for loading data. It can be in the form of `db_name.table_name`. > -> partition_names: Specifies that the partition to be imported must be a partition that exists in `table_name', with multiple partition names separated by commas +> label: Specifies a label for Insert job. > -> column_name: The specified destination column must be a column that exists in `table_name'. +> partition_names: Specifies the partitions to be loaded, with multiple partition names separated by commas. The partitions must exist in `table_name`, > -> expression: The corresponding expression that needs to be assigned to a column +> column_name: The specified destination columns must be columns that exists in `table_name`. +> +> expression: The corresponding expression that needs to be assigned to a column. > > DEFAULT: Let the corresponding columns use default values > > query: A common query whose results are written to the target > -> hint: Indicators used to indicate `INSERT'execution. ` Both streaming `and default non `streaming'methods use synchronization to complete `INSERT' statement execution +> hint: Indicators used to indicate `INSERT` execution. ` Both streaming `and default non `streaming'methods use synchronization to complete `INSERT' statement execution > The non `streaming'mode returns a label after execution to facilitate users to query the imported status through `SHOW LOAD'. -'35;'35;' 35; Note +### Note When the `INSERT'statement is currently executed, the default behavior for data that does not conform to the target table is filtering, such as string length. However, for business scenarios where data is not filtered, the session variable `enable_insert_strict'can be set to `true' to ensure that `INSERT'will not be successfully executed when data is filtered out. -'35;'35; example +## example ` The test `table contains two columns `c1', `c2'. @@ -59,20 +62,21 @@ Insert in test (C1, C2) values (1, Default), (3, Default) The effect of the first and second statements is the same, and two data are imported into the `test'table at one time. The effect of the third and fourth statements is known, using the default value of the `c2'column to import two data into the `test' table. -3. Return results of importing a query statement synchronously into the `test'table -``` -INSERT INTO test [streaming] SELECT * FROM test2 -INSERT INTO test (c1, c2) [streaming] SELECT * from test2 -``` - -4. Import an asynchronous query result into the `test'table +3. Insert into table `test` with a query stmt. ``` INSERT INTO test SELECT * FROM test2 INSERT INTO test (c1, c2) SELECT * from test2 ``` +4. Insert into table `test` with specified label + +``` +INSERT INTO test WITH LABEL `label1` SELECT * FROM test2; +INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2; +``` + Asynchronous imports are, in fact, encapsulated asynchronously by a synchronous import. Filling in streaming is as efficient as not filling in * execution. Since Doris used to import asynchronously, in order to be compatible with the old usage habits, the `INSERT'statement without streaming will still return a label. Users need to view the status of the `label' import job through the `SHOW LOAD command. diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index b6ab01ac7f..9b720b0fb0 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -378,6 +378,7 @@ nonterminal AccessPrivilege privilege_type; nonterminal DataDescription data_desc; nonterminal List data_desc_list; nonterminal LabelName job_label; +nonterminal String opt_with_label; nonterminal String opt_system; nonterminal String opt_cluster; nonterminal BrokerDesc opt_broker; @@ -2334,9 +2335,9 @@ use_stmt ::= // Insert statement insert_stmt ::= - KW_INSERT KW_INTO insert_target:target opt_col_list:cols opt_plan_hints:hints insert_source:source + KW_INSERT KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source {: - RESULT = new InsertStmt(target, cols, source, hints); + RESULT = new InsertStmt(target, label, cols, source, hints); :} // TODO(zc) add default value for SQL-2003 // | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES @@ -2349,6 +2350,17 @@ insert_target ::= :} ; +opt_with_label ::= + /* empty */ + {: + RESULT = null; + :} + | KW_WITH KW_LABEL ident:label + {: + RESULT = label; + :} + ; + insert_source ::= query_stmt:query {: diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java index 5568eb2e0a..ff94247399 100644 --- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -89,6 +89,7 @@ public class InsertStmt extends DdlStmt { private final List planHints; private Boolean isRepartition; private boolean isStreaming = false; + private String label = null; private Map indexIdToSchemaHash = null; @@ -111,7 +112,7 @@ public class InsertStmt extends DdlStmt { List targetColumns = Lists.newArrayList(); - public InsertStmt(InsertTarget target, List cols, InsertSource source, List hints) { + public InsertStmt(InsertTarget target, String label, List cols, InsertSource source, List hints) { this.tblName = target.getTblName(); List tmpPartitions = target.getPartitions(); if (tmpPartitions != null) { @@ -120,9 +121,10 @@ public class InsertStmt extends DdlStmt { } else { targetPartitions = null; } + this.label = label; this.queryStmt = source.getQueryStmt(); this.planHints = hints; - targetColumnNames = cols; + this.targetColumnNames = cols; } // Ctor for CreateTableAsSelectStmt @@ -207,6 +209,14 @@ public class InsertStmt extends DdlStmt { return isStreaming; } + public String getLabel() { + return label; + } + + public boolean hasLabel() { + return label != null; + } + // Only valid when this statement is streaming public OlapTableSink getOlapTableSink() { return (OlapTableSink) dataSink; @@ -321,8 +331,6 @@ public class InsertStmt extends DdlStmt { } BrokerTable brokerTable = (BrokerTable) targetTable; - List paths = brokerTable.getPaths(); - if (!brokerTable.isWritable()) { throw new AnalysisException("table " + brokerTable.getName() + "is not writable. path should be an dir"); diff --git a/fe/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java b/fe/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java index 0379d6e57d..54979745b5 100644 --- a/fe/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java +++ b/fe/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java @@ -17,6 +17,10 @@ package org.apache.doris.common; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Preconditions; + /* * Author: Chenmingyu * Date: Jan 16, 2019 @@ -26,11 +30,36 @@ public class LabelAlreadyUsedException extends DdlException { private static final long serialVersionUID = -6798925248765094813L; + // status of existing load job + // RUNNING or FINISHED + private String jobStatus; + public LabelAlreadyUsedException(String label) { super("Label [" + label + "] has already been used."); } + public LabelAlreadyUsedException(String label, TransactionStatus txnStatus) { + super("Label [" + label + "] has already been used."); + switch (txnStatus) { + case UNKNOWN: + case PREPARE: + jobStatus = "RUNNING"; + break; + case COMMITTED: + case VISIBLE: + jobStatus = "FINISHED"; + break; + default: + Preconditions.checkState(false, txnStatus); + break; + } + } + public LabelAlreadyUsedException(String label, String subLabel) { super("Sub label [" + subLabel + "] has already been used."); } + + public String getJobStatus() { + return jobStatus; + } } diff --git a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java index 2f9e664946..caa1e52dd3 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java @@ -66,8 +66,8 @@ public class TransProcDir implements ProcDirInterface { result.setNames(TITLE_NAMES); GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr(); List> infos = transactionMgr.getDbTransInfo(dbId, state.equals("running"), MAX_SHOW_ENTRIES); - // order by transactionId, asc - ListComparator> comparator = new ListComparator>(0); + // order by transactionId, desc + ListComparator> comparator = new ListComparator>(true, 0); Collections.sort(infos, comparator); for (List info : infos) { List row = new ArrayList(info.size()); diff --git a/fe/src/main/java/org/apache/doris/http/rest/GetLoadInfoAction.java b/fe/src/main/java/org/apache/doris/http/rest/GetLoadInfoAction.java index a9a8115ab0..3a28a2b388 100644 --- a/fe/src/main/java/org/apache/doris/http/rest/GetLoadInfoAction.java +++ b/fe/src/main/java/org/apache/doris/http/rest/GetLoadInfoAction.java @@ -32,13 +32,13 @@ import io.netty.handler.codec.http.HttpMethod; // Get load information of one load job public class GetLoadInfoAction extends RestBaseAction { - public GetLoadInfoAction(ActionController controller) { + public GetLoadInfoAction(ActionController controller, boolean isStreamLoad) { super(controller); } public static void registerAction(ActionController controller) throws IllegalArgException { - GetLoadInfoAction action = new GetLoadInfoAction(controller); + GetLoadInfoAction action = new GetLoadInfoAction(controller, false); controller.registerHandler(HttpMethod.GET, "/api/{" + DB_KEY + "}/_load_info", action); } @@ -61,6 +61,7 @@ public class GetLoadInfoAction extends RestBaseAction { if (redirectToMaster(request, response)) { return; } + try { catalog.getLoadInstance().getJobInfo(info); if (info.tblNames.isEmpty()) { @@ -73,13 +74,11 @@ public class GetLoadInfoAction extends RestBaseAction { } catch (DdlException | MetaNotFoundException e) { catalog.getLoadManager().getLoadJobInfo(info); } - sendResult(request, response, new Result(info)); } private static class Result extends RestBaseResult { private Load.JobInfo jobInfo; - public Result(Load.JobInfo info) { jobInfo = info; } diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index ac89629411..5ef8b4a6e6 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -593,6 +593,12 @@ public class StmtExecutor { long createTime = System.currentTimeMillis(); UUID uuid = UUID.randomUUID(); + String label = insertStmt.getLabel(); + if (label == null) { + // if label is not set, use the uuid as label + label = uuid.toString(); + } + Throwable throwable = null; long loadedRows = 0; @@ -659,7 +665,7 @@ public class StmtExecutor { 5000); } catch (Throwable t) { // if any throwable being thrown during insert operation, first we should abort this txn - LOG.warn("handle insert stmt fail: {}", DebugUtil.printId(uuid), t); + LOG.warn("handle insert stmt fail: {}", label, t); try { Catalog.getCurrentGlobalTransactionMgr().abortTransaction( insertStmt.getTransactionId(), @@ -670,8 +676,9 @@ public class StmtExecutor { LOG.warn("errors when abort txn", abortTxnException); } - if (!Config.using_old_load_usage_pattern) { - // if not using old usage pattern, the exception will be thrown to user directly without a label + if (!Config.using_old_load_usage_pattern && !insertStmt.hasLabel()) { + // if not using old usage pattern, or user not specify label, + // the exception will be thrown to user directly without a label StringBuilder sb = new StringBuilder(t.getMessage()); if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { sb.append(". url: " + coord.getTrackingUrl()); @@ -692,10 +699,11 @@ public class StmtExecutor { // 1. NOT a streaming insert(deprecated) // 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load. // 3. has filtered rows. so a label should be returned for user to show - if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0) { + // 4. user specify a label for insert stmt + if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0 || insertStmt.hasLabel()) { try { context.getCatalog().getLoadManager().recordFinishedLoadJob( - uuid.toString(), + label, insertStmt.getDb(), insertStmt.getTargetTable().getId(), EtlJobType.INSERT, @@ -712,7 +720,7 @@ public class StmtExecutor { // set to OK, which means the insert load job is successfully submitted. // and user can check the job's status by label. - context.getState().setOk(loadedRows, filteredRows, "{'label':'" + uuid.toString() + "'}"); + context.getState().setOk(loadedRows, filteredRows, "{'label':'" + label + "'}"); } else { // just return OK without label, which means this job is successfully done without any error Preconditions.checkState(loadedRows > 0 && filteredRows == 0); diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6b722e869c..072a5b8343 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -607,6 +607,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } catch (LabelAlreadyUsedException e) { status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS); status.addToError_msgs(e.getMessage()); + result.setJob_status(e.getJobStatus()); } catch (UserException e) { LOG.warn("failed to begin: {}", e.getMessage()); status.setStatus_code(TStatusCode.ANALYSIS_ERROR); diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 0b7376635d..19e7a5b9ea 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -145,16 +145,16 @@ public class GlobalTransactionMgr { FeNameFormat.checkLabel(label); Map txnLabels = dbIdToTxnLabels.row(dbId); if (txnLabels != null && txnLabels.containsKey(label)) { + TransactionState existTxn = getTransactionState(txnLabels.get(label)); // check timestamp if (requestId != null) { - TransactionState existTxn = getTransactionState(txnLabels.get(label)); if (existTxn != null && existTxn.getTransactionStatus() == TransactionStatus.PREPARE && existTxn.getRequsetId() != null && existTxn.getRequsetId().equals(requestId)) { // this may be a retry request for same job, just return existing txn id. return txnLabels.get(label); } } - throw new LabelAlreadyUsedException(label); + throw new LabelAlreadyUsedException(label, existTxn.getTransactionStatus()); } if (runningTxnNums.get(dbId) != null && runningTxnNums.get(dbId) > Config.max_running_txn_num_per_db) { @@ -1326,7 +1326,7 @@ public class GlobalTransactionMgr { public TransactionIdGenerator getTransactionIDGenerator() { return this.idGenerator; } - + // this two function used to read snapshot or write snapshot public void write(DataOutput out) throws IOException { int numTransactions = idToTransactionState.size(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 3a72959554..2adbe06719 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -495,6 +495,7 @@ struct TLoadTxnBeginRequest { struct TLoadTxnBeginResult { 1: required Status.TStatus status 2: optional i64 txnId + 3: optional string job_status // if label already used, set status of existing job } // StreamLoad request, used to load a streaming to engine