Add loaded rows to the SHOW LOAD result (#1686)

The loaded-rows count is updated periodically by the query report, so that
users can see whether a load job is still making progress or is blocked.
Mingyu Chen
2019-08-27 14:13:47 +08:00
committed by ZHAO Chun
parent 58801c6ab0
commit a1b92768dd
14 changed files with 156 additions and 70 deletions
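
At its core, the FE-side bookkeeping added here is a concurrent map of per-task row counters that gets flattened into the JSON shown in the new JobDetails column. The following is a minimal, self-contained sketch of that pattern, not code from the commit; the class and method names are hypothetical, and the real implementation is the LoadStatistic class added to LoadJob.java below (which also tracks file number and size, and serializes with Gson).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the per-task progress bookkeeping this commit introduces.
class LoadProgress {
    // one counter per load task; the real code keys by the task's plan load id (TUniqueId)
    private final Map<String, AtomicLong> loadedRowsPerTask = new ConcurrentHashMap<>();

    // called when a loading task is created, or re-created on retry
    void registerTask(String loadId) {
        loadedRowsPerTask.put(loadId, new AtomicLong(0));
    }

    // called from each periodic exec-status report; reports carry absolute
    // per-task counts, so the counter is set rather than incremented
    void updateLoadedRows(String loadId, long loadedRows) {
        AtomicLong counter = loadedRowsPerTask.get(loadId);
        if (counter != null) {
            counter.set(loadedRows);
        }
    }

    // rendered into the JobDetails column of SHOW LOAD
    String toJson() {
        long total = 0;
        for (AtomicLong c : loadedRowsPerTask.values()) {
            total += c.get();
        }
        return String.format("{\"LoadedRows\":%d,\"TaskNumber\":%d}",
                total, loadedRowsPerTask.size());
    }
}
```

Because each report overwrites the task's counter instead of accumulating, a retried task can simply have its entry reset to zero without corrupting the total, which is what BrokerLoadJob does below when it re-registers a retried LoadLoadingTask.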

View File

@@ -115,9 +115,9 @@ Status GzipDecompressor::decompress(
     *input_bytes_read = input_len - _z_strm.avail_in;
     *decompressed_len = output_max_len - _z_strm.avail_out;
-    LOG(INFO) << "gzip dec ret: " << ret
-              << " input_bytes_read: " << *input_bytes_read
-              << " decompressed_len: " << *decompressed_len;
+    VLOG(10) << "gzip dec ret: " << ret
+             << " input_bytes_read: " << *input_bytes_read
+             << " decompressed_len: " << *decompressed_len;
     if (ret == Z_BUF_ERROR) {
         // Z_BUF_ERROR indicates that inflate() could not consume more input or

View File

@@ -548,6 +548,9 @@ Status OlapTableSink::open(RuntimeState* state) {
 Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
     SCOPED_TIMER(_profile->total_time_counter());
     _number_input_rows += input_batch->num_rows();
+    // update incrementally so that the FE can track the progress.
+    // the real 'num_rows_load_total' will be set when the sink is closed.
+    state->update_num_rows_load_total(input_batch->num_rows());
     RowBatch* batch = input_batch;
     if (!_output_expr_ctxs.empty()) {
         SCOPED_RAW_TIMER(&_convert_batch_ns);
@@ -620,7 +623,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
         COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns);
         // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
         int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected();
-        state->update_num_rows_load_total(num_rows_load_total);
+        state->set_num_rows_load_total(num_rows_load_total);
         state->update_num_rows_load_filtered(_number_filtered_rows);
         // print log of add batch time of all node, for tracing load performance easily

View File

@@ -263,48 +263,57 @@ void FragmentExecState::coordinator_callback(
     params.__set_fragment_instance_id(_fragment_instance_id);
     exec_status.set_t_status(&params);
     params.__set_done(done);
-    profile->to_thrift(&params.profile);
-    params.__isset.profile = true;
     RuntimeState* runtime_state = _executor.runtime_state();
-    if (!runtime_state->output_files().empty()) {
-        params.__isset.delta_urls = true;
-        for (auto& it : runtime_state->output_files()) {
-            params.delta_urls.push_back(to_http_path(it));
-        }
-    }
-    if (runtime_state->num_rows_load_total() > 0 ||
-            runtime_state->num_rows_load_filtered() > 0) {
-        params.__isset.load_counters = true;
-        // TODO(zc)
-        static std::string s_dpp_normal_all = "dpp.norm.ALL";
-        static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
-        params.load_counters.emplace(
-            s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success()));
-        params.load_counters.emplace(
-            s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered()));
-    }
-    if (!runtime_state->get_error_log_file_path().empty()) {
-        params.__set_tracking_url(
-            to_load_error_http_path(runtime_state->get_error_log_file_path()));
-    }
-    if (!runtime_state->export_output_files().empty()) {
-        params.__isset.export_files = true;
-        params.export_files = runtime_state->export_output_files();
-    }
-    if (!runtime_state->tablet_commit_infos().empty()) {
-        params.__isset.commitInfos = true;
-        params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
-        for (auto& info : runtime_state->tablet_commit_infos()) {
-            params.commitInfos.push_back(info);
-        }
-    }
-    DCHECK(runtime_state != NULL);
-    // Send new errors to coordinator
-    runtime_state->get_unreported_errors(&(params.error_log));
-    params.__isset.error_log = (params.error_log.size() > 0);
+    if (runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) {
+        // this is a load plan and the load is not finished, so just make a brief report
+        params.__set_loaded_rows(runtime_state->num_rows_load_total());
+    } else {
+        if (runtime_state->query_options().query_type == TQueryType::LOAD) {
+            params.__set_loaded_rows(runtime_state->num_rows_load_total());
+        }
+        profile->to_thrift(&params.profile);
+        params.__isset.profile = true;
+        if (!runtime_state->output_files().empty()) {
+            params.__isset.delta_urls = true;
+            for (auto& it : runtime_state->output_files()) {
+                params.delta_urls.push_back(to_http_path(it));
+            }
+        }
+        if (runtime_state->num_rows_load_total() > 0 ||
+                runtime_state->num_rows_load_filtered() > 0) {
+            params.__isset.load_counters = true;
+            // TODO(zc)
+            static std::string s_dpp_normal_all = "dpp.norm.ALL";
+            static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
+            params.load_counters.emplace(
+                s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success()));
+            params.load_counters.emplace(
+                s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered()));
+        }
+        if (!runtime_state->get_error_log_file_path().empty()) {
+            params.__set_tracking_url(
+                to_load_error_http_path(runtime_state->get_error_log_file_path()));
+        }
+        if (!runtime_state->export_output_files().empty()) {
+            params.__isset.export_files = true;
+            params.export_files = runtime_state->export_output_files();
+        }
+        if (!runtime_state->tablet_commit_infos().empty()) {
+            params.__isset.commitInfos = true;
+            params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
+            for (auto& info : runtime_state->tablet_commit_infos()) {
+                params.commitInfos.push_back(info);
+            }
+        }
+        // Send new errors to coordinator
+        runtime_state->get_unreported_errors(&(params.error_log));
+        params.__isset.error_log = (params.error_log.size() > 0);
+    }
 
     TReportExecStatusResult res;
     Status rpc_status;

View File

@@ -444,14 +444,6 @@ void PlanFragmentExecutor::send_report(bool done) {
         return;
     }
 
-    // If this is a load plan, and it is not finished, and it still running ok,
-    // no need to report it.
-    // This is case for the case that the load plan's _is_report_success is always true,
-    // but we only need the last report when plan is done.
-    if (_runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) {
-        return;
-    }
-
     // This will send a report even if we are cancelled. If the query completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
     // be waiting for a final report and profile.

View File

@@ -432,6 +432,10 @@ public:
         _num_rows_load_total.fetch_add(num_rows);
     }
 
+    void set_num_rows_load_total(int64_t num_rows) {
+        _num_rows_load_total.store(num_rows);
+    }
+
     void update_num_rows_load_filtered(int64_t num_rows) {
         _num_rows_load_filtered.fetch_add(num_rows);
     }

View File

@@ -255,6 +255,7 @@ mysql> show load order by createtime desc limit 1\G
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
LoadedRows: 82393000
```
The following explains the meaning of each parameter in the result set of the show load command:
@@ -288,6 +289,7 @@ LoadFinishTime: 2019-07-27 11:50:16
+ Type
The type of the load job. For Broker load, the type can only be BROKER.
+ EtlInfo
Shows the load data volume metrics ```dpp.norm.ALL and dpp.abnorm.ALL```. Users can use these two metrics to verify whether the error rate of the current load job exceeds max\_filter\_ratio.
@@ -328,6 +330,14 @@ LoadFinishTime: 2019-07-27 11:50:16
Sample erroneous data from the load job; visit the URL to retrieve the error data samples of this load. If this load produced no erroneous data, the URL field shows N/A.
+ JobDetails
Shows detailed runtime status of the job, including the number of files to load, their total size in bytes, the number of subtasks, the number of processed rows, and so on.
```{"LoadedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}```
The number of processed rows is updated every 5 seconds. It only indicates the current progress and does not represent the final number of processed rows; the final count is the one shown in EtlInfo.
### Cancel load
When a Broker load job is not in the CANCELLED or FINISHED state, it can be cancelled manually. To cancel it, specify the Label of the load job to be cancelled. Run ```HELP CANCEL LOAD``` to see the syntax of the cancel command.

View File

@@ -35,7 +35,7 @@ public class LoadProcDir implements ProcDirInterface {
             .add("JobId").add("Label").add("State").add("Progress")
             .add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime")
             .add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime")
-            .add("URL")
+            .add("URL").add("JobDetails")
             .build();
 
     // label and state column index of result

View File

@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn.
@@ -237,8 +238,14 @@ public class BrokerLoadJob extends LoadJob {
         } else {
             // retry task
             idToTasks.remove(loadTask.getSignature());
+            if (loadTask instanceof LoadLoadingTask) {
+                loadStatistic.numLoadedRowsMap.remove(((LoadLoadingTask) loadTask).getLoadId());
+            }
             loadTask.updateRetryInfo();
             idToTasks.put(loadTask.getSignature(), loadTask);
+            if (loadTask instanceof LoadLoadingTask) {
+                loadStatistic.numLoadedRowsMap.put(((LoadLoadingTask) loadTask).getLoadId(), new AtomicLong(0));
+            }
             Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
             return;
         }
@@ -356,6 +363,7 @@ public class BrokerLoadJob extends LoadJob {
                         attachment.getFileNumByTable(tableId));
                 // Add tasks into list and pool
                 idToTasks.put(task.getSignature(), task);
+                loadStatistic.numLoadedRowsMap.put(loadId, new AtomicLong(0));
                 Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
             }
         } finally {

View File

@@ -90,6 +90,7 @@ public class BrokerLoadPendingTask extends LoadTask {
                 groupNum++;
             }
 
+            ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize);
             ((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList);
             LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}",
                     totalFileNum, totalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId());

View File

@@ -58,6 +58,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -68,6 +69,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback, Writable {
@@ -123,6 +125,32 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     // this request id is only used for checking if a load begin request is a duplicate request.
     protected TUniqueId requestId;
 
+    protected LoadStatistic loadStatistic = new LoadStatistic();
+
+    public static class LoadStatistic {
+        // number of rows processed on BE; this number is updated periodically by the query report.
+        // A load job may have several load tasks, so the map key is the load task's plan load id.
+        public Map<TUniqueId, AtomicLong> numLoadedRowsMap = Maps.newConcurrentMap();
+        // number of files to be loaded
+        public int fileNum = 0;
+        public long totalFileSizeB = 0;
+
+        public String toJson() {
+            long total = 0;
+            for (AtomicLong atomicLong : numLoadedRowsMap.values()) {
+                total += atomicLong.get();
+            }
+
+            Map<String, Object> details = Maps.newHashMap();
+            details.put("LoadedRows", total);
+            details.put("FileNumber", fileNum);
+            details.put("FileSize", totalFileSizeB);
+            details.put("TaskNumber", numLoadedRowsMap.size());
+            Gson gson = new Gson();
+            return gson.toJson(details);
+        }
+    }
+
     // only for log replay
     public LoadJob() {
     }
@@ -194,6 +222,18 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return transactionId;
     }
 
+    public void updateLoadedRows(TUniqueId loadId, long loadedRows) {
+        AtomicLong atomicLong = loadStatistic.numLoadedRowsMap.get(loadId);
+        if (atomicLong != null) {
+            atomicLong.set(loadedRows);
+        }
+    }
+
+    public void setLoadFileInfo(int fileNum, long fileSize) {
+        this.loadStatistic.fileNum = fileNum;
+        this.loadStatistic.totalFileSizeB = fileSize;
+    }
+
     public TUniqueId getRequestId() {
         return requestId;
     }
@@ -474,6 +514,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
         }
         idToTasks.clear();
+        loadStatistic.numLoadedRowsMap.clear();
 
         // set failMsg and state
         this.failMsg = failMsg;
@@ -607,6 +648,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
                 jobInfo.add(TimeUtils.longToTimeString(finishTimestamp));
                 // tracking url
                 jobInfo.add(loadingStatus.getTrackingUrl());
+                jobInfo.add(loadStatistic.toJson());
                 return jobInfo;
             } finally {
                 readUnlock();

View File

@@ -545,16 +545,6 @@ public class LoadManager implements Writable{
             lock.writeLock().unlock();
         }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList());
-        out.writeInt(loadJobs.size());
-        for (LoadJob loadJob : loadJobs) {
-            loadJob.write(out);
-        }
-    }
-
     // If load job will be removed by cleaner later, it will not be saved in image.
     private boolean needSave(LoadJob loadJob) {
         if (!loadJob.isCompleted()) {
@@ -569,6 +559,23 @@ public class LoadManager implements Writable{
         return false;
     }
 
+    public void updateJobLoadedRows(Long jobId, TUniqueId loadId, long loadedRows) {
+        LoadJob job = idToLoadJob.get(jobId);
+        if (job != null) {
+            job.updateLoadedRows(loadId, loadedRows);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList());
+        out.writeInt(loadJobs.size());
+        for (LoadJob loadJob : loadJobs) {
+            loadJob.write(out);
+        }
+    }
+
     @Override
     public void readFields(DataInput in) throws IOException {
         int size = in.readInt();

View File

@@ -45,8 +45,8 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.UnionNode;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
 import org.apache.doris.proto.PExecPlanFragmentResult;
+import org.apache.doris.proto.PPlanFragmentCancelReason;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
@@ -210,7 +210,7 @@ public class Coordinator {
         nextInstanceId.setLo(queryId.lo + 1);
     }
 
-    // Used for pull load task coordinator
+    // Used for broker load task/export task coordinator
     public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
             List<PlanFragment> fragments, List<ScanNode> scanNodes, String cluster) {
         this.isBlockQuery = true;
@@ -1105,7 +1105,9 @@ public class Coordinator {
                 // duplicate packet
                 return;
             }
-            execState.profile.update(params.profile);
+            if (params.isSetProfile()) {
+                execState.profile.update(params.profile);
+            }
             done = params.done;
             execState.done = params.done;
         } finally {
@@ -1148,6 +1150,10 @@ public class Coordinator {
                 updateCommitInfos(params.getCommitInfos());
             }
             profileDoneSignal.markedCountDown(params.getFragment_instance_id(), -1L);
         }
+
+        if (params.isSetLoaded_rows()) {
+            Catalog.getCurrentCatalog().getLoadManager().updateJobLoadedRows(jobId, params.query_id, params.loaded_rows);
+        }
 
         return;

View File

@@ -105,15 +105,17 @@ public final class QeProcessorImpl implements QeProcessor {
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
-        LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}",
-                DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id),
-                params.backend_num, beAddr);
-        LOG.debug("params: {}", params);
+        if (params.isSetProfile()) {
+            LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}",
+                    DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id),
+                    params.backend_num, beAddr);
+            LOG.debug("params: {}", params);
+        }
         final TReportExecStatusResult result = new TReportExecStatusResult();
         final QueryInfo info = coordinatorMap.get(params.query_id);
         if (info == null) {
             result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
-            LOG.info("ReportExecStatus() runtime error");
+            LOG.info("ReportExecStatus() runtime error, query {} does not exist", params.query_id);
             return result;
         }
         try {

View File

@@ -367,6 +367,8 @@ struct TReportExecStatusParams {
     13: optional list<string> export_files
 
     14: optional list<Types.TTabletCommitInfo> commitInfos
+
+    15: optional i64 loaded_rows
 }
 
 struct TFeResult {