diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 62153c8f8f..1b59bf0b6b 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -160,10 +160,6 @@ int HttpStreamAction::on_header(HttpRequest* req) { ctx->load_type = TLoadType::MANUL_LOAD; ctx->load_src_type = TLoadSourceType::RAW; - ctx->label = req->header(HTTP_LABEL_KEY); - if (ctx->label.empty()) { - ctx->label = generate_uuid_string(); - } ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true"); LOG(INFO) << "new income streaming load request." << ctx->brief() @@ -315,6 +311,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, ctx->db = ctx->put_result.params.db_name; ctx->table = ctx->put_result.params.table_name; ctx->txn_id = ctx->put_result.params.txn_conf.txn_id; + ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); if (ctx->group_commit) { diff --git a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md index 1bb2e6b85d..946c057198 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md @@ -303,6 +303,12 @@ curl --location-trusted -u user:passwd [-H "sql: ${load_sql}"...] -T data.file - curl --location-trusted -u root: -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream("format" = "CSV", "column_separator" = "," ) where age >= 30" http://127.0.0.1:28030/api/_http_stream ``` +#### 相关参数 + +1. label: 用户可以通过指定Label的方式来导入数据 +``` +curl -v --location-trusted -u root: -H "sql: insert into test.t1(c1, c2) WITH LABEL label1 select c1,c2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/_http_stream +``` ### 返回结果 diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b06cfc91d1..d34913bba5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2079,6 +2079,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.getParams().setTableName(parsedStmt.getTbl()); // The txn_id here is obtained from the NativeInsertStmt result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id)); + result.getParams().setImportLabel(parsedStmt.getLabel()); if (parsedStmt.isGroupCommitTvf) { result.getParams().params.setGroupCommit(true); } diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out b/regression-test/data/load_p0/http_stream/test_http_stream.out index 4471c47db0..5b6d8411bf 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_http_stream.out @@ -569,3 +569,16 @@ 龚强 948 龚静 641 +-- !sql14 -- +10000 aa +10001 bb +10002 cc +10003 dd +10004 ee +10005 ff +10006 gg +10007 hh +10008 ii +10009 jj +10010 kk + diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index 5d991c40c0..bbfc2c30f0 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -600,231 +600,43 @@ suite("test_http_stream", "p0") { try_sql "DROP TABLE IF EXISTS ${tableName13}" } - // 14. test parquet orc case - def tableName14 = "test_parquet_orc_case" + // 14. test label + def tableName14 = "test_http_stream_label" + def label = UUID.randomUUID().toString().replaceAll("-", "") + try { - sql """ DROP TABLE IF EXISTS ${tableName14} """ sql """ - CREATE TABLE IF NOT EXISTS ${tableName14} ( - `WatchId` char(128), - `JavaEnable` smallint, - `Title` string, - `GoodEvent` smallint, - `EventTime` datetime, - `EventDate` date, - `CounterId` bigint, - `ClientIp` bigint, - `ClientIp6` char(50), - `RegionId` bigint, - `UserId` string, - `CounterClass` tinyint, - `Os` smallint, - `UserAgent` smallint, - `Url` string, - `Referer` string, - `Urldomain` string, - `RefererDomain` string, - `Refresh` smallint, - `IsRobot` smallint, - `RefererCategories` string, - `UrlCategories` string, - `UrlRegions` string, - `RefererRegions` string, - `ResolutionWidth` int, - `ResolutionHeight` int, - `ResolutionDepth` smallint, - `FlashMajor` smallint, - `FlashMinor` smallint, - `FlashMinor2` string, - `NetMajor` smallint, - `NetMinor` smallint, - `UserAgentMajor` int, - `UserAgentMinor` char(4), - `CookieEnable` smallint, - `JavascriptEnable` smallint, - `IsMobile` smallint, - `MobilePhone` smallint, - `MobilePhoneModel` string, - `Params` string, - `IpNetworkId` bigint, - `TraficSourceId` tinyint, - `SearchEngineId` int, - `SearchPhrase` string, - `AdvEngineId` smallint, - `IsArtifical` smallint, - `WindowClientWidth` int, - `WindowClientHeight` int, - `ClientTimeZone` smallint, - `ClientEventTime` datetime, - `SilverLightVersion1` smallint, - `SilverlightVersion2` smallint, - `SilverlightVersion3` bigint, - `SilverlightVersion4` int, - `PageCharset` string, - `CodeVersion` bigint, - `IsLink` smallint, - `IsDownload` smallint, - `IsNotBounce` smallint, - `FUniqId` string, - `Hid` bigint, - `IsOldCounter` smallint, - `IsEvent` smallint, - `IsParameter` smallint, - `DontCountHits` smallint, - `WithHash` smallint, - `HitColor` char(2), - `UtcEventTime` datetime, - `Age` smallint, - `Sex` smallint, - `Income` smallint, - `Interests` int, - `Robotness` smallint, - `GeneralInterests` string, - `RemoteIp` bigint, - `RemoteIp6` char(50), - `WindowName` int, - `OpenerName` int, - `historylength` smallint, - `BrowserLanguage` char(4), - `BrowserCountry` char(4), - `SocialNetwork` string, - `SocialAction` string, - `HttpError` int, - `SendTiming` int, - `DnsTiming` int, - `ConnectTiming` int, - `ResponseStartTiming` int, - `ResponseEndTiming` int, - `FetchTiming` int, - `RedirectTiming` int, - `DomInteractiveTiming` int, - `DomContentLoadedTiming` int, - `DomCompleteTiming` int, - `LoadEventStartTiming` int, - `LoadEventEndTiming` int, - `NsToDomContentLoadedTiming` int, - `FirstPaintTiming` int, - `RedirectCount` tinyint, - `SocialSourceNetworkId` smallint, - `SocialSourcePage` string, - `ParamPrice` bigint, - `ParamOrderId` string, - `ParamCurrency` char(6), - `ParamCurrencyId` int, - `GoalsReached` string, - `OpenStatServiceName` string, - `OpenStatCampaignId` string, - `OpenStatAdId` string, - `OpenStatSourceId` string, - `UtmSource` string, - `UtmMedium` string, - `UtmCampaign` string, - `UtmContent` string, - `UtmTerm` string, - `FromTag` string, - `HasGclId` smallint, - `RefererHash` string, - `UrlHash` string, - `ClId` bigint, - `YclId` string, - `ShareService` string, - `ShareUrl` string, - `ShareTitle` string, - `ParsedParamsKey1` string, - `ParsedParamsKey2` string, - `ParsedParamsKey3` string, - `ParsedParamsKey4` string, - `ParsedParamsKey5` string, - `ParsedParamsValueDouble` double, - `IsLandId` char(40), - `RequestNum` bigint, - `RequestTry` smallint - ) ENGINE=OLAP - DUPLICATE KEY(`WatchId`, `JavaEnable`) - DISTRIBUTED BY HASH(`WatchId`, `JavaEnable`) BUCKETS 3 - PROPERTIES ("replication_num" = "1"); + CREATE TABLE IF NOT EXISTS ${tableName14} ( + id int, + name CHAR(10) + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) """ - // streamLoad { - // set 'version', '1' - // set 'sql', """ - // insert into ${db}.${tableName14} select * from http_stream("format"="parquet") - // """ - // time 10000 - // set 'format', 'parquet' - // file 'test_http_stream_parquet_case.parquet' - // check { result, exception, startTime, endTime -> - // if (exception != null) { - // throw exception - // } - // log.info("http_stream result: ${result}".toString()) - // def json = parseJson(result) - // assertEquals("success", json.Status.toLowerCase()) - // } - // } - // qt_sql13 "select * from ${tableName14} order by WatchId" - sql """truncate table ${tableName14}""" - - // streamLoad { - // set 'version', '1' - // set 'sql', """ - // insert into ${db}.${tableName14} select * from http_stream("format"="parquet") - // """ - // time 10000 - // set 'format', 'parquet' - // file 'test_http_stream_parquet_case.parquet' - // check { result, exception, startTime, endTime -> - // if (exception != null) { - // throw exception - // } - // log.info("http_stream result: ${result}".toString()) - // def json = parseJson(result) - // assertEquals("success", json.Status.toLowerCase()) - // } - // } - // qt_sql13 "select * from ${tableName14} order by WatchId" - sql """truncate table ${tableName14}""" - - // streamLoad { - // set 'version', '1' - // set 'sql', """ - // insert into ${db}.${tableName14} select * from http_stream("format"="parquet") - // """ - // time 10000 - // set 'format', 'parquet' - // file 'test_http_stream_parquet_case.parquet' - // check { result, exception, startTime, endTime -> - // if (exception != null) { - // throw exception - // } - // log.info("http_stream result: ${result}".toString()) - // def json = parseJson(result) - // assertEquals("success", json.Status.toLowerCase()) - // } - // } - // qt_sql13 "select * from ${tableName14} order by WatchId" - sql """truncate table ${tableName14}""" - - // streamLoad { - // set 'version', '1' - // set 'sql', """ - // insert into ${db}.${tableName14} select * from http_stream("format"="orc") - // """ - // time 10000 - // set 'format', 'orc' - // file 'test_http_stream_orc_case.orc' - // check { result, exception, startTime, endTime -> - // if (exception != null) { - // throw exception - // } - // log.info("http_stream result: ${result}".toString()) - // def json = parseJson(result) - // assertEquals("success", json.Status.toLowerCase()) - // } - // } - // qt_sql13 "select * from ${tableName14} order by WatchId" - sql """truncate table ${tableName14}""" + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${tableName14} WITH LABEL ${label} select c1, c2 from http_stream("format"="csv", "column_separator"="--") + """ + time 10000 + file 'test_http_stream_column_separator.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("http_stream result: ${result}".toString()) + def json = parseJson(result) + assertEquals(label, json.Label.toLowerCase()) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(11, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + qt_sql14 "select id, name from ${tableName14} order by id" } finally { try_sql "DROP TABLE IF EXISTS ${tableName14}" }