From c80c4477cf547acffcc50b4f5d727122ee946c52 Mon Sep 17 00:00:00 2001 From: lvshaokang Date: Thu, 18 May 2023 00:10:15 +0800 Subject: [PATCH] [Enhancement](broker-load) broker load show stmt support display cluster name if specified (#19392) --- fe/fe-core/src/main/cup/sql_parser.cup | 14 +------- .../org/apache/doris/analysis/LoadStmt.java | 17 +-------- .../doris/analysis/UnifiedLoadStmt.java | 4 +-- .../doris/load/loadv2/BrokerLoadJob.java | 17 +++++++++ .../apache/doris/load/loadv2/BulkLoadJob.java | 2 +- .../org/apache/doris/qe/MultiLoadMgr.java | 2 +- .../apache/doris/analysis/LoadStmtTest.java | 4 +-- .../broker_load/test_broker_load.groovy | 35 +++++++++++++++++++ 8 files changed, 60 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index a9f67cf270..3ba5197f65 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -838,7 +838,6 @@ nonterminal DataDescription data_desc, mysql_data_desc; nonterminal List data_desc_list; nonterminal LabelName job_label; nonterminal String opt_with_label; -nonterminal String opt_system; nonterminal BrokerDesc opt_broker; nonterminal ResourceDesc resource_desc; nonterminal List opt_col_list, opt_dup_keys, opt_columns_from_path; @@ -2276,11 +2275,10 @@ load_stmt ::= KW_LOAD KW_LABEL job_label:label LPAREN data_desc_list:dataDescList RPAREN opt_broker:broker - opt_system:system opt_properties:properties opt_comment:comment {: - RESULT = new UnifiedLoadStmt(label, dataDescList, broker, system, properties, comment, LoadType.BROKER_LOAD); + RESULT = new UnifiedLoadStmt(label, dataDescList, broker, properties, comment, LoadType.BROKER_LOAD); :} | KW_LOAD KW_LABEL job_label:label LPAREN data_desc_list:dataDescList RPAREN @@ -2550,16 +2548,6 @@ opt_col_mapping_list ::= :} ; -opt_system ::= - {: - RESULT = null; - :} - | KW_BY ident_or_text:system - {: - RESULT = system; - :} - ; - opt_broker ::= {: RESULT = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 2abdc1dc6b..34d2f2f064 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -50,7 +50,6 @@ import java.util.Map.Entry; // LOAD LABEL load_label // (data_desc, ...) // [broker_desc] -// [BY cluster] // [resource_desc] // [PROPERTIES (key1=value1, )] // @@ -127,7 +126,6 @@ public class LoadStmt extends DdlStmt { private final LabelName label; private final List dataDescriptions; private final BrokerDesc brokerDesc; - private final String cluster; private final ResourceDesc resourceDesc; private final Map properties; private String user; @@ -217,7 +215,6 @@ public class LoadStmt extends DdlStmt { this.label = new LabelName(); this.dataDescriptions = Lists.newArrayList(dataDescription); this.brokerDesc = null; - this.cluster = null; this.resourceDesc = null; this.properties = properties; this.user = null; @@ -230,11 +227,10 @@ public class LoadStmt extends DdlStmt { } public LoadStmt(LabelName label, List dataDescriptions, - BrokerDesc brokerDesc, String cluster, Map properties, String comment) { + BrokerDesc brokerDesc, Map properties, String comment) { this.label = label; this.dataDescriptions = dataDescriptions; this.brokerDesc = brokerDesc; - this.cluster = cluster; this.resourceDesc = null; this.properties = properties; this.user = null; @@ -250,7 +246,6 @@ public class LoadStmt extends DdlStmt { this.label = label; this.dataDescriptions = dataDescriptions; this.brokerDesc = null; - this.cluster = null; this.resourceDesc = resourceDesc; this.properties = properties; this.user = null; @@ -273,11 +268,6 @@ public class LoadStmt extends DdlStmt { return brokerDesc; } - @Deprecated - public String getCluster() { - return cluster; - } - public ResourceDesc getResourceDesc() { return resourceDesc; } @@ -507,11 +497,6 @@ public class LoadStmt extends DdlStmt { if (brokerDesc != null) { sb.append("\n").append(brokerDesc.toSql()); } - if (cluster != null) { - sb.append("\nBY '"); - sb.append(cluster); - sb.append("'"); - } if (resourceDesc != null) { sb.append("\n").append(resourceDesc.toSql()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java index 4b36793109..368e97449e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java @@ -33,7 +33,7 @@ public class UnifiedLoadStmt extends DdlStmt { private final StatementBase proxyStmt; public UnifiedLoadStmt(LabelName label, List dataDescriptions, - BrokerDesc brokerDesc, String cluster, Map properties, String comment, LoadType loadType) { + BrokerDesc brokerDesc, Map properties, String comment, LoadType loadType) { final ConnectContext connectContext = ConnectContext.get(); if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) { switch (loadType) { @@ -45,7 +45,7 @@ public class UnifiedLoadStmt extends DdlStmt { throw new IllegalStateException("does not support load type: " + loadType); } } else { - proxyStmt = new LoadStmt(label, dataDescriptions, brokerDesc, cluster, properties, comment); + proxyStmt = new LoadStmt(label, dataDescriptions, brokerDesc, properties, comment); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 666228ad4a..ef41736dd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -39,6 +40,7 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; import org.apache.doris.load.EtlJobType; @@ -59,6 +61,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.RejectedExecutionException; import java.util.stream.Collectors; @@ -387,4 +390,18 @@ public class BrokerLoadJob extends BulkLoadJob { super.afterVisible(txnState, txnOperated); writeProfile(); } + + @Override + protected String getResourceName() { + StorageBackend.StorageType storageType = brokerDesc.getStorageType(); + if (storageType == StorageBackend.StorageType.BROKER) { + return brokerDesc.getName(); + } else if (storageType == StorageBackend.StorageType.S3) { + return Optional.ofNullable(brokerDesc.getProperties()) + .map(o -> o.get(S3Properties.Env.ENDPOINT)) + .orElse("s3_cluster"); + } else { + return storageType.name().toLowerCase().concat("_cluster"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 0ea617f2b0..a6c5b012e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -124,7 +124,7 @@ public abstract class BulkLoadJob extends LoadJob { break; case SPARK: bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), stmt.getResourceDesc(), - stmt.getOrigStmt(), stmt.getUserInfo()); + stmt.getOrigStmt(), stmt.getUserInfo()); break; case MINI: case DELETE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index fb188590ac..b7c46de233 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -354,7 +354,7 @@ public class MultiLoadMgr { } properties.remove(LoadStmt.KEY_COMMENT); - LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties, comment); + LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, properties, comment); loadStmt.setEtlJobType(EtlJobType.BROKER); loadStmt.setOrigStmt(new OriginStatement("", 0)); loadStmt.setUserInfo(ConnectContext.get().getCurrentUserIdentity()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index c709f51b26..cd8fc4be26 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -110,7 +110,7 @@ public class LoadStmtTest { } }; - LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null, ""); + LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, (BrokerDesc) null, null, ""); stmt.analyze(analyzer); Assert.assertEquals("testCluster:testDb", stmt.getLabel().getDbName()); Assert.assertEquals(dataDescriptionList, stmt.getDataDescriptions()); @@ -137,7 +137,7 @@ public class LoadStmtTest { } }; - LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, null, null, null, ""); + LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, (BrokerDesc) null, null, ""); stmt.analyze(analyzer); Assert.fail("No exception throws."); diff --git a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy index 5d2b68c415..21cfd03cdb 100644 --- a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy +++ b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy @@ -192,6 +192,39 @@ suite("test_broker_load_p2", "p2") { "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", ] + def task_info = ["cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + ] + def error_msg = ["", "", "", @@ -278,10 +311,12 @@ suite("test_broker_load_p2", "p2") { String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """ if (result[0][2].equals("FINISHED")) { logger.info("Load FINISHED " + label) + assertTrue(result[0][6].contains(task_info[i])) assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5] + ", label: $label") break; } if (result[0][2].equals("CANCELLED")) { + assertTrue(result[0][6].contains(task_info[i])) assertTrue(result[0][7].contains(error_msg[i])) break; }