[Feature](load) Add submitter and comments to load job (#16878)

* [Feature](load) Add submitter and comments to load job
This commit is contained in:
Zhengguo Yang
2023-02-28 09:06:19 +08:00
committed by GitHub
parent dd1bd6d8f1
commit b51ce415e7
44 changed files with 511 additions and 68 deletions

View File

@ -642,7 +642,7 @@ terminal String COMMENTED_PLAN_HINTS;
nonterminal List<StatementBase> stmts;
nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt,
describe_stmt, alter_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
link_stmt, migrate_stmt, switch_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
@ -1097,6 +1097,8 @@ stmt ::=
{: RESULT = stmt; :}
| show_create_routine_load_stmt : stmt
{: RESULT = stmt; :}
| show_create_load_stmt : stmt
{: RESULT = stmt; :}
| cancel_stmt : stmt
{: RESULT = stmt; :}
| delete_stmt : stmt
@ -2270,19 +2272,21 @@ load_stmt ::=
opt_broker:broker
opt_system:system
opt_properties:properties
opt_comment:comment
{:
RESULT = new LoadStmt(label, dataDescList, broker, system, properties);
RESULT = new LoadStmt(label, dataDescList, broker, system, properties, comment);
:}
| KW_LOAD KW_LABEL job_label:label
LPAREN data_desc_list:dataDescList RPAREN
resource_desc:resource
opt_properties:properties
opt_comment:comment
{:
RESULT = new LoadStmt(label, dataDescList, resource, properties);
RESULT = new LoadStmt(label, dataDescList, resource, properties, comment);
:}
| KW_LOAD mysql_data_desc:desc opt_properties:properties
| KW_LOAD mysql_data_desc:desc opt_properties:properties opt_comment:comment
{:
RESULT = new LoadStmt(desc, properties);
RESULT = new LoadStmt(desc, properties, comment);
:}
;
@ -2594,9 +2598,10 @@ create_routine_load_stmt ::=
opt_load_property_list:loadPropertyList
opt_properties:properties
KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
opt_comment:comment
{:
RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList,
properties, type, customProperties, mergeType);
properties, type, customProperties, mergeType, comment);
:}
;
@ -2755,6 +2760,13 @@ show_create_routine_load_stmt ::=
:}
;
show_create_load_stmt ::=
KW_SHOW KW_CREATE KW_LOAD KW_FOR job_label:jobLabel
{:
RESULT = new ShowCreateLoadStmt(jobLabel);
:}
;
// analyze statement
analyze_stmt ::=
KW_ANALYZE KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties

View File

@ -172,6 +172,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private boolean numAsString = false;
private boolean fuzzyParse = false;
private String comment = "";
private LoadTask.MergeType mergeType;
public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L;
@ -184,7 +186,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
Map<String, String> jobProperties, String typeName,
Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType) {
Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType,
String comment) {
this.labelName = labelName;
this.tableName = tableName;
this.loadPropertyList = loadPropertyList;
@ -192,6 +195,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
this.typeName = typeName.toUpperCase();
this.dataSourceProperties = new RoutineLoadDataSourceProperties(this.typeName, dataSourceProperties, false);
this.mergeType = mergeType;
if (comment != null) {
this.comment = comment;
}
}
public String getName() {
@ -302,6 +308,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return this.dataSourceProperties.isOffsetsForTimes();
}
public String getComment() {
return comment;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);

View File

@ -121,6 +121,8 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_SKIP_LINES = "skip_lines";
public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
public static final String KEY_COMMENT = "comment";
private final LabelName label;
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;
@ -133,6 +135,8 @@ public class LoadStmt extends DdlStmt {
private EtlJobType etlJobType = EtlJobType.UNKNOWN;
private String comment;
public static final ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>()
.put(TIMEOUT_PROPERTY, new Function<String, Long>() {
@Override
@ -208,7 +212,7 @@ public class LoadStmt extends DdlStmt {
})
.build();
public LoadStmt(DataDescription dataDescription, Map<String, String> properties) {
public LoadStmt(DataDescription dataDescription, Map<String, String> properties, String comment) {
this.label = new LabelName();
this.dataDescriptions = Lists.newArrayList(dataDescription);
this.brokerDesc = null;
@ -217,10 +221,15 @@ public class LoadStmt extends DdlStmt {
this.properties = properties;
this.user = null;
this.isMysqlLoad = true;
if (comment != null) {
this.comment = comment;
} else {
this.comment = "";
}
}
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
BrokerDesc brokerDesc, String cluster, Map<String, String> properties) {
BrokerDesc brokerDesc, String cluster, Map<String, String> properties, String comment) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = brokerDesc;
@ -228,10 +237,15 @@ public class LoadStmt extends DdlStmt {
this.resourceDesc = null;
this.properties = properties;
this.user = null;
if (comment != null) {
this.comment = comment;
} else {
this.comment = "";
}
}
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
ResourceDesc resourceDesc, Map<String, String> properties) {
ResourceDesc resourceDesc, Map<String, String> properties, String comment) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = null;
@ -239,6 +253,11 @@ public class LoadStmt extends DdlStmt {
this.resourceDesc = resourceDesc;
this.properties = properties;
this.user = null;
if (comment != null) {
this.comment = comment;
} else {
this.comment = "";
}
}
public LabelName getLabel() {
@ -450,6 +469,10 @@ public class LoadStmt extends DdlStmt {
user = ConnectContext.get().getQualifiedUser();
}
public String getComment() {
return comment;
}
@Override
public boolean needAuditEncryption() {
if (brokerDesc != null || resourceDesc != null) {

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
// SHOW CREATE LOAD statement.
// Returns the original CREATE/LOAD statement text for every load job submitted
// under a given label, e.g. SHOW CREATE LOAD FOR db.label.
public class ShowCreateLoadStmt extends ShowStmt {
// Result set schema: one row per matching job, (JobId, CreateStmt).
private static final ShowResultSetMetaData META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("JobId", ScalarType.createVarchar(128)))
.addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535)))
.build();
// Fully qualified label (database + label name) identifying the load job(s).
private final LabelName labelName;
public ShowCreateLoadStmt(LabelName labelName) {
this.labelName = labelName;
}
// Database part of the label.
public String getDb() {
return labelName.getDbName();
}
// Label part (the job label itself).
public String getLabel() {
return labelName.getLabelName();
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
// Restricted statement: requires global ADMIN or OPERATOR privilege,
// since the original statement text may expose sensitive job details.
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)
&& !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.OPERATOR)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR");
}
// Resolves/validates the database name portion of the label.
labelName.analyze(analyzer);
}
@Override
public ShowResultSetMetaData getMetaData() {
return META_DATA;
}
}

View File

@ -86,6 +86,8 @@ public class ShowRoutineLoadStmt extends ShowStmt {
.add("ReasonOfStateChanged")
.add("ErrorLogUrls")
.add("OtherMsg")
.add("User")
.add("Comment")
.build();
private final LabelName labelName;

View File

@ -62,10 +62,10 @@ public class ShowStreamLoadStmt extends ShowStmt {
private ArrayList<OrderByPair> orderByPairs;
private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Label").add("Db").add("Table").add("User")
.add("Label").add("Db").add("Table")
.add("ClientIp").add("Status").add("Message").add("Url").add("TotalRows")
.add("LoadedRows").add("FilteredRows").add("UnselectedRows").add("LoadBytes")
.add("StartTime").add("FinishTime")
.add("StartTime").add("FinishTime").add("User").add("Comment")
.build();
public ShowStreamLoadStmt(String db, Expr labelExpr,

View File

@ -35,7 +35,7 @@ public class LoadProcDir implements ProcDirInterface {
.add("JobId").add("Label").add("State").add("Progress")
.add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime")
.add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime")
.add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets")
.add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets").add("User").add("Comment")
.build();
// label and state column index of result

View File

@ -130,7 +130,7 @@ public class MultiAction extends RestBaseController {
}
Map<String, String> properties = Maps.newHashMap();
String[] keys = {LoadStmt.TIMEOUT_PROPERTY, LoadStmt.MAX_FILTER_RATIO_PROPERTY};
String[] keys = {LoadStmt.TIMEOUT_PROPERTY, LoadStmt.MAX_FILTER_RATIO_PROPERTY, LoadStmt.KEY_COMMENT};
for (String key : keys) {
String value = request.getParameter(key);
if (!Strings.isNullOrEmpty(value)) {

View File

@ -1494,6 +1494,10 @@ public class Load {
jobInfo.add(loadJob.getTransactionId());
// error tablets(not used for hadoop load, just return an empty string)
jobInfo.add("");
// user
jobInfo.add(loadJob.getUser());
// comment
jobInfo.add(loadJob.getComment());
loadJobInfos.add(jobInfo);
} // end for loadJobs

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.FailMsg.CancelType;
@ -125,6 +126,10 @@ public class LoadJob implements Writable {
private long execMemLimit;
private String user = "";
private String comment = "";
// save table names for auth check
private Set<String> tableNames = Sets.newHashSet();
@ -286,6 +291,22 @@ public class LoadJob implements Writable {
}
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
public long getLoadStartTimeMs() {
return loadStartTimeMs;
}
@ -778,6 +799,8 @@ public class LoadJob implements Writable {
for (String tableName : tableNames) {
Text.writeString(out, tableName);
}
Text.writeString(out, user);
Text.writeString(out, comment);
}
public void readFields(DataInput in) throws IOException {
@ -859,15 +882,6 @@ public class LoadJob implements Writable {
resourceInfo = new TResourceInfo(user, group);
}
if (version >= 3 && version < 7) {
// CHECKSTYLE OFF
// bos 3 parameters
String bosEndpoint = Text.readString(in);
String bosAccessKey = Text.readString(in);
String bosSecretAccessKey = Text.readString(in);
// CHECKSTYLE ON
}
this.priority = TPriority.valueOf(Text.readString(in));
// Broker description
@ -913,6 +927,13 @@ public class LoadJob implements Writable {
for (int i = 0; i < size; i++) {
tableNames.add(Text.readString(in));
}
if (version >= FeMetaVersion.VERSION_117) {
this.user = Text.readString(in);
this.comment = Text.readString(in);
} else {
this.user = "";
this.comment = "";
}
}
@Override

View File

@ -41,11 +41,12 @@ public class StreamLoadRecord {
private String loadBytes;
private String startTime;
private String finishTime;
private String comment;
public StreamLoadRecord(String label, String db, String table, String user, String clientIp, String status,
public StreamLoadRecord(String label, String db, String table, String clientIp, String status,
String message, String url, String totalRows, String loadedRows, String filteredRows, String unselectedRows,
String loadBytes, String startTime, String finishTime) {
String loadBytes, String startTime, String finishTime, String user, String comment) {
this.label = label;
this.db = db;
this.table = table;
@ -61,6 +62,7 @@ public class StreamLoadRecord {
this.loadBytes = loadBytes;
this.startTime = startTime;
this.finishTime = finishTime;
this.comment = comment;
}
public List<Comparable> getStreamLoadInfo() {
@ -68,7 +70,6 @@ public class StreamLoadRecord {
streamLoadInfo.add(this.label);
streamLoadInfo.add(this.db);
streamLoadInfo.add(this.table);
streamLoadInfo.add(this.user);
streamLoadInfo.add(this.clientIp);
streamLoadInfo.add(this.status);
streamLoadInfo.add(this.message);
@ -80,6 +81,8 @@ public class StreamLoadRecord {
streamLoadInfo.add(this.loadBytes);
streamLoadInfo.add(this.startTime);
streamLoadInfo.add(this.finishTime);
streamLoadInfo.add(this.user);
streamLoadInfo.add(this.comment);
return streamLoadInfo;
}

View File

@ -287,13 +287,14 @@ public class StreamLoadRecordMgr extends MasterDaemon {
}
StreamLoadRecord streamLoadRecord =
new StreamLoadRecord(streamLoadItem.getLabel(), streamLoadItem.getDb(),
streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
streamLoadItem.getTbl(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
String.valueOf(streamLoadItem.getTotalRows()),
String.valueOf(streamLoadItem.getLoadedRows()),
String.valueOf(streamLoadItem.getFilteredRows()),
String.valueOf(streamLoadItem.getUnselectedRows()),
String.valueOf(streamLoadItem.getLoadBytes()), startTime, finishTime);
String.valueOf(streamLoadItem.getLoadBytes()),
startTime, finishTime, streamLoadItem.getUser(), streamLoadItem.getComment());
String cluster = streamLoadItem.getCluster();
if (Strings.isNullOrEmpty(cluster)) {

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugUtil;
@ -82,8 +83,6 @@ public abstract class BulkLoadJob extends LoadJob {
// the expr of columns will be reanalyze when the log is replayed
private OriginStatement originStmt;
private UserIdentity userInfo;
// include broker desc and data desc
protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();
@ -138,6 +137,7 @@ public abstract class BulkLoadJob extends LoadJob {
default:
throw new DdlException("Unknown load job type.");
}
bulkLoadJob.setComment(stmt.getComment());
bulkLoadJob.setJobProperties(stmt.getProperties());
bulkLoadJob.checkAndSetDataSourceInfo((Database) db, stmt.getDataDescriptions());
return bulkLoadJob;
@ -297,7 +297,6 @@ public abstract class BulkLoadJob extends LoadJob {
super.write(out);
brokerDesc.write(out);
originStmt.write(out);
userInfo.write(out);
out.writeInt(sessionVariables.size());
for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
@ -315,12 +314,14 @@ public abstract class BulkLoadJob extends LoadJob {
brokerDesc = BrokerDesc.read(in);
originStmt = OriginStatement.read(in);
// The origin stmt does not be analyzed in here.
// The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
// The reason is that it will throw MetaNotFoundException when the tableId could not be found by tableName.
// The origin stmt will be analyzed after the replay is completed.
userInfo = UserIdentity.read(in);
// must set it as analyzed, because when writing the user info to the meta image, it will be checked.
userInfo.setIsAnalyzed();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_117) {
userInfo = UserIdentity.read(in);
// must set it as analyzed, because when writing the user info to the meta image, it will be checked.
userInfo.setIsAnalyzed();
}
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -50,7 +51,8 @@ public class InsertLoadJob extends LoadJob {
}
public InsertLoadJob(String label, long transactionId, long dbId, long tableId,
long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
long createTimestamp, String failMsg, String trackingUrl,
UserIdentity userInfo) throws MetaNotFoundException {
super(EtlJobType.INSERT, dbId, label);
this.tableId = tableId;
this.transactionId = transactionId;
@ -67,6 +69,7 @@ public class InsertLoadJob extends LoadJob {
}
this.authorizationInfo = gatherAuthInfo();
this.loadingStatus.setTrackingUrl(trackingUrl);
this.userInfo = userInfo;
}
public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {

View File

@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -28,6 +29,7 @@ import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
@ -130,6 +132,11 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected List<ErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
protected UserIdentity userInfo;
protected String comment = "";
public static class LoadStatistic {
// number of rows processed on BE, this number will be updated periodically by query report.
// A load job may has several load tasks(queries), and each task has several fragments.
@ -379,6 +386,22 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
}
}
public UserIdentity getUserInfo() {
return userInfo;
}
public void setUserInfo(UserIdentity userInfo) {
this.userInfo = userInfo;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
private void initDefaultJobProperties() {
long timeout = Config.broker_load_default_timeout_second;
switch (jobType) {
@ -789,6 +812,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
jobInfo.add(transactionId);
// error tablets
jobInfo.add(errorTabletsToJson());
// user
jobInfo.add(userInfo.getQualifiedUser());
// comment
jobInfo.add(comment);
return jobInfo;
} finally {
readUnlock();
@ -1034,6 +1061,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
Text.writeString(out, entry.getKey());
Text.writeString(out, String.valueOf(entry.getValue()));
}
if (userInfo == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
userInfo.write(out);
}
Text.writeString(out, comment);
}
public void readFields(DataInput in) throws IOException {
@ -1077,6 +1111,14 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
// should not happen
throw new IOException("failed to replay job property", e);
}
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) {
if (in.readBoolean()) {
userInfo = UserIdentity.read(in);
// must set it as analyzed, because when writing the user info to the meta image, it will be checked.
userInfo.setIsAnalyzed();
}
comment = Text.readString(in);
}
}
public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
@ -31,6 +32,7 @@ import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
@ -42,6 +44,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
@ -58,6 +61,8 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
@ -200,7 +205,8 @@ public class LoadManager implements Writable {
* Record finished load job by editLog.
**/
public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
long createTimestamp, String failMsg, String trackingUrl,
UserIdentity userInfo) throws MetaNotFoundException {
// get db id
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
@ -209,7 +215,7 @@ public class LoadManager implements Writable {
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
trackingUrl);
trackingUrl, userInfo);
break;
default:
return;
@ -423,6 +429,39 @@ public class LoadManager implements Writable {
});
}
/**
 * Collects the original statement text for every load job under the given label,
 * for SHOW CREATE LOAD. Returns one (jobId, originalStmtText) pair per job.
 *
 * @param dbId  id of the database the label belongs to
 * @param label load job label to look up
 * @return list of (job id, original statement) pairs, one per job under the label
 * @throws DdlException if the database or label does not exist, or a job's type
 *         does not expose its original statement (no {@code getOriginStmt()})
 */
public List<Pair<Long, String>> getCreateLoadStmt(long dbId, String label) throws DdlException {
    List<Pair<Long, String>> result = new ArrayList<>();
    readLock();
    try {
        // Single lookup instead of containsKey + get.
        Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
        if (labelToLoadJobs == null) {
            throw new DdlException("Database does not exist");
        }
        List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
        if (labelLoadJobs == null) {
            throw new DdlException("Label does not exist: " + label);
        }
        for (LoadJob job : labelLoadJobs) {
            try {
                // Only job types that expose getOriginStmt() can be re-created.
                // Note: getMethod() never returns null — it throws NoSuchMethodException
                // when absent — so no null check is needed here.
                Method getOriginStmt = job.getClass().getMethod("getOriginStmt");
                result.add(Pair.of(job.getId(), ((OriginStatement) getOriginStmt.invoke(job)).originStmt));
            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                throw new DdlException("Not support load job type: " + job.getClass().getName());
            }
        }
        return result;
    } finally {
        // Always release the read lock, even on the error paths above.
        readUnlock();
    }
}
/**
* This method will return the jobs info which can meet the condition of input param.
*

View File

@ -232,6 +232,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// User who submit this job. Maybe null for the old version job(before v1.1)
protected UserIdentity userIdentity;
protected String comment = "";
protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
protected Expr deleteCondition;
@ -618,6 +620,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return !Strings.isNullOrEmpty(sequenceCol);
}
public void setComment(String comment) {
this.comment = comment;
}
public int getSizeOfRoutineLoadTaskInfoList() {
readLock();
try {
@ -1290,7 +1296,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
Optional<Database> database = Env.getCurrentInternalCatalog().getDb(dbId);
Optional<Table> table = database.flatMap(db -> db.getTable(tableId));
readLock();
try {
List<String> row = Lists.newArrayList();
@ -1322,6 +1327,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
row.add(Joiner.on(", ").join(errorLogUrls));
row.add(otherMsg);
row.add(userIdentity.getQualifiedUser());
row.add(comment);
return row;
} finally {
readUnlock();
@ -1562,6 +1569,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
out.writeBoolean(true);
userIdentity.write(out);
}
Text.writeString(out, comment);
}
public void readFields(DataInput in) throws IOException {
@ -1646,6 +1654,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
userIdentity = null;
}
}
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) {
comment = Text.readString(in);
} else {
comment = null;
}
}
public abstract void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException;

View File

@ -154,6 +154,7 @@ public class RoutineLoadManager implements Writable {
}
routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
routineLoadJob.setComment(createRoutineLoadStmt.getComment());
addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName());
}

View File

@ -346,7 +346,15 @@ public class MultiLoadMgr {
Map<String, String> brokerProperties = Maps.newHashMap();
brokerProperties.put(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY, backendId.toString());
BrokerDesc brokerDesc = new BrokerDesc(BrokerDesc.MULTI_LOAD_BROKER, brokerProperties);
LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties);
String comment = "multi load";
if (properties.containsKey(LoadStmt.KEY_COMMENT)) {
comment = properties.get(LoadStmt.KEY_COMMENT);
properties.remove(LoadStmt.KEY_COMMENT);
}
properties.remove(LoadStmt.KEY_COMMENT);
LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties, comment);
loadStmt.setEtlJobType(EtlJobType.BROKER);
loadStmt.setOrigStmt(new OriginStatement("", 0));
loadStmt.setUserInfo(ConnectContext.get().getCurrentUserIdentity());

View File

@ -41,6 +41,7 @@ import org.apache.doris.analysis.ShowColumnStmt;
import org.apache.doris.analysis.ShowCreateCatalogStmt;
import org.apache.doris.analysis.ShowCreateDbStmt;
import org.apache.doris.analysis.ShowCreateFunctionStmt;
import org.apache.doris.analysis.ShowCreateLoadStmt;
import org.apache.doris.analysis.ShowCreateMaterializedViewStmt;
import org.apache.doris.analysis.ShowCreateRoutineLoadStmt;
import org.apache.doris.analysis.ShowCreateTableStmt;
@ -287,7 +288,9 @@ public class ShowExecutor {
handleShowRoutineLoadTask();
} else if (stmt instanceof ShowCreateRoutineLoadStmt) {
handleShowCreateRoutineLoad();
} else if (stmt instanceof ShowDeleteStmt) {
} else if (stmt instanceof ShowCreateLoadStmt) {
handleShowCreateLoad();
} else if (stmt instanceof ShowDeleteStmt) {
handleShowDelete();
} else if (stmt instanceof ShowAlterStmt) {
handleShowAlter();
@ -2125,6 +2128,26 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showCreateRoutineLoadStmt.getMetaData(), rows);
}
// Executes SHOW CREATE LOAD: fetches the original statement text of every load
// job under the requested label and builds the (JobId, CreateStmt) result set.
private void handleShowCreateLoad() throws AnalysisException {
ShowCreateLoadStmt showCreateLoadStmt = (ShowCreateLoadStmt) stmt;
List<List<String>> rows = Lists.newArrayList();
String labelName = showCreateLoadStmt.getLabel();
// Load jobs live only in the internal catalog; reject external catalogs up front.
Util.prohibitExternalCatalog(ctx.getDefaultCatalog(), stmt.getClass().getSimpleName());
Env env = ctx.getEnv();
DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(showCreateLoadStmt.getDb());
long dbId = db.getId();
try {
// One (jobId, originalStmt) pair per job submitted under this label.
List<Pair<Long, String>> result = env.getLoadManager().getCreateLoadStmt(dbId, labelName);
rows.addAll(result.stream().map(pair -> Lists.newArrayList(String.valueOf(pair.first), pair.second))
.collect(Collectors.toList()));
} catch (DdlException e) {
// Surface lookup failures (missing db/label, unsupported job type) as analysis errors.
LOG.warn(e.getMessage(), e);
throw new AnalysisException(e.getMessage());
}
resultSet = new ShowResultSet(showCreateLoadStmt.getMetaData(), rows);
}
private void handleShowDataSkew() throws AnalysisException {
ShowDataSkewStmt showStmt = (ShowDataSkewStmt) stmt;
try {

View File

@ -1664,7 +1664,7 @@ public class StmtExecutor implements ProfileWriter {
context.getEnv().getLoadManager()
.recordFinishedLoadJob(label, txnId, insertStmt.getDb(), insertStmt.getTargetTable().getId(),
EtlJobType.INSERT, createTime, throwable == null ? "" : throwable.getMessage(),
coord.getTrackingUrl());
coord.getTrackingUrl(), insertStmt.getUserInfo());
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();

View File

@ -41,6 +41,7 @@ public class CancelLoadStmtTest extends TestWithFeService {
private Analyzer analyzer;
private String dbName = "testDb";
private String tblName = "table1";
private UserIdentity userInfo = new UserIdentity("root", "localhost");
@Override
protected void runBeforeAll() throws Exception {
@ -110,11 +111,11 @@ public class CancelLoadStmtTest extends TestWithFeService {
long dbId = db.getId();
Table tbl = db.getTableNullable(tblName);
long tblId = tbl.getId();
InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", "");
InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", "", userInfo);
loadJobs.add(insertLoadJob1);
InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", "");
InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", "", userInfo);
loadJobs.add(insertLoadJob2);
InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", "");
InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", "", userInfo);
loadJobs.add(insertLoadJob3);
// label
stmt = new CancelLoadStmt(null, labelBinaryPredicate);

View File

@ -123,7 +123,7 @@ public class CreateRoutineLoadStmtTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND);
LoadTask.MergeType.APPEND, "");
new MockUp<StatementBase>() {
@Mock
@ -173,7 +173,7 @@ public class CreateRoutineLoadStmtTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND);
LoadTask.MergeType.APPEND, "");
new MockUp<StatementBase>() {
@Mock
public void analyze(Analyzer analyzer1) {

View File

@ -110,7 +110,7 @@ public class LoadStmtTest {
}
};
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null);
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null, "");
stmt.analyze(analyzer);
Assert.assertEquals("testCluster:testDb", stmt.getLabel().getDbName());
Assert.assertEquals(dataDescriptionList, stmt.getDataDescriptions());
@ -121,7 +121,7 @@ public class LoadStmtTest {
// test ResourceDesc
stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList,
new ResourceDesc(resourceName, null), null);
new ResourceDesc(resourceName, null), null, "");
stmt.analyze(analyzer);
Assert.assertEquals(EtlJobType.SPARK, stmt.getResourceDesc().getEtlJobType());
Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n(XXX)\nWITH RESOURCE 'spark0'",
@ -137,7 +137,7 @@ public class LoadStmtTest {
}
};
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, null, null, null);
LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), null, null, null, null, "");
stmt.analyze(analyzer);
Assert.fail("No exception throws.");
@ -220,7 +220,7 @@ public class LoadStmtTest {
}
};
LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap());
LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap(), "");
try {
stmt.analyze(analyzer);
} catch (AnalysisException ae) {

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
@ -38,7 +39,8 @@ public class InsertLoadJobTest {
@Test
public void testGetTableNames(@Mocked Env env, @Mocked InternalCatalog catalog, @Injectable Database database,
@Injectable Table table) throws MetaNotFoundException {
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 1000, "", "");
UserIdentity userInfo = new UserIdentity("root", "localhost");
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 1000, "", "", userInfo);
String tableName = "table1";
new Expectations() {
{

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
@ -44,6 +45,7 @@ import java.util.Map;
public class LoadManagerTest {
private LoadManager loadManager;
private final String fieldName = "idToLoadJob";
private UserIdentity userInfo = UserIdentity.createAnalyzedUserIdentWithIp("root", "localhost");
@Before
public void setUp() throws Exception {
@ -82,7 +84,7 @@ public class LoadManagerTest {
};
loadManager = new LoadManager(new LoadJobScheduler());
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "");
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo);
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
File file = serializeToFile(loadManager);
@ -118,7 +120,7 @@ public class LoadManagerTest {
};
loadManager = new LoadManager(new LoadJobScheduler());
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "");
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "", userInfo);
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
//make job1 don't serialize

View File

@ -316,7 +316,7 @@ public class KafkaRoutineLoadJobTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND);
LoadTask.MergeType.APPEND, "");
Deencapsulation.setField(createRoutineLoadStmt, "name", jobName);
return createRoutineLoadStmt;
}

View File

@ -179,12 +179,21 @@ public class RoutineLoadJobTest {
}
@Test
public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) {
public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress, @Injectable UserIdentity userIdentity) {
new Expectations() {
{
userIdentity.getQualifiedUser();
minTimes = 0;
result = "root";
}
};
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);
Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
Deencapsulation.setField(routineLoadJob, "userIdentity", userIdentity);
List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertEquals(true, showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))

View File

@ -97,7 +97,7 @@ public class RoutineLoadManagerTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND);
LoadTask.MergeType.APPEND, "");
createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
@ -166,7 +166,7 @@ public class RoutineLoadManagerTest {
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
loadPropertyList, properties,
typeName, customProperties,
LoadTask.MergeType.APPEND);
LoadTask.MergeType.APPEND, "");
createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));