From e8690b62ee85c2cb5cdf15bf02dd439e91088bff Mon Sep 17 00:00:00 2001 From: meiyi Date: Thu, 1 Aug 2024 16:59:54 +0800 Subject: [PATCH] [fix](group commit) Pick add debug log show why group commit not work; delete wal when replay success (#38611) (#38659) Pick https://github.com/apache/doris/pull/38611 --- be/src/olap/wal/wal_table.cpp | 15 ++--- be/src/runtime/group_commit_mgr.cpp | 2 + .../doris/analysis/NativeInsertStmt.java | 63 +++++++++++++++---- ...ommit_async_wal_msg_fault_injection.groovy | 41 +++++++++++- .../insert_p0/insert_group_commit_into.groovy | 8 +++ 5 files changed, 108 insertions(+), 21 deletions(-) diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index ec0c412379..38c262e988 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -86,7 +86,6 @@ void WalTable::_pick_relay_wals() { Status WalTable::_relay_wal_one_by_one() { std::vector> need_retry_wals; - std::vector> need_delete_wals; for (auto wal_info : _replaying_queue) { wal_info->add_retry_num(); auto st = _replay_wal_internal(wal_info->get_wal_path()); @@ -96,7 +95,12 @@ Status WalTable::_relay_wal_one_by_one() { msg.find("LabelAlreadyUsedException") != msg.npos) { LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); - need_delete_wals.push_back(wal_info); + // delete wal + WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id, wal_info->get_wal_id()), + "failed to delete wal=" + wal_info->get_wal_path()); + if (config::group_commit_wait_replay_wal_finish) { + RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id())); + } } else { doris::wal_fail << 1; LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path() @@ -111,13 +115,6 @@ Status WalTable::_relay_wal_one_by_one() { _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info); } } - for (auto delete_wal_info : need_delete_wals) { - [[maybe_unused]] auto st = - _exec_env->wal_mgr()->delete_wal(_table_id, delete_wal_info->get_wal_id()); - if (config::group_commit_wait_replay_wal_finish) { - RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(delete_wal_info->get_wal_id())); - } - } return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 7bb30b1cc8..b6b4c5d646 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -398,6 +398,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { status = Status::InternalError(""); }); if (status.ok()) { + DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error", + { status = Status::InternalError(""); }); // commit txn TLoadTxnCommitRequest request; request.__set_auth_code(0); // this is a fake, fe not check it now diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index b26693134b..ac666020a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -83,6 +83,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -1200,28 +1202,53 @@ public class NativeInsertStmt extends InsertStmt { LOG.warn("analyze group commit failed", e); return; } - boolean partialUpdate = ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(); - if (!isExplain() && !partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit() - && ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES - && targetTable instanceof OlapTable - && ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange() - && !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME) - && !ConnectContext.get().isTxnModel() - && getQueryStmt() instanceof SelectStmt - && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null - && (label == null || Strings.isNullOrEmpty(label.getLabelName())) - && (analyzer == null || analyzer != null && !analyzer.isReAnalyze())) { + ConnectContext ctx = ConnectContext.get(); + List>> conditions = new ArrayList<>(); + conditions.add(Pair.of(() -> ctx.getSessionVariable().isEnableInsertGroupCommit(), + () -> "group_commit session variable: " + ctx.getSessionVariable().groupCommit)); + conditions.add(Pair.of(() -> !isExplain(), () -> "isExplain")); + conditions.add(Pair.of(() -> !ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate(), + () -> "enableUniqueKeyPartialUpdate")); + conditions.add(Pair.of(() -> !ctx.isTxnModel(), () -> "isTxnModel")); + conditions.add(Pair.of(() -> targetTable instanceof OlapTable, + () -> "not olapTable, class: " + targetTable.getClass().getName())); + conditions.add(Pair.of(() -> ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange(), + () -> "notUseSchemaLightChange")); + conditions.add(Pair.of(() -> !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME), + () -> "db is internal")); + conditions.add( + Pair.of(() -> targetPartitionNames == null, () -> "targetPartitionNames: " + targetPartitionNames)); + conditions.add(Pair.of(() -> ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES, + () -> "sqlMode: " + ctx.getSessionVariable().getSqlMode())); + conditions.add(Pair.of(() -> queryStmt instanceof SelectStmt, + () -> "queryStmt is not SelectStmt, class: " + queryStmt.getClass().getName())); + conditions.add(Pair.of(() -> ((SelectStmt) queryStmt).getTableRefs().isEmpty(), + () -> "tableRefs is not empty: " + ((SelectStmt) queryStmt).getTableRefs())); + conditions.add( + Pair.of(() -> (label == null || Strings.isNullOrEmpty(label.getLabelName())), () -> "label: " + label)); + conditions.add( + Pair.of(() -> (analyzer == null || analyzer != null && !analyzer.isReAnalyze()), () -> "analyzer")); + boolean match = conditions.stream().allMatch(p -> p.first.getAsBoolean()); + if (match) { SelectStmt selectStmt = (SelectStmt) queryStmt; if (selectStmt.getValueList() != null) { for (List row : selectStmt.getValueList().getRows()) { for (Expr expr : row) { if (!(expr instanceof LiteralExpr)) { + if (LOG.isDebugEnabled()) { + LOG.debug("group commit is off for table: {}, because not literal expr, " + + "expr: {}, row: {}", targetTable.getName(), expr, row); + } return; } } } // Does not support: insert into tbl values(); if (selectStmt.getValueList().getFirstRow().isEmpty() && CollectionUtils.isEmpty(targetColumnNames)) { + if (LOG.isDebugEnabled()) { + LOG.debug("group commit is off for table: {}, because first row: {}, target columns: {}", + targetTable.getName(), selectStmt.getValueList().getFirstRow(), targetColumnNames); + } return; } } else { @@ -1231,6 +1258,10 @@ public class NativeInsertStmt extends InsertStmt { if (items != null) { for (SelectListItem item : items) { if (item.getExpr() != null && !(item.getExpr() instanceof LiteralExpr)) { + if (LOG.isDebugEnabled()) { + LOG.debug("group commit is off for table: {}, because not literal expr, " + + "expr: {}, row: {}", targetTable.getName(), item.getExpr(), item); + } return; } } @@ -1238,6 +1269,16 @@ public class NativeInsertStmt extends InsertStmt { } } isGroupCommit = true; + } else { + if (LOG.isDebugEnabled()) { + for (Pair> pair : conditions) { + if (pair.first.getAsBoolean() == false) { + LOG.debug("group commit is off for table: {}, because: {}", targetTable.getName(), + pair.second.get()); + break; + } + } + } } } diff --git a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy index c2523c4909..bb06430513 100644 --- a/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_group_commit_async_wal_msg_fault_injection.groovy @@ -15,11 +15,24 @@ // specific language governing permissions and limitations // under the License. +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS + suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { def tableName = "wal_test" - + + def getRowCount = { expectedRowCount -> + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until( + { + def result = sql "select count(*) from ${tableName}" + logger.info("table: ${tableName}, rowCount: ${result}") + return result[0][0] == expectedRowCount + } + ) + } + // test successful group commit async load sql """ DROP TABLE IF EXISTS ${tableName} """ @@ -132,4 +145,30 @@ suite("test_group_commit_async_wal_msg_fault_injection","nonConcurrent") { assertTrue(exception) } + // test replay wal should success + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1", "group_commit_interval_ms" = "4000") + """ + GetDebugPoint().clearDebugPointsForAllBEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error") + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + file 'group_commit_wal_msg.csv' + time 10000 + } + getRowCount(5) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_error") + } } \ No newline at end of file diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 371e392fcc..b06e7f4c89 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -341,6 +341,14 @@ suite("insert_group_commit_into") { getRowCount(1) qt_sql """ select * from ${table}; """ + + sql " set enable_unique_key_partial_update=true " + none_group_commit_insert """ + INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, `time_bucket`, `trace_id`) + VALUES + ('', + 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', '355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, '3229b7cd-f3a2-4359-aa24-946388c9cc54', 'service_46da0dab-e27d-4820-aea2-9bfc15741615', 'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=tagValue_11, tagKey_10=tagValue_10, tagKey_4=tagValue_4, tagKey_13=tagValue_13, tagKey_14=tagValue_14, tagKey_2=tagValue_2, tagKey_17=tagValue_17, tagKey_19=tagValue_19, tagKey_0=tagValue_0, tagKey_18=tagValue_18, tagKey_9=tagValue_9, tagKey_7=tagValue_7, tagKey_12=tagValue_12]'], '0', 0, '0'); + """, 1 } } finally { // try_sql("DROP TABLE ${table}")