diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index d0de707bd6..aeb28a2ef8 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -115,9 +115,9 @@ Status GzipDecompressor::decompress( *input_bytes_read = input_len - _z_strm.avail_in; *decompressed_len = output_max_len - _z_strm.avail_out; - LOG(INFO) << "gzip dec ret: " << ret - << " input_bytes_read: " << *input_bytes_read - << " decompressed_len: " << *decompressed_len; + VLOG(10) << "gzip dec ret: " << ret + << " input_bytes_read: " << *input_bytes_read + << " decompressed_len: " << *decompressed_len; if (ret == Z_BUF_ERROR) { // Z_BUF_ERROR indicates that inflate() could not consume more input or diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 9b4d35cf77..9fa0a307cc 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -548,6 +548,9 @@ Status OlapTableSink::open(RuntimeState* state) { Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { SCOPED_TIMER(_profile->total_time_counter()); _number_input_rows += input_batch->num_rows(); + // update incrementally so that FE can get the progress. + // the real 'num_rows_load_total' will be set when the sink is closed. 
+ state->update_num_rows_load_total(input_batch->num_rows()); RowBatch* batch = input_batch; if (!_output_expr_ctxs.empty()) { SCOPED_RAW_TIMER(&_convert_batch_ns); @@ -620,7 +623,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); - state->update_num_rows_load_total(num_rows_load_total); + state->set_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 08e9c8bc19..e0ff0ba55a 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -263,48 +263,57 @@ void FragmentExecState::coordinator_callback( params.__set_fragment_instance_id(_fragment_instance_id); exec_status.set_t_status(&params); params.__set_done(done); - profile->to_thrift(&params.profile); - params.__isset.profile = true; RuntimeState* runtime_state = _executor.runtime_state(); - if (!runtime_state->output_files().empty()) { - params.__isset.delta_urls = true; - for (auto& it : runtime_state->output_files()) { - params.delta_urls.push_back(to_http_path(it)); - } - } - if (runtime_state->num_rows_load_total() > 0 || - runtime_state->num_rows_load_filtered() > 0) { - params.__isset.load_counters = true; - // TODO(zc) - static std::string s_dpp_normal_all = "dpp.norm.ALL"; - static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; - - params.load_counters.emplace( - s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success())); - params.load_counters.emplace( - s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered())); - } - if 
(!runtime_state->get_error_log_file_path().empty()) { - params.__set_tracking_url( - to_load_error_http_path(runtime_state->get_error_log_file_path())); - } - if (!runtime_state->export_output_files().empty()) { - params.__isset.export_files = true; - params.export_files = runtime_state->export_output_files(); - } - if (!runtime_state->tablet_commit_infos().empty()) { - params.__isset.commitInfos = true; - params.commitInfos.reserve(runtime_state->tablet_commit_infos().size()); - for (auto& info : runtime_state->tablet_commit_infos()) { - params.commitInfos.push_back(info); - } - } DCHECK(runtime_state != NULL); - - // Send new errors to coordinator - runtime_state->get_unreported_errors(&(params.error_log)); - params.__isset.error_log = (params.error_log.size() > 0); + if (runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) { + // this is a load plan, and load is not finished, just make a brief report + params.__set_loaded_rows(runtime_state->num_rows_load_total()); + } else { + if (runtime_state->query_options().query_type == TQueryType::LOAD) { + params.__set_loaded_rows(runtime_state->num_rows_load_total()); + } + profile->to_thrift(&params.profile); + params.__isset.profile = true; + + if (!runtime_state->output_files().empty()) { + params.__isset.delta_urls = true; + for (auto& it : runtime_state->output_files()) { + params.delta_urls.push_back(to_http_path(it)); + } + } + if (runtime_state->num_rows_load_total() > 0 || + runtime_state->num_rows_load_filtered() > 0) { + params.__isset.load_counters = true; + // TODO(zc) + static std::string s_dpp_normal_all = "dpp.norm.ALL"; + static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; + + params.load_counters.emplace( + s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success())); + params.load_counters.emplace( + s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered())); + } + if (!runtime_state->get_error_log_file_path().empty()) { + 
params.__set_tracking_url( + to_load_error_http_path(runtime_state->get_error_log_file_path())); + } + if (!runtime_state->export_output_files().empty()) { + params.__isset.export_files = true; + params.export_files = runtime_state->export_output_files(); + } + if (!runtime_state->tablet_commit_infos().empty()) { + params.__isset.commitInfos = true; + params.commitInfos.reserve(runtime_state->tablet_commit_infos().size()); + for (auto& info : runtime_state->tablet_commit_infos()) { + params.commitInfos.push_back(info); + } + } + + // Send new errors to coordinator + runtime_state->get_unreported_errors(&(params.error_log)); + params.__isset.error_log = (params.error_log.size() > 0); + } TReportExecStatusResult res; Status rpc_status; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index b9663f9194..a9f805a2a1 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -444,14 +444,6 @@ void PlanFragmentExecutor::send_report(bool done) { return; } - // If this is a load plan, and it is not finished, and it still running ok, - // no need to report it. - // This is case for the case that the load plan's _is_report_success is always true, - // but we only need the last report when plan is done. - if (_runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) { - return; - } - // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ac5e2650d1..006996aceb 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -432,6 +432,10 @@ public: _num_rows_load_total.fetch_add(num_rows); } + void set_num_rows_load_total(int64_t num_rows) { + _num_rows_load_total.store(num_rows); + } + void update_num_rows_load_filtered(int64_t num_rows) { _num_rows_load_filtered.fetch_add(num_rows); } diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md index d33f640c10..194357fd24 100644 --- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md @@ -255,6 +255,7 @@ mysql> show load order by createtime desc limit 1\G LoadStartTime: 2019-07-27 11:46:44 LoadFinishTime: 2019-07-27 11:50:16 URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317 + LoadedRows: 82393000 ``` 下面主要介绍了查看导入命令返回结果集中参数意义: @@ -288,6 +289,7 @@ LoadFinishTime: 2019-07-27 11:50:16 + Type 导入任务的类型。Broker load 的 type 取值只有 BROKER。 + + EtlInfo 主要显示了导入的数据量指标 ```dpp.norm.ALL 和 dpp.abnorm.ALL```。用户可以根据这两个指标验证当前导入任务的错误率是否超过 max\_filter\_ratio。 @@ -328,6 +330,14 @@ LoadFinishTime: 2019-07-27 11:50:16 导入任务的错误数据样例,访问 URL 地址既可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL 字段则为 N/A。 ++ JobDetails + + 显示一些作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的行数等。 + + ```{"LoadedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}``` + + 其中已处理的行数,每 5 秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以 EtlInfo 中显示的为准。 + ### 取消导入 当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 ```HELP CANCEL LOAD```查看。 diff --git a/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index 
4ba8a00bbe..f08e2d59c6 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -35,7 +35,7 @@ public class LoadProcDir implements ProcDirInterface { .add("JobId").add("Label").add("State").add("Progress") .add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime") .add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime") - .add("URL") + .add("URL").add("JobDetails") .build(); // label and state column index of result diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index f69759b933..acee0b8b7f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; /** * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn. 
@@ -237,8 +238,14 @@ public class BrokerLoadJob extends LoadJob { } else { // retry task idToTasks.remove(loadTask.getSignature()); + if (loadTask instanceof LoadLoadingTask) { + loadStatistic.numLoadedRowsMap.remove(((LoadLoadingTask) loadTask).getLoadId()); + } loadTask.updateRetryInfo(); idToTasks.put(loadTask.getSignature(), loadTask); + if (loadTask instanceof LoadLoadingTask) { + loadStatistic.numLoadedRowsMap.put(((LoadLoadingTask) loadTask).getLoadId(), new AtomicLong(0)); + } Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); return; } @@ -356,6 +363,7 @@ public class BrokerLoadJob extends LoadJob { attachment.getFileNumByTable(tableId)); // Add tasks into list and pool idToTasks.put(task.getSignature(), task); + loadStatistic.numLoadedRowsMap.put(loadId, new AtomicLong(0)); Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); } } finally { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 4aba2d2a67..66bc62c182 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -90,6 +90,7 @@ public class BrokerLoadPendingTask extends LoadTask { groupNum++; } + ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize); ((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList); LOG.info("get {} files to be loaded. total size: {}. 
cost: {} ms, job: {}", totalFileNum, totalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId()); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 4b995f9412..bc3c66e0f5 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -58,6 +58,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.gson.Gson; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,6 +69,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback, Writable { @@ -123,6 +125,32 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // this request id is only used for checking if a load begin request is a duplicate request. protected TUniqueId requestId; + protected LoadStatistic loadStatistic = new LoadStatistic(); + + public static class LoadStatistic { + // number of rows processed on BE, this number will be updated periodically by query report. + // A load job may have several load tasks, so the map key is load task's plan load id. 
+ public Map<TUniqueId, AtomicLong> numLoadedRowsMap = Maps.newConcurrentMap(); + // number of files to be loaded + public int fileNum = 0; + public long totalFileSizeB = 0; + + public String toJson() { + long total = 0; + for (AtomicLong atomicLong : numLoadedRowsMap.values()) { + total += atomicLong.get(); + } + + Map<String, Object> details = Maps.newHashMap(); + details.put("LoadedRows", total); + details.put("FileNumber", fileNum); + details.put("FileSize", totalFileSizeB); + details.put("TaskNumber", numLoadedRowsMap.size()); + Gson gson = new Gson(); + return gson.toJson(details); + } + } + // only for log replay public LoadJob() { } @@ -194,6 +222,18 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements return transactionId; } + public void updateLoadedRows(TUniqueId loadId, long loadedRows) { + AtomicLong atomicLong = loadStatistic.numLoadedRowsMap.get(loadId); + if (atomicLong != null) { + atomicLong.set(loadedRows); + } + } + + public void setLoadFileInfo(int fileNum, long fileSize) { + this.loadStatistic.fileNum = fileNum; + this.loadStatistic.totalFileSizeB = fileSize; + } + public TUniqueId getRequestId() { return requestId; } @@ -474,6 +514,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } idToTasks.clear(); + loadStatistic.numLoadedRowsMap.clear(); // set failMsg and state this.failMsg = failMsg; @@ -607,6 +648,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements jobInfo.add(TimeUtils.longToTimeString(finishTimestamp)); // tracking url jobInfo.add(loadingStatus.getTrackingUrl()); + jobInfo.add(loadStatistic.toJson()); return jobInfo; } finally { readUnlock(); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 2cd3381998..36f6d929e3 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -545,16 +545,6 @@ 
public class LoadManager implements Writable{ lock.writeLock().unlock(); } - @Override - public void write(DataOutput out) throws IOException { - List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList()); - - out.writeInt(loadJobs.size()); - for (LoadJob loadJob : loadJobs) { - loadJob.write(out); - } - } - // If load job will be removed by cleaner later, it will not be saved in image. private boolean needSave(LoadJob loadJob) { if (!loadJob.isCompleted()) { @@ -569,6 +559,23 @@ public class LoadManager implements Writable{ return false; } + public void updateJobLoadedRows(Long jobId, TUniqueId loadId, long loadedRows) { + LoadJob job = idToLoadJob.get(jobId); + if (job != null) { + job.updateLoadedRows(loadId, loadedRows); + } + } + + @Override + public void write(DataOutput out) throws IOException { + List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList()); + + out.writeInt(loadJobs.size()); + for (LoadJob loadJob : loadJobs) { + loadJob.write(out); + } + } + @Override public void readFields(DataInput in) throws IOException { int size = in.readInt(); diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 9d072e1180..59b8235511 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -45,8 +45,8 @@ import org.apache.doris.planner.Planner; import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.UnionNode; -import org.apache.doris.proto.PPlanFragmentCancelReason; import org.apache.doris.proto.PExecPlanFragmentResult; +import org.apache.doris.proto.PPlanFragmentCancelReason; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; @@ -210,7 +210,7 @@ public class Coordinator { 
nextInstanceId.setLo(queryId.lo + 1); } - // Used for pull load task coordinator + // Used for broker load task/export task coordinator public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List fragments, List scanNodes, String cluster) { this.isBlockQuery = true; @@ -1105,7 +1105,9 @@ public class Coordinator { // duplicate packet return; } - execState.profile.update(params.profile); + if (params.isSetProfile()) { + execState.profile.update(params.profile); + } done = params.done; execState.done = params.done; } finally { @@ -1148,6 +1150,10 @@ public class Coordinator { updateCommitInfos(params.getCommitInfos()); } profileDoneSignal.markedCountDown(params.getFragment_instance_id(), -1L); + } + + if (params.isSetLoaded_rows()) { + Catalog.getCurrentCatalog().getLoadManager().updateJobLoadedRows(jobId, params.query_id, params.loaded_rows); } return; diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 256bac305a..90da2077d0 100644 --- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -105,15 +105,17 @@ public final class QeProcessorImpl implements QeProcessor { @Override public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) { - LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}", - DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id), - params.backend_num, beAddr); - LOG.debug("params: {}", params); + if (params.isSetProfile()) { + LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}", + DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id), + params.backend_num, beAddr); + LOG.debug("params: {}", params); + } final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo 
info = coordinatorMap.get(params.query_id); if (info == null) { result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); - LOG.info("ReportExecStatus() runtime error"); + LOG.info("ReportExecStatus() runtime error, query {} does not exist", params.query_id); return result; } try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index cc90080ddd..95fdc88c73 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -367,6 +367,8 @@ struct TReportExecStatusParams { 13: optional list export_files 14: optional list commitInfos + + 15: optional i64 loaded_rows } struct TFeResult {