From 49dec9f39d1af8e4efb30f08d0c2d871c8d86f72 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 19 Sep 2024 23:58:05 +0800 Subject: [PATCH] [branch-2.1] Picks "[opt](merge-on-write) Reduce the version not continuous logs for merge-on-write table #40946" (#40996) picks https://github.com/apache/doris/pull/40946 --- be/src/common/config.cpp | 4 +++ be/src/common/config.h | 4 +++ .../olap/task/engine_publish_version_task.cpp | 26 ++++++++++++------- .../java/org/apache/doris/common/Config.java | 4 +++ .../org/apache/doris/master/MasterImpl.java | 14 +++++++--- 5 files changed, 39 insertions(+), 13 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 473842a188..df720853c1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1134,6 +1134,10 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false"); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); +// When the version is not continuous for MOW table in publish phase and the gap between +// current txn's publishing version and the max version of the tablet exceeds this value, +// don't print warning log +DEFINE_mInt32(publish_version_gap_logging_threshold, "200"); // The secure path with user files, used in the `local` table function. DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a3d4b35dce..4b158f1e45 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1202,6 +1202,10 @@ DECLARE_mBool(enable_missing_rows_correctness_check); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DECLARE_mInt32(mow_publish_max_discontinuous_version_num); +// When the version is not continuous for MOW table in publish phase and the gap between +// current txn's publishing version and the max version of the tablet exceeds this value, +// don't print warning log +DECLARE_mInt32(publish_version_gap_logging_threshold); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 9601cad88d..66721f5623 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -231,16 +231,22 @@ Status EnginePublishVersionTask::execute() { int64_t missed_txn_id = StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( tablet->tablet_id(), missed_version); - auto msg = fmt::format( - "uniq key with merge-on-write version not continuous, " - "missed version={}, it's transaction_id={}, current publish " - "version={}, tablet_id={}, transaction_id={}", - missed_version, missed_txn_id, version.second, tablet->tablet_id(), - _publish_version_req.transaction_id); - if (first_time_update) { - LOG(INFO) << msg; - } else { - LOG_EVERY_SECOND(INFO) << msg; + bool need_log = + (config::publish_version_gap_logging_threshold < 0 || + max_version + config::publish_version_gap_logging_threshold >= + version.second); + if (need_log) { + auto msg = fmt::format( + "uniq key with merge-on-write version not continuous, " + "missed version={}, it's transaction_id={}, current publish " + "version={}, tablet_id={}, transaction_id={}", + missed_version, missed_txn_id, version.second, + tablet->tablet_id(), _publish_version_req.transaction_id); + if (first_time_update) { + LOG(INFO) << msg; + } else { + LOG_EVERY_SECOND(INFO) << msg; + } } }; // The versions during the schema change period need to be also continuous diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b9eefd839e..3bf55f3da4 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -494,6 +494,10 @@ public class Config extends ConfigBase { "print log interval for publish transaction failed interval"}) public static long publish_fail_log_interval_second = 5 * 60; + @ConfField(mutable = true, masterOnly = true, description = {"一个 PUBLISH_VERSION 任务打印失败日志的次数上限", + "the upper limit of failure logs of PUBLISH_VERSION task"}) + public static long publish_version_task_failed_log_threshold = 80; + @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。" + "该参数仅用于事务型 insert 操作中。", "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 9247953424..4870b3a582 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.load.DeleteJob; import org.apache.doris.load.loadv2.SparkLoadJob; @@ -86,11 +87,13 @@ public class MasterImpl { // check task status // retry task by report process TStatus taskStatus = request.getTaskStatus(); + TTaskType taskType = request.getTaskType(); + long signature = request.getSignature(); if (LOG.isDebugEnabled()) { LOG.debug("get task report: {}", request); } - if (taskStatus.getStatusCode() != TStatusCode.OK) { + if (taskStatus.getStatusCode() != TStatusCode.OK && taskType != TTaskType.PUBLISH_VERSION) { LOG.warn("finish task reports bad. request: {}", request); } @@ -109,8 +112,6 @@ public class MasterImpl { } long backendId = backend.getId(); - TTaskType taskType = request.getTaskType(); - long signature = request.getSignature(); AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature); if (task == null) { @@ -128,6 +129,13 @@ public class MasterImpl { } else { if (taskStatus.getStatusCode() != TStatusCode.OK) { task.failed(); + if (taskType == TTaskType.PUBLISH_VERSION) { + boolean needLog = (Config.publish_version_task_failed_log_threshold < 0 + || task.getFailedTimes() <= Config.publish_version_task_failed_log_threshold); + if (needLog) { + LOG.warn("finish task reports bad. request: {}", request); + } + } String errMsg = "task type: " + taskType + ", status_code: " + taskStatus.getStatusCode().toString() + (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "") + ", backendId: " + backend + ", signature: " + signature;