From 5a6e5cfd074cc9fda8848ea0249423a6e3098bbc Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 18 Dec 2018 12:44:36 +0800 Subject: [PATCH] Add log to detect empty load file (#445) We found that a load file may not be generated for a rollup tablet; add a log to observe this. --- be/src/agent/pusher.cpp | 1 + be/src/agent/task_worker_pool.cpp | 55 +++++++++++-------- .../org/apache/doris/load/LoadChecker.java | 5 ++ .../org/apache/doris/task/LoadEtlTask.java | 27 ++++----- 4 files changed, 50 insertions(+), 38 deletions(-) diff --git a/be/src/agent/pusher.cpp b/be/src/agent/pusher.cpp index cbf2f70e63..eab1f1b51a 100644 --- a/be/src/agent/pusher.cpp +++ b/be/src/agent/pusher.cpp @@ -117,6 +117,7 @@ void Pusher::_get_file_name_from_path(const string& file_path, string* file_name AgentStatus Pusher::process(vector* tablet_infos) { AgentStatus status = DORIS_SUCCESS; + // Remote file not empty, need to download if (_push_req.__isset.http_file_path) { // Get file length and timeout diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 6d93337827..751f1a3b69 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -750,36 +750,45 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { LOG(INFO) << "get push task. 
signature: " << agent_task_req.signature << " user: " << user << " priority: " << priority; + vector tablet_infos; if (push_req.push_type == TPushType::LOAD || push_req.push_type == TPushType::LOAD_DELETE) { -#ifndef BE_TEST - Pusher pusher(worker_pool_this->_env->olap_engine(), push_req); - status = pusher.init(); -#else - status = worker_pool_this->_pusher->init(); -#endif + if (!push_req.__isset.http_file_path) { + LOG(WARNING) << "push request does not set load file for tablet: " + << agent_task_req.signature; + status = DORIS_FILE_DOWNLOAD_NOT_EXIST; + } if (status == DORIS_SUCCESS) { - uint32_t retry_time = 0; - while (retry_time < PUSH_MAX_RETRY) { #ifndef BE_TEST - status = pusher.process(&tablet_infos); + Pusher pusher(worker_pool_this->_env->olap_engine(), push_req); + status = pusher.init(); #else - status = worker_pool_this->_pusher->process(&tablet_infos); + status = worker_pool_this->_pusher->init(); #endif - if (status == DORIS_PUSH_HAD_LOADED) { - OLAP_LOG_WARNING("transaction exists when realtime push, " - "but unfinished, do not report to fe, signature: %ld", - agent_task_req.signature); - break; // not retry any more - } - // Internal error, need retry - if (status == DORIS_ERROR) { - OLAP_LOG_WARNING("push internal error, need retry.signature: %ld", - agent_task_req.signature); - retry_time += 1; - } else { - break; + + if (status == DORIS_SUCCESS) { + uint32_t retry_time = 0; + while (retry_time < PUSH_MAX_RETRY) { +#ifndef BE_TEST + status = pusher.process(&tablet_infos); +#else + status = worker_pool_this->_pusher->process(&tablet_infos); +#endif + if (status == DORIS_PUSH_HAD_LOADED) { + OLAP_LOG_WARNING("transaction exists when realtime push, " + "but unfinished, do not report to fe, signature: %ld", + agent_task_req.signature); + break; // not retry any more + } + // Internal error, need retry + if (status == DORIS_ERROR) { + OLAP_LOG_WARNING("push internal error, need retry.signature: %ld", + agent_task_req.signature); + retry_time += 1; 
+ } else { + break; + } } } } diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/src/main/java/org/apache/doris/load/LoadChecker.java index 91a8ba348a..97d5b8c191 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java @@ -448,6 +448,11 @@ public class LoadChecker extends Daemon { type = TPushType.LOAD_DELETE; } + if (type == TPushType.LOAD && (filePath == null || fileSize < 0)) { + LOG.warn("get empty load file for tablet {}", tabletId); + continue; + } + // add task to batchTask Set allReplicas = new HashSet(); Set finishedReplicas = new HashSet(); diff --git a/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java b/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java index 350a388db9..236775cde2 100644 --- a/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java +++ b/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java @@ -242,19 +242,15 @@ public abstract class LoadEtlTask extends MasterTask { db.readLock(); try { table = (OlapTable) db.getTable(tableId); - } finally { - db.readUnlock(); - } - if (table == null) { - throw new LoadException("table does not exist. id: " + tableId); - } - - TableLoadInfo tableLoadInfo = tableEntry.getValue(); - for (Entry partitionEntry : tableLoadInfo.getIdToPartitionLoadInfo().entrySet()) { - long partitionId = partitionEntry.getKey(); - boolean needLoad = false; - db.readLock(); - try { + if (table == null) { + throw new LoadException("table does not exist. id: " + tableId); + } + + TableLoadInfo tableLoadInfo = tableEntry.getValue(); + for (Entry partitionEntry : tableLoadInfo.getIdToPartitionLoadInfo().entrySet()) { + long partitionId = partitionEntry.getKey(); + boolean needLoad = false; + Partition partition = table.getPartition(partitionId); if (partition == null) { throw new LoadException("partition does not exist. 
id: " + partitionId); @@ -292,9 +288,10 @@ public abstract class LoadEtlTask extends MasterTask { // partition might have no load data partitionEntry.getValue().setNeedLoad(needLoad); - } finally { - db.readUnlock(); + } + } finally { + db.readUnlock(); } }