From 7443e8fcf2a57c480a087162cdf53f858ec52436 Mon Sep 17 00:00:00 2001 From: Sun Chenyang Date: Tue, 2 Jul 2024 15:22:04 +0800 Subject: [PATCH] [cherry-pick](branch-2.1) fix single compaction test p2 #34568 #36881 (#37075) --- be/src/olap/olap_server.cpp | 1 + be/src/olap/single_replica_compaction.cpp | 9 + be/src/olap/tablet.cpp | 2 + be/src/util/doris_metrics.cpp | 6 + be/src/util/doris_metrics.h | 3 + .../doris/service/FrontendServiceImpl.java | 5 + ...test_single_compaction_fault_injection.out | 10 + ...t_single_compaction_fault_injection.groovy | 375 ++++++++++++++++++ .../test_single_replica_compaction.groovy | 72 ++-- 9 files changed, 452 insertions(+), 31 deletions(-) create mode 100644 regression-test/data/compaction/test_single_compaction_fault_injection.out create mode 100644 regression-test/suites/compaction/test_single_compaction_fault_injection.groovy diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 3d4693c2e6..422d87daec 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -791,6 +791,7 @@ Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tab } auto compaction = std::make_shared(tablet, compaction_type); + DorisMetrics::instance()->single_compaction_request_total->increment(1); auto st = compaction->prepare_compact(); auto clean_single_replica_compaction = [tablet, this]() { diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index c520b23f94..3843052c43 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -103,9 +103,12 @@ Status SingleReplicaCompaction::_do_single_replica_compaction() { } Status SingleReplicaCompaction::_do_single_replica_compaction_impl() { + DBUG_EXECUTE_IF("do_single_compaction_return_ok", { return Status::OK(); }); TReplicaInfo addr; std::string token; // 1. 
get peer replica info + DBUG_EXECUTE_IF("single_compaction_failed_get_peer", + { return Status::Aborted("tablet don't have peer replica"); }); if (!StorageEngine::instance()->get_peer_replica_info(_tablet->tablet_id(), &addr, &token)) { LOG(WARNING) << _tablet->tablet_id() << " tablet don't have peer replica"; return Status::Aborted("tablet don't have peer replica"); @@ -163,6 +166,8 @@ Status SingleReplicaCompaction::_do_single_replica_compaction_impl() { Status SingleReplicaCompaction::_get_rowset_verisons_from_peer( const TReplicaInfo& addr, std::vector* peer_versions) { + DBUG_EXECUTE_IF("single_compaction_failed_get_peer_versions", + { return Status::Aborted("tablet failed get peer versions"); }); PGetTabletVersionsRequest request; request.set_tablet_id(_tablet->tablet_id()); PGetTabletVersionsResponse response; @@ -351,6 +356,8 @@ Status SingleReplicaCompaction::_make_snapshot(const std::string& ip, int port, if (snapshot_path->at(snapshot_path->length() - 1) != '/') { snapshot_path->append("/"); } + DBUG_EXECUTE_IF("single_compaction_failed_make_snapshot", + { return Status::InternalError("failed snapshot"); }); } else { return Status::InternalError("success snapshot without snapshot path"); } @@ -455,6 +462,8 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir, client->set_timeout_ms(estimate_timeout * 1000); RETURN_IF_ERROR(client->download(local_file_path)); + DBUG_EXECUTE_IF("single_compaction_failed_download_file", + { return Status::InternalError("failed to download file"); }); // Check file length uint64_t local_file_size = std::filesystem::file_size(local_file_path); if (local_file_size != file_size) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a9f67b4ad5..5af4e51cab 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2001,9 +2001,11 @@ void Tablet::execute_single_replica_compaction(SingleReplicaCompaction& compacti set_last_failure_time(this, compaction, UnixMillis()); 
set_last_single_compaction_failure_status(res.to_string()); if (res.is()) { + DorisMetrics::instance()->single_compaction_request_cancelled->increment(1); VLOG_CRITICAL << "Cannel fetching from the remote peer. res=" << res << ", tablet=" << tablet_id(); } else { + DorisMetrics::instance()->single_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do single replica compaction. res=" << res << ", tablet=" << tablet_id(); } diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 907498748a..f0518f4e71 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -76,6 +76,9 @@ DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_total, finish_task, total); DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_failed, finish_task, failed); DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_total, base_compaction, total); DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_failed, base_compaction, failed); +DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_total, single_compaction, total); +DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_failed, single_compaction, failed); +DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_cancelled, single_compaction, cancelled); DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_total, cumulative_compaction, total); DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_failed, cumulative_compaction, failed); DEFINE_ENGINE_COUNTER_METRIC(publish_task_request_total, publish, total); @@ -215,6 +218,9 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_total); + 
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_cancelled); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_request_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_failed_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_total); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 5ddad48727..513ef91723 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -84,6 +84,9 @@ public: IntCounter* base_compaction_request_failed = nullptr; IntCounter* cumulative_compaction_request_total = nullptr; IntCounter* cumulative_compaction_request_failed = nullptr; + IntCounter* single_compaction_request_total = nullptr; + IntCounter* single_compaction_request_failed = nullptr; + IntCounter* single_compaction_request_cancelled = nullptr; IntCounter* base_compaction_deltas_total = nullptr; IntCounter* base_compaction_bytes_total = nullptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7bee62880e..5c92802851 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -62,6 +62,7 @@ import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.annotation.LogException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.Util; import org.apache.doris.cooldown.CooldownDelete; import org.apache.doris.datasource.CatalogIf; @@ -2732,6 +2733,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { List tabletIds = request.getTabletIds(); Map> 
tabletReplicaInfos = Maps.newHashMap(); for (Long tabletId : tabletIds) { + if (DebugPointUtil.isEnable("getTabletReplicaInfos.returnEmpty")) { + LOG.info("enable getTabletReplicaInfos.returnEmpty"); + continue; + } List replicaInfos = Lists.newArrayList(); List replicas = Env.getCurrentEnv().getCurrentInvertedIndex() .getReplicasByTabletId(tabletId); diff --git a/regression-test/data/compaction/test_single_compaction_fault_injection.out b/regression-test/data/compaction/test_single_compaction_fault_injection.out new file mode 100644 index 0000000000..9895dbe9e1 --- /dev/null +++ b/regression-test/data/compaction/test_single_compaction_fault_injection.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 b 100 +2 b 100 +3 b 100 +5 a 100 +6 a 100 +7 a 100 +8 a 100 + diff --git a/regression-test/suites/compaction/test_single_compaction_fault_injection.groovy b/regression-test/suites/compaction/test_single_compaction_fault_injection.groovy new file mode 100644 index 0000000000..ebc7425703 --- /dev/null +++ b/regression-test/suites/compaction/test_single_compaction_fault_injection.groovy @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_single_compaction_fault_injection", "p2") { + def tableName = "test_single_compaction" + + def set_be_config = { key, value -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) + } + } + + def triggerSingleCompaction = { be_host, be_http_port, tablet_id -> + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run?tablet_id=") + sb.append(tablet_id) + sb.append("&compact_type=cumulative&remote=true") + + Integer maxRetries = 10; // Maximum number of retries + Integer retryCount = 0; // Current retry count + Integer sleepTime = 5000; // Sleep time in milliseconds + String cmd = sb.toString() + def process + int code_3 + String err_3 + String out_3 + + while (retryCount < maxRetries) { + process = cmd.execute() + code_3 
= process.waitFor() + err_3 = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))) + out_3 = process.getText() + + // If the command was successful, break the loop + if (code_3 == 0) { + break + } + + // If the command was not successful, increment the retry count, sleep for a while and try again + retryCount++ + sleep(sleepTime) + } + assertEquals(code_3, 0) + logger.info("Get compaction status: code=" + code_3 + ", out=" + out_3) + return out_3 + } + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: =" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + boolean disableAutoCompaction = true + try { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List) ele)[2]) + } + } + set_be_config.call("disable_auto_compaction", "true") + set_be_config.call("update_replica_infos_interval_seconds", "5") + + + // find the master be for single compaction + Boolean found = false + String master_backend_id + List follower_backend_id = new ArrayList<>() + String tablet_id + def tablets + try { + GetDebugPoint().enableDebugPointForAllFEs('getTabletReplicaInfos.returnEmpty') + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `score` int(11) NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( "replication_num" = "2", "enable_single_replica_compaction" = "true", "enable_unique_key_merge_on_write" = "false" ); + """ + + tablets = sql_return_maparray """ show tablets from ${tableName}; """ + // wait for update replica infos + Thread.sleep(20000) + // The test table only has one bucket with 2 replicas, + // and `show tablets` will return 2 different replicas with the same tablet. + // So we can use the same tablet_id to get tablet/trigger compaction with different backends. 
+ tablet_id = tablets[0].TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + for (def tablet in tablets) { + String trigger_backend_id = tablet.BackendId + def tablet_status = getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) + if (!tablet_status.containsKey("single replica compaction status")) { + if (found) { + found = false + logger.warn("multiple master"); + break; + } + found = true + master_backend_id = trigger_backend_id + } else { + follower_backend_id.add(trigger_backend_id) + } + } + assertFalse(found) + assertFalse(master_backend_id.isEmpty()) + assertTrue(follower_backend_id.isEmpty()) + master_backend_id = "" + } finally { + GetDebugPoint().disableDebugPointForAllFEs('getTabletReplicaInfos.returnEmpty') + // wait for update replica infos + // be.conf: update_replica_infos_interval_seconds + 2s + Thread.sleep(20000) + // The test table only has one bucket with 2 replicas, + // and `show tablets` will return 2 different replicas with the same tablet. + // So we can use the same tablet_id to get tablet/trigger compaction with different backends. 
+ tablet_id = tablets[0].TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + for (def tablet in tablets) { + String trigger_backend_id = tablet.BackendId + def tablet_status = getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + if (!tablet_status.containsKey("single replica compaction status")) { + if (found) { + logger.warn("multiple master") + assertTrue(false) + } + found = true + master_backend_id = trigger_backend_id + } else { + follower_backend_id.add(trigger_backend_id) + } + } + assertTrue(found) + assertFalse(master_backend_id.isEmpty()) + assertFalse(follower_backend_id.isEmpty()) + } + + + def checkSucceedCompactionResult = { + def master_tablet_status = getTabletStatus(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id); + def master_rowsets = master_tablet_status."rowsets" + assert master_rowsets instanceof List + logger.info("rowset size: " + master_rowsets.size()) + + for (String backend: follower_backend_id) { + def tablet_status = getTabletStatus(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id); + def rowsets = tablet_status."rowsets" + assert rowsets instanceof List + assertEquals(master_rowsets.size(), rowsets.size()) + } + } + + def checkFailedCompactionResult = { + def master_tablet_status = getTabletStatus(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id); + def master_rowsets = master_tablet_status."rowsets" + assert master_rowsets instanceof List + logger.info("rowset size: " + master_rowsets.size()) + + for (String backend: follower_backend_id) { + def tablet_status = getTabletStatus(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id); + def rowsets = tablet_status."rowsets" + assert rowsets instanceof List + assertFalse(master_rowsets.size() == rowsets.size()) + } + } + 
+ // return ok + try { + GetDebugPoint().enableDebugPointForAllBEs("do_single_compaction_return_ok"); + for (String id in follower_backend_id) { + assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id); + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("do_single_compaction_return_ok"); + } + sql """ INSERT INTO ${tableName} VALUES (1, "a", 100); """ + sql """ INSERT INTO ${tableName} VALUES (1, "b", 100); """ + sql """ INSERT INTO ${tableName} VALUES (2, "a", 100); """ + sql """ INSERT INTO ${tableName} VALUES (2, "b", 100); """ + sql """ INSERT INTO ${tableName} VALUES (3, "a", 100); """ + sql """ INSERT INTO ${tableName} VALUES (3, "b", 100); """ + + // trigger master be to do cumu compaction + assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], + "cumulative", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id) + + try { + GetDebugPoint().enableDebugPointForAllBEs("single_compaction_failed_get_peer"); + for (String id in follower_backend_id) { + out = triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + assertTrue(out.contains("compaction task is successfully triggered") || out.contains("tablet don't have peer replica")); + } + checkFailedCompactionResult.call() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("single_compaction_failed_get_peer") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("single_compaction_failed_get_peer_versions"); + for (String id in follower_backend_id) { + out = triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + assertTrue(out.contains("compaction task is successfully 
triggered") || out.contains("tablet failed get peer versions")); + } + checkFailedCompactionResult.call() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("single_compaction_failed_get_peer_versions") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("single_compaction_failed_make_snapshot"); + for (String id in follower_backend_id) { + out = triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + assertTrue(out.contains("compaction task is successfully triggered") || out.contains("failed snapshot")); + } + checkFailedCompactionResult.call() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("single_compaction_failed_make_snapshot") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("single_compaction_failed_download_file"); + for (String id in follower_backend_id) { + out = triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + assertTrue(out.contains("compaction task is successfully triggered") || out.contains("failed to download file")); + } + checkFailedCompactionResult.call() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("single_compaction_failed_download_file") + } + + // trigger follower be to fetch compaction result + for (String id in follower_backend_id) { + assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + } + + // check rowsets + checkSucceedCompactionResult.call() + + sql """ INSERT INTO ${tableName} VALUES (4, "a", 100); """ + sql """ INSERT INTO ${tableName} VALUES (5, "a", 100); """ + sql """ INSERT INTO ${tableName} VALUES (6, "a", 100); """ + sql """ DELETE FROM ${tableName} WHERE id = 4; """ + sql """ INSERT INTO ${tableName} VALUES (7, "a", 100); """ + sql """ INSERT INTO ${tableName} VALUES (8, "a", 100); """ + + // trigger master be to do 
cumu compaction with delete + assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], + "cumulative", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id) + + // trigger follower be to fetch compaction result + for (String id in follower_backend_id) { + assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + } + + // check rowsets + checkSucceedCompactionResult.call() + + // trigger master be to do base compaction + assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], + "full", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id) + + // trigger follower be to fetch compaction result + for (String id in follower_backend_id) { + assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + } + + // check rowsets + checkSucceedCompactionResult.call() + + qt_sql """ + select * from ${tableName} order by id + """ + + } finally { + set_be_config.call("disable_auto_compaction", disableAutoCompaction.toString()) + } +} diff --git a/regression-test/suites/compaction/test_single_replica_compaction.groovy b/regression-test/suites/compaction/test_single_replica_compaction.groovy index c9a9f65ee3..0b83ddd51e 100644 --- a/regression-test/suites/compaction/test_single_replica_compaction.groovy +++ b/regression-test/suites/compaction/test_single_replica_compaction.groovy @@ -61,25 +61,20 @@ 
suite("test_single_replica_compaction", "p2") { has_update_be_config = true def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> - StringBuilder sb = new StringBuilder(); - sb.append("curl -X POST http://${be_host}:${be_http_port}") - sb.append("/api/compaction/run?tablet_id=") - sb.append(tablet_id) - sb.append("&compact_type=${compact_type}") - - String command = sb.toString() - logger.info(command) - process = command.execute() - code = process.waitFor() - err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); - out = process.getText() - logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction + ", err=" + err) - if (!disableAutoCompaction) { - return "Success, " + out + if (compact_type == "cumulative") { + def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) + assertEquals(code_1, 0) + return out_1 + } else if (compact_type == "full") { + def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id) + logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2) + assertEquals(code_2, 0) + return out_2 + } else { + assertFalse(True) } - assertEquals(code, 0) - return out - } + } def triggerSingleCompaction = { be_host, be_http_port, tablet_id -> StringBuilder sb = new StringBuilder(); @@ -88,18 +83,33 @@ suite("test_single_replica_compaction", "p2") { sb.append(tablet_id) sb.append("&compact_type=cumulative&remote=true") - String command = sb.toString() - logger.info(command) - process = command.execute() - code = process.waitFor() - err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); - out = process.getText() - logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction 
+ ", err=" + err) - if (!disableAutoCompaction) { - return "Success, " + out + Integer maxRetries = 10; // Maximum number of retries + Integer retryCount = 0; // Current retry count + Integer sleepTime = 5000; // Sleep time in milliseconds + String cmd = sb.toString() + def process + int code_3 + String err_3 + String out_3 + + while (retryCount < maxRetries) { + process = cmd.execute() + code_3 = process.waitFor() + err_3 = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))) + out_3 = process.getText() + + // If the command was successful, break the loop + if (code_3 == 0) { + break + } + + // If the command was not successful, increment the retry count, sleep for a while and try again + retryCount++ + sleep(sleepTime) } - assertEquals(code, 0) - return out + assertEquals(code_3, 0) + logger.info("Get compaction status: code=" + code_3 + ", out=" + out_3) + return out_3 } def waitForCompaction = { be_host, be_http_port, tablet_id -> boolean running = true @@ -258,9 +268,9 @@ suite("test_single_replica_compaction", "p2") { // check rowsets checkCompactionResult.call() - // trigger master be to do base compaction + // trigger master be to do full compaction assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], - "base", tablet_id).contains("Success")); + "full", tablet_id).contains("Success")); waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id) // trigger follower be to fetch compaction result