[Enhancement](broker-load) broker load show stmt support display cluster name if specified (#19392)
This commit is contained in:
@ -838,7 +838,6 @@ nonterminal DataDescription data_desc, mysql_data_desc;
|
||||
nonterminal List<DataDescription> data_desc_list;
|
||||
nonterminal LabelName job_label;
|
||||
nonterminal String opt_with_label;
|
||||
nonterminal String opt_system;
|
||||
nonterminal BrokerDesc opt_broker;
|
||||
nonterminal ResourceDesc resource_desc;
|
||||
nonterminal List<String> opt_col_list, opt_dup_keys, opt_columns_from_path;
|
||||
@ -2276,11 +2275,10 @@ load_stmt ::=
|
||||
KW_LOAD KW_LABEL job_label:label
|
||||
LPAREN data_desc_list:dataDescList RPAREN
|
||||
opt_broker:broker
|
||||
opt_system:system
|
||||
opt_properties:properties
|
||||
opt_comment:comment
|
||||
{:
|
||||
RESULT = new UnifiedLoadStmt(label, dataDescList, broker, system, properties, comment, LoadType.BROKER_LOAD);
|
||||
RESULT = new UnifiedLoadStmt(label, dataDescList, broker, properties, comment, LoadType.BROKER_LOAD);
|
||||
:}
|
||||
| KW_LOAD KW_LABEL job_label:label
|
||||
LPAREN data_desc_list:dataDescList RPAREN
|
||||
@ -2550,16 +2548,6 @@ opt_col_mapping_list ::=
|
||||
:}
|
||||
;
|
||||
|
||||
opt_system ::=
|
||||
{:
|
||||
RESULT = null;
|
||||
:}
|
||||
| KW_BY ident_or_text:system
|
||||
{:
|
||||
RESULT = system;
|
||||
:}
|
||||
;
|
||||
|
||||
opt_broker ::=
|
||||
{:
|
||||
RESULT = null;
|
||||
|
||||
@ -50,7 +50,6 @@ import java.util.Map.Entry;
|
||||
// LOAD LABEL load_label
|
||||
// (data_desc, ...)
|
||||
// [broker_desc]
|
||||
// [BY cluster]
|
||||
// [resource_desc]
|
||||
// [PROPERTIES (key1=value1, )]
|
||||
//
|
||||
@ -127,7 +126,6 @@ public class LoadStmt extends DdlStmt {
|
||||
private final LabelName label;
|
||||
private final List<DataDescription> dataDescriptions;
|
||||
private final BrokerDesc brokerDesc;
|
||||
private final String cluster;
|
||||
private final ResourceDesc resourceDesc;
|
||||
private final Map<String, String> properties;
|
||||
private String user;
|
||||
@ -217,7 +215,6 @@ public class LoadStmt extends DdlStmt {
|
||||
this.label = new LabelName();
|
||||
this.dataDescriptions = Lists.newArrayList(dataDescription);
|
||||
this.brokerDesc = null;
|
||||
this.cluster = null;
|
||||
this.resourceDesc = null;
|
||||
this.properties = properties;
|
||||
this.user = null;
|
||||
@ -230,11 +227,10 @@ public class LoadStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
|
||||
BrokerDesc brokerDesc, String cluster, Map<String, String> properties, String comment) {
|
||||
BrokerDesc brokerDesc, Map<String, String> properties, String comment) {
|
||||
this.label = label;
|
||||
this.dataDescriptions = dataDescriptions;
|
||||
this.brokerDesc = brokerDesc;
|
||||
this.cluster = cluster;
|
||||
this.resourceDesc = null;
|
||||
this.properties = properties;
|
||||
this.user = null;
|
||||
@ -250,7 +246,6 @@ public class LoadStmt extends DdlStmt {
|
||||
this.label = label;
|
||||
this.dataDescriptions = dataDescriptions;
|
||||
this.brokerDesc = null;
|
||||
this.cluster = null;
|
||||
this.resourceDesc = resourceDesc;
|
||||
this.properties = properties;
|
||||
this.user = null;
|
||||
@ -273,11 +268,6 @@ public class LoadStmt extends DdlStmt {
|
||||
return brokerDesc;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public String getCluster() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
public ResourceDesc getResourceDesc() {
|
||||
return resourceDesc;
|
||||
}
|
||||
@ -507,11 +497,6 @@ public class LoadStmt extends DdlStmt {
|
||||
if (brokerDesc != null) {
|
||||
sb.append("\n").append(brokerDesc.toSql());
|
||||
}
|
||||
if (cluster != null) {
|
||||
sb.append("\nBY '");
|
||||
sb.append(cluster);
|
||||
sb.append("'");
|
||||
}
|
||||
if (resourceDesc != null) {
|
||||
sb.append("\n").append(resourceDesc.toSql());
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ public class UnifiedLoadStmt extends DdlStmt {
|
||||
private final StatementBase proxyStmt;
|
||||
|
||||
public UnifiedLoadStmt(LabelName label, List<DataDescription> dataDescriptions,
|
||||
BrokerDesc brokerDesc, String cluster, Map<String, String> properties, String comment, LoadType loadType) {
|
||||
BrokerDesc brokerDesc, Map<String, String> properties, String comment, LoadType loadType) {
|
||||
final ConnectContext connectContext = ConnectContext.get();
|
||||
if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) {
|
||||
switch (loadType) {
|
||||
@ -45,7 +45,7 @@ public class UnifiedLoadStmt extends DdlStmt {
|
||||
throw new IllegalStateException("does not support load type: " + loadType);
|
||||
}
|
||||
} else {
|
||||
proxyStmt = new LoadStmt(label, dataDescriptions, brokerDesc, cluster, properties, comment);
|
||||
proxyStmt = new LoadStmt(label, dataDescriptions, brokerDesc, properties, comment);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.load.loadv2;
|
||||
|
||||
import org.apache.doris.analysis.BrokerDesc;
|
||||
import org.apache.doris.analysis.StorageBackend;
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -39,6 +40,7 @@ import org.apache.doris.common.util.LogKey;
|
||||
import org.apache.doris.common.util.MetaLockUtils;
|
||||
import org.apache.doris.common.util.ProfileManager.ProfileType;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
import org.apache.doris.load.BrokerFileGroup;
|
||||
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
@ -59,6 +61,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
@ -387,4 +390,18 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
super.afterVisible(txnState, txnOperated);
|
||||
writeProfile();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getResourceName() {
|
||||
StorageBackend.StorageType storageType = brokerDesc.getStorageType();
|
||||
if (storageType == StorageBackend.StorageType.BROKER) {
|
||||
return brokerDesc.getName();
|
||||
} else if (storageType == StorageBackend.StorageType.S3) {
|
||||
return Optional.ofNullable(brokerDesc.getProperties())
|
||||
.map(o -> o.get(S3Properties.Env.ENDPOINT))
|
||||
.orElse("s3_cluster");
|
||||
} else {
|
||||
return storageType.name().toLowerCase().concat("_cluster");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -124,7 +124,7 @@ public abstract class BulkLoadJob extends LoadJob {
|
||||
break;
|
||||
case SPARK:
|
||||
bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), stmt.getResourceDesc(),
|
||||
stmt.getOrigStmt(), stmt.getUserInfo());
|
||||
stmt.getOrigStmt(), stmt.getUserInfo());
|
||||
break;
|
||||
case MINI:
|
||||
case DELETE:
|
||||
|
||||
@ -354,7 +354,7 @@ public class MultiLoadMgr {
|
||||
}
|
||||
|
||||
properties.remove(LoadStmt.KEY_COMMENT);
|
||||
LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties, comment);
|
||||
LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, properties, comment);
|
||||
loadStmt.setEtlJobType(EtlJobType.BROKER);
|
||||
loadStmt.setOrigStmt(new OriginStatement("", 0));
|
||||
loadStmt.setUserInfo(ConnectContext.get().getCurrentUserIdentity());
|
||||
|
||||
@ -110,7 +110,7 @@ public class LoadStmtTest {
|
||||
}
|
||||
};
|
||||
|
||||
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null, "");
|
||||
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, (BrokerDesc) null, null, "");
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals("testCluster:testDb", stmt.getLabel().getDbName());
|
||||
Assert.assertEquals(dataDescriptionList, stmt.getDataDescriptions());
|
||||
@ -137,7 +137,7 @@ public class LoadStmtTest {
|
||||
}
|
||||
};
|
||||
|
||||
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, null, null, null, "");
|
||||
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, (BrokerDesc) null, null, "");
|
||||
stmt.analyze(analyzer);
|
||||
|
||||
Assert.fail("No exception throws.");
|
||||
|
||||
Reference in New Issue
Block a user