[Export] Support export job with label (#6835)

```
EXPORT TABLE xxx
...
PROPERTIES
(
    "label" = "mylabel",
    ...
);
```

Then users can use the label to get the job info via the SHOW EXPORT stmt:
```
show export from db where label="mylabel";
```

For compatibility, if not specified, a random label will be generated. And for historical jobs, the label will be "export_" + job_id.

Unlike the LOAD stmt, here we specify the label in `properties` because this does not cause grammatical conflicts,
and there is no need to bump the metadata version.
This commit is contained in:
Mingyu Chen
2021-10-15 10:18:11 +08:00
committed by GitHub
parent 58440b90f0
commit fcd15edbf9
14 changed files with 99 additions and 30 deletions

View File

@ -106,6 +106,7 @@ PARTITION (p1,p2)
TO "bos://bj-test-cmy/export/"
PROPERTIES
(
"label"="mylabel",
"column_separator"=",",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
@ -117,6 +118,7 @@ WITH BROKER "hdfs"
);
```
* `label`: The identifier of this export job. You can use this identifier to view the job status later.
* `column_separator`: Column separator. The default is `\t`. Supports invisible characters, such as '\x07'.
* `column`: columns to be exported, separated by commas, if this parameter is not filled in, all columns of the table will be exported by default.
* `line_delimiter`: Line separator. The default is `\n`. Supports invisible characters, such as '\x07'.
@ -128,6 +130,7 @@ After submitting a job, the job status can be viewed by querying the `SHOW EXP
```
JobId: 14008
Label: mylabel
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":["*"],"exec mem limit":2147483648,"column separator":",","line delimiter":"\n","tablet num":1,"broker":"hdfs","coord num":1,"db":"default_cluster:db1","tbl":"tbl3"}
@ -141,6 +144,7 @@ FinishTime: 2019-06-25 17:08:34
* JobId: The unique ID of the job
* Label: Job identifier
* State: Job status:
* PENDING: Jobs to be Scheduled
* EXPORTING: Data Export

View File

@ -57,6 +57,7 @@ under the License.
[PROPERTIES ("key"="value", ...)]
The following parameters can be specified:
label: The identifier of this export job. You can use this identifier to view the job status later.
column_separator: Specifies the exported column separator, defaulting to \t. Supports invisible characters, such as '\x07'.
column: Specify the columns to be exported, separated by commas. If you do not fill in this parameter, the default is to export all the columns of the table.
line_delimiter: Specifies the exported line separator, defaulting to \n. Supports invisible characters, such as '\x07'.
@ -81,8 +82,8 @@ under the License.
2. Export partitions P1 and P2 from the testTbl table to HDFS
EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
3. Export all data in the testTbl table to hdfs, using "," as column separator
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
3. Export all data in the testTbl table to hdfs, using "," as column separator, and specify label
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("label" = "mylabel", "column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
4. Export the row meet condition k1 = 1 in the testTbl table to hdfs.
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WHERE k1=1 WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");

View File

@ -34,6 +34,7 @@ Grammar:
WHERE
[ID = your_job_id]
[STATE = ["PENDING"|"EXPORTING"|"FINISHED"|"CANCELLED"]]
[LABEL = "your_label"]
]
[ORDER BY ...]
[LIMIT limit];
@ -57,6 +58,10 @@ Explain:
4. Show the export task of specifying dB and job_id
SHOW EXPORT FROM example_db WHERE ID = job_id;
## keyword
SHOW,EXPORT
5. Show the export task of specifying dB and label
SHOW EXPORT FROM example_db WHERE LABEL = "mylabel";
## keyword
SHOW,EXPORT

View File

@ -110,6 +110,7 @@ PARTITION (p1,p2)
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns":"col1,col2"
"exec_mem_limit"="2147483648",
@ -122,6 +123,7 @@ WITH BROKER "hdfs"
);
```
* `label`:本次导出作业的标识。后续可以使用这个标识查看作业状态。
* `column_separator`:列分隔符。默认为 `\t`。支持不可见字符,比如 '\x07'。
* columns:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列
* `line_delimiter`:行分隔符。默认为 `\n`。支持不可见字符,比如 '\x07'。
@ -133,6 +135,7 @@ WITH BROKER "hdfs"
```
JobId: 14008
Label: mylabel
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":["*"],"exec mem limit":2147483648,"column separator":",","line delimiter":"\n","tablet num":1,"broker":"hdfs","coord num":1,"db":"default_cluster:db1","tbl":"tbl3"}
@ -145,6 +148,7 @@ FinishTime: 2019-06-25 17:08:34
```
* JobId:作业的唯一 ID
* Label:自定义作业标识
* State:作业状态:
* PENDING:作业待调度
* EXPORTING:数据导出中

View File

@ -57,6 +57,7 @@ under the License.
[PROPERTIES ("key"="value", ...)]
可以指定如下参数:
label: 指定一个自定义作业标识。后续可以使用这个标识查看作业状态。
column_separator: 指定导出的列分隔符,默认为\t。支持不可见字符,比如 '\x07'。
column: 指定待导出的列,使用英文逗号隔开,如果不填这个参数默认是导出表的所有列。
line_delimiter: 指定导出的行分隔符,默认为\n。支持不可见字符,比如 '\x07'。
@ -81,8 +82,8 @@ under the License.
2. 将 testTbl 表中的分区p1,p2导出到 hdfs 上
EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
3. 将 testTbl 表中的所有数据导出到 hdfs 上,以","作为列分隔符
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
3. 将 testTbl 表中的所有数据导出到 hdfs 上,以","作为列分隔符,并指定label
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("label" = "mylabel", "column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
4. 将 testTbl 表中 k1 = 1 的行导出到 hdfs 上。
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WHERE k1=1 WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");

View File

@ -34,6 +34,7 @@ under the License.
WHERE
[ID = your_job_id]
[STATE = ["PENDING"|"EXPORTING"|"FINISHED"|"CANCELLED"]]
[LABEL = "your_label"]
]
[ORDER BY ...]
[LIMIT limit];
@ -57,6 +58,9 @@ under the License.
4. 展示指定db,指定job_id的导出任务
SHOW EXPORT FROM example_db WHERE ID = job_id;
5. 展示指定db,指定label的导出任务
SHOW EXPORT FROM example_db WHERE LABEL = "mylabel";
## keyword
SHOW,EXPORT

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
@ -45,6 +46,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
// EXPORT statement, export data to dirs by broker.
//
@ -57,6 +59,7 @@ public class ExportStmt extends StatementBase {
private final static Logger LOG = LogManager.getLogger(ExportStmt.class);
public static final String TABLET_NUMBER_PER_TASK_PROP = "tablet_num_per_task";
public static final String LABEL = "label";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
@ -264,7 +267,7 @@ public class ExportStmt extends StatementBase {
properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
this.lineDelimiter = Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
properties, ExportStmt.DEFAULT_LINE_DELIMITER));
this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
// exec_mem_limit
if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
try {
@ -300,6 +303,14 @@ public class ExportStmt extends StatementBase {
// use session variables
properties.put(TABLET_NUMBER_PER_TASK_PROP, String.valueOf(Config.export_tablet_num_per_task));
}
if (properties.containsKey(LABEL)) {
FeNameFormat.checkLabel(properties.get(LABEL));
} else {
// generate a random label
String label = "export_" + UUID.randomUUID().toString();
properties.put(LABEL, label);
}
}
@Override

View File

@ -41,8 +41,7 @@ import java.util.List;
// SHOW EXPORT STATUS statement used to get status of load job.
//
// syntax:
// SHOW EXPORT [FROM db] [LIKE mask]
// TODO(lingbin): remove like predicate because export do not have label string
// SHOW EXPORT [FROM db] [where ...]
public class ShowExportStmt extends ShowStmt {
private static final Logger LOG = LogManager.getLogger(ShowExportStmt.class);
@ -52,6 +51,7 @@ public class ShowExportStmt extends ShowStmt {
private List<OrderByElement> orderByElements;
private long jobId = 0;
private String label = null;
private String stateValue = null;
private JobState jobState;
@ -91,6 +91,10 @@ public class ShowExportStmt extends ShowStmt {
return jobState;
}
public String getLabel() {
return label;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
@ -131,6 +135,7 @@ public class ShowExportStmt extends ShowStmt {
boolean valid = true;
boolean hasJobId = false;
boolean hasState = false;
boolean hasLabel = false;
CHECK: {
// check predicate type
@ -155,6 +160,8 @@ public class ShowExportStmt extends ShowStmt {
hasJobId = true;
} else if (leftKey.equalsIgnoreCase("state")) {
hasState = true;
} else if (leftKey.equalsIgnoreCase("label")) {
hasLabel = true;
} else {
valid = false;
break CHECK;
@ -188,13 +195,19 @@ public class ShowExportStmt extends ShowStmt {
break CHECK;
}
jobId = ((IntLiteral) whereExpr.getChild(1)).getLongValue();
} else if (hasLabel) {
if (!(whereExpr.getChild(1) instanceof StringLiteral)) {
valid = false;
break CHECK;
}
label = ((StringLiteral) whereExpr.getChild(1)).getStringValue();
}
}
if (!valid) {
throw new AnalysisException("Where clause should looks like below: "
+ " ID = $your_job_id, or STATE = \"PENDING|EXPORTING|FINISHED|CANCELLED\"");
+ " ID = $your_job_id, or STATE = \"PENDING|EXPORTING|FINISHED|CANCELLED\", " +
"or label=\"xxx\"");
}
}

View File

@ -1761,8 +1761,7 @@ public class Catalog {
for (int i = 0; i < size; ++i) {
long jobId = dis.readLong();
newChecksum ^= jobId;
ExportJob job = new ExportJob();
job.readFields(dis);
ExportJob job = ExportJob.read(dis);
if (!job.isExpired(curTime)) {
exportMgr.unprotectAddJob(job);
}

View File

@ -29,7 +29,7 @@ import java.util.List;
// TODO(lingbin): think if need a sub node to show unfinished instances
public class ExportProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("State").add("Progress")
.add("JobId").add("Label").add("State").add("Progress")
.add("TaskInfo").add("Path")
.add("CreateTime").add("StartTime").add("FinishTime")
.add("Timeout").add("ErrorMsg")
@ -57,7 +57,7 @@ public class ExportProcNode implements ProcNodeInterface {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<String>> jobInfos = exportMgr.getExportJobInfosByIdOrState(db.getId(), 0, null, null, LIMIT);
List<List<String>> jobInfos = exportMgr.getExportJobInfosByIdOrState(db.getId(), 0, "", null, null, LIMIT);
result.setRows(jobInfos);
return result;
}

View File

@ -294,8 +294,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_EXPORT_CREATE:
data = new ExportJob();
((ExportJob) data).readFields(in);
data = ExportJob.read(in);
isRead = true;
break;
case OperationType.OP_EXPORT_UPDATE_STATE:

View File

@ -115,8 +115,8 @@ public class ExportJob implements Writable {
}
private long id;
private String label;
private long dbId;
private String clusterName;
private long tableId;
private BrokerDesc brokerDesc;
private Expr whereExpr;
@ -204,6 +204,7 @@ public class ExportJob implements Writable {
this.columnSeparator = stmt.getColumnSeparator();
this.lineDelimiter = stmt.getLineDelimiter();
this.properties = stmt.getProperties();
this.label = this.properties.get(ExportStmt.LABEL);
String path = stmt.getPath();
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
@ -673,9 +674,14 @@ public class ExportJob implements Writable {
&& (state == ExportJob.JobState.CANCELLED || state == ExportJob.JobState.FINISHED);
}
public String getLabel() {
return label;
}
@Override
public String toString() {
return "ExportJob [jobId=" + id
+ ", label=" + label
+ ", dbId=" + dbId
+ ", tableId=" + tableId
+ ", state=" + state
@ -690,6 +696,12 @@ public class ExportJob implements Writable {
+ "]";
}
public static ExportJob read(DataInput in) throws IOException {
ExportJob job = new ExportJob();
job.readFields(in);
return job;
}
@Override
public void write(DataOutput out) throws IOException {
// base infos
@ -742,7 +754,7 @@ public class ExportJob implements Writable {
}
}
public void readFields(DataInput in) throws IOException {
private void readFields(DataInput in) throws IOException {
isReplayed = true;
id = in.readLong();
dbId = in.readLong();
@ -758,7 +770,13 @@ public class ExportJob implements Writable {
String propertyValue = Text.readString(in);
this.properties.put(propertyKey, propertyValue);
}
// Because before 0.15, export does not contain label information.
// So for compatibility, a label will be added for historical jobs.
// This label must be guaranteed to be a certain value to prevent
// the label from being different each time.
properties.putIfAbsent(ExportStmt.LABEL, "export_" + id);
}
this.label = properties.get(ExportStmt.LABEL);
this.columns = this.properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
if (!Strings.isNullOrEmpty(this.columns)) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();

View File

@ -23,12 +23,14 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@ -51,13 +53,12 @@ public class ExportMgr {
// lock for export job
// lock is private and must use after db lock
private ReentrantReadWriteLock lock;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private Map<Long, ExportJob> idToJob; // exportJobId to exportJob
private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId to exportJob
private Map<String, Long> labelToJobId = Maps.newHashMap();
public ExportMgr() {
idToJob = Maps.newHashMap();
lock = new ReentrantReadWriteLock(true);
}
public void readLock() {
@ -85,6 +86,9 @@ public class ExportMgr {
ExportJob job = createJob(jobId, stmt);
writeLock();
try {
if (labelToJobId.containsKey(job.getLabel())) {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
Catalog.getCurrentCatalog().getEditLog().logExportCreate(job);
} finally {
@ -95,6 +99,7 @@ public class ExportMgr {
public void unprotectAddJob(ExportJob job) {
idToJob.put(job.getId(), job);
labelToJobId.putIfAbsent(job.getLabel(), job.getId());
}
private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
@ -121,7 +126,7 @@ public class ExportMgr {
// NOTE: jobid and states may both specified, or only one of them, or neither
public List<List<String>> getExportJobInfosByIdOrState(
long dbId, long jobId, Set<ExportJob.JobState> states,
long dbId, long jobId, String label, Set<ExportJob.JobState> states,
ArrayList<OrderByPair> orderByPairs, long limit) {
long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
@ -132,15 +137,18 @@ public class ExportMgr {
for (ExportJob job : idToJob.values()) {
long id = job.getId();
ExportJob.JobState state = job.getState();
String jobLabel = job.getLabel();
if (job.getDbId() != dbId) {
continue;
}
if (jobId != 0) {
if (id != jobId) {
continue;
}
if (jobId != 0 && id != jobId) {
continue;
}
if (!Strings.isNullOrEmpty(label) && !jobLabel.equals(label)) {
continue;
}
// check auth
@ -172,6 +180,7 @@ public class ExportMgr {
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(id);
jobInfo.add(jobLabel);
jobInfo.add(state.name());
jobInfo.add(job.getProgress() + "%");
@ -254,9 +263,9 @@ public class ExportMgr {
&& (job.getState() == ExportJob.JobState.CANCELLED
|| job.getState() == ExportJob.JobState.FINISHED)) {
iter.remove();
labelToJobId.remove(job.getLabel(), job.getId());
}
}
} finally {
writeUnlock();
}

View File

@ -1610,7 +1610,8 @@ public class ShowExecutor {
states = Sets.newHashSet(state);
}
List<List<String>> infos = exportMgr.getExportJobInfosByIdOrState(
dbId, showExportStmt.getJobId(), states, showExportStmt.getOrderByPairs(), showExportStmt.getLimit());
dbId, showExportStmt.getJobId(), showExportStmt.getLabel(), states,
showExportStmt.getOrderByPairs(), showExportStmt.getLimit());
resultSet = new ShowResultSet(showExportStmt.getMetaData(), infos);
}