diff --git a/be/src/http/action/debug_point_action.cpp b/be/src/http/action/debug_point_action.cpp index 08b1e116b2..04aa38efaa 100644 --- a/be/src/http/action/debug_point_action.cpp +++ b/be/src/http/action/debug_point_action.cpp @@ -21,6 +21,7 @@ #include "http/http_channel.h" #include "http/http_status.h" #include "util/debug_points.h" +#include "util/time.h" namespace doris { @@ -43,17 +44,16 @@ void BaseDebugPointAction::handle(HttpRequest* req) { } Status AddDebugPointAction::_handle(HttpRequest* req) { - std::string debug_point = req->param("debug_point"); + std::string name = req->param("debug_point"); std::string execute = req->param("execute"); std::string timeout = req->param("timeout"); - if (debug_point.empty()) { + if (name.empty()) { return Status::InternalError("Empty debug point name"); } - int64_t execute_limit = -1; - int64_t timeout_second = -1; + auto debug_point = std::make_shared<DebugPoint>(); try { if (!execute.empty()) { - execute_limit = std::stol(execute); + debug_point->execute_limit = std::stol(execute); } } catch (const std::exception& e) { return Status::InternalError("Invalid execute limit format, execute {}, err {}", execute, @@ -61,14 +61,19 @@ Status AddDebugPointAction::_handle(HttpRequest* req) { } try { if (!timeout.empty()) { - timeout_second = std::stol(timeout); + int64_t timeout_second = std::stol(timeout); + if (timeout_second > 0) { + debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC; + } } } catch (const std::exception& e) { return Status::InternalError("Invalid timeout format, timeout {}, err {}", timeout, e.what()); } - DebugPoints::instance()->add(debug_point, execute_limit, timeout_second); + debug_point->params = *(req->params()); + + DebugPoints::instance()->add(name, debug_point); return Status::OK(); } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 0d19e78cf7..ec41ce1295 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -60,6 +60,7 @@ #include "runtime/client_cache.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" +#include "util/debug_points.h" #include "util/defer_op.h" #include "util/network_util.h" #include "util/stopwatch.hpp" @@ -105,6 +106,10 @@ Status EngineCloneTask::execute() { } Status EngineCloneTask::_do_clone() { + DBUG_EXECUTE_IF("EngineCloneTask.wait_clone", { + auto duration = std::chrono::milliseconds(dp->param("duration", 10 * 1000)); + std::this_thread::sleep_for(duration); + }); Status status = Status::OK(); string src_file_path; TBackend src_host; diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 2bdd7e5b26..407aea6e44 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -44,6 +44,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/task/engine_publish_version_task.h" +#include "util/debug_points.h" #include "util/time.h" namespace doris { @@ -225,6 +226,12 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, key.first, key.second, tablet_info.to_string()); } + DBUG_EXECUTE_IF( + "TxnManager.commit_txn_random_failed", + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + return Status::InternalError("debug commit txn random failed"); + }); + std::lock_guard txn_lock(_get_txn_lock(transaction_id)); // this while loop just run only once, just for if break do { diff --git a/be/src/util/debug_points.cpp b/be/src/util/debug_points.cpp index
587f8c944a..43bb39df9a 100644 --- a/be/src/util/debug_points.cpp +++ b/be/src/util/debug_points.cpp @@ -30,37 +30,42 @@ DebugPoints* DebugPoints::instance() { } bool DebugPoints::is_enable(const std::string& name) { + return get_debug_point(name) != nullptr; +} + +std::shared_ptr<DebugPoint> DebugPoints::get_debug_point(const std::string& name) { if (!config::enable_debug_points) { - return false; + return nullptr; } auto map_ptr = std::atomic_load_explicit(&_debug_points, std::memory_order_relaxed); auto it = map_ptr->find(name); if (it == map_ptr->end()) { - return false; + return nullptr; } - auto& debug_point = *(it->second); - if ((debug_point.expire_ms > 0 && MonotonicMillis() >= debug_point.expire_ms) || - (debug_point.execute_limit > 0 && - debug_point.execute_num.fetch_add(1, std::memory_order_relaxed) >= - debug_point.execute_limit)) { + auto debug_point = it->second; + if ((debug_point->expire_ms > 0 && MonotonicMillis() >= debug_point->expire_ms) || + (debug_point->execute_limit > 0 && + debug_point->execute_num.fetch_add(1, std::memory_order_relaxed) >= + debug_point->execute_limit)) { remove(name); - return false; + return nullptr; } - return true; + return debug_point; } -void DebugPoints::add(const std::string& name, int64_t execute_limit, int64_t timeout_second) { - auto debug_point = std::make_shared<DebugPoint>(); - debug_point->execute_limit = execute_limit; - if (timeout_second > 0) { - debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC; - } +void DebugPoints::add(const std::string& name, std::shared_ptr<DebugPoint> debug_point) { update([&](DebugPointMap& new_points) { new_points[name] = debug_point; }); - LOG(INFO) << "add debug point: name=" << name << ", execute=" << execute_limit - << ", timeout=" << timeout_second; + std::ostringstream oss; + oss << "{"; + for (auto [key, value] : debug_point->params) { + oss << key << " : " << value << ", "; + } + oss << "}"; + + LOG(INFO) << "add debug point: name=" << name << ", params=" << oss.str(); } void DebugPoints::remove(const std::string& name) { diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h index 704405689c..47b3aaa9cb 100644 --- a/be/src/util/debug_points.h +++ b/be/src/util/debug_points.h @@ -18,19 +18,21 @@ #pragma once #include <atomic> +#include <boost/lexical_cast.hpp> #include <functional> #include <map> #include <memory> -#include <mutex> +#include <string> #include "common/compiler_util.h" #include "common/config.h" -#define DBUG_EXECUTE_IF(debug_point, code) \ - if (UNLIKELY(config::enable_debug_points)) { \ - if (DebugPoints::instance()->is_enable(debug_point)) { \ - code; \ - } \ +#define DBUG_EXECUTE_IF(debug_point_name, code) \ + if (UNLIKELY(config::enable_debug_points)) { \ + auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ + if (dp) { \ + code; \ + } \ } namespace doris { @@ -39,12 +41,41 @@ struct DebugPoint { std::atomic<int64_t> execute_num {0}; int64_t execute_limit = -1; int64_t expire_ms = -1; + + std::map<std::string, std::string> params; + + template <typename T> + T param(const std::string& key, T default_value = T()) { + auto it = params.find(key); + if (it == params.end()) { + return default_value; + } + if constexpr (std::is_same_v<T, bool>) { + if (it->second == "true") { + return true; + } + if (it->second == "false") { + return false; + } + return boost::lexical_cast<T>(it->second); + } else if constexpr (std::is_arithmetic_v<T>) { + return boost::lexical_cast<T>(it->second); + } else { + static_assert(std::is_same_v<T, std::string>); + return it->second; + } + } + + std::string param(const std::string& key, const char* default_value) { + return param(key, std::string(default_value)); + } }; class DebugPoints
{ public: bool is_enable(const std::string& name); - void add(const std::string& name, int64_t execute_limit, int64_t timeout_second); + std::shared_ptr<DebugPoint> get_debug_point(const std::string& name); + void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point); void remove(const std::string& name); void clear(); diff --git a/be/test/util/debug_points_test.cpp b/be/test/util/debug_points_test.cpp index c2cf2bdedf..df86cf0d1b 100644 --- a/be/test/util/debug_points_test.cpp +++ b/be/test/util/debug_points_test.cpp @@ -54,6 +54,21 @@ TEST(DebugPointsTest, BaseTest) { EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug4")); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); EXPECT_FALSE(DebugPoints::instance()->is_enable("dbug4")); + + POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false"); + EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug5")); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param("v1", 100))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string()))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", "b"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0))); + DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false))); + DBUG_EXECUTE_IF("dbug5", EXPECT_FALSE(dp->param("v5", false))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("abcd", dp->param("v_not_exist", "abcd"))); } } // namespace doris diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile index b930b10668..913c161d9d 100644 --- a/docker/runtime/doris-compose/Dockerfile +++ b/docker/runtime/doris-compose/Dockerfile @@ -25,6 +25,8 @@ ENV JAVA_HOME="/usr/local/openjdk-8/" ADD output /opt/apache-doris/ RUN apt-get update && \ - apt-get install -y default-mysql-client python lsof && \ + apt-get install -y default-mysql-client python lsof tzdata && \ + ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ + dpkg-reconfigure -f noninteractive tzdata && \ apt-get clean diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index adb2d2b0d0..65b19a561a 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -370,6 +370,10 @@ class DownCommand(Command): if args.clean: utils.enable_dir_with_rw_perm(node.get_path()) shutil.rmtree(node.get_path()) + register_file = "{}/{}-register".format( + CLUSTER.get_status_path(cluster.name), node.get_ip()) + if os.path.exists(register_file): + os.remove(register_file) LOG.info( utils.render_yellow( "Clean {} with id {} data cause has specific --clean" @@ -413,8 +417,18 @@ class ListNode(object): self.tablet_num, self.last_heartbeat, self.err_msg ] if detail: + query_port = "" + http_port = "" + if self.node_type == CLUSTER.Node.TYPE_FE: + query_port = CLUSTER.FE_QUERY_PORT + http_port = CLUSTER.FE_HTTP_PORT + elif self.node_type == CLUSTER.Node.TYPE_BE: + http_port = CLUSTER.BE_WEBSVR_PORT + else: + pass result += [ - CLUSTER.FE_QUERY_PORT, + query_port, + http_port, ] return result @@ -565,6 +579,7 @@ class ListCommand(Command): if args.detail: header += [ "query_port", + "http_port", ] rows = [] diff --git
a/docs/en/docs/data-operate/import/import-advanced/min-load-replica-num.md b/docs/en/docs/data-operate/import/import-advanced/min-load-replica-num.md new file mode 100644 index 0000000000..70f9c0a087 --- /dev/null +++ b/docs/en/docs/data-operate/import/import-advanced/min-load-replica-num.md @@ -0,0 +1,101 @@ +--- +{ + "title": "Min Load Replica Num", + "language": "en" +} +--- + + + +# Min Load Replica Num + +By default, importing data requires more than half of the replicas to be written successfully. However, this rule is not flexible enough and may cause inconvenience in some scenarios. + +For example, with two replicas, importing data requires both replicas to be written successfully, which means that no replica may be unavailable during the import. This greatly affects the availability of the cluster. + +To solve this problem, Doris allows users to set a minimum number of write replicas. A load task succeeds as soon as the number of replicas it writes successfully is greater than or equal to this minimum. + +## Usage + +### Min load replica num for a single table + +You can set the table property `min_load_replica_num` for a single olap table. A valid value of this property must be greater than 0 and must not exceed `replication_num` (the number of replicas of the table). Its default value is -1, indicating that the property is not enabled. + +The `min_load_replica_num` of a table can be set when creating the table. + +```sql +CREATE TABLE test_table1 +( + k1 INT, + k2 INT +) +DUPLICATE KEY(k1) +DISTRIBUTED BY HASH(k1) BUCKETS 5 +PROPERTIES +( + 'replication_num' = '2', + 'min_load_replica_num' = '1' +); +``` + +For an existing table, you can use `ALTER TABLE` to modify its `min_load_replica_num`. + +```sql +ALTER TABLE test_table1 +SET ( 'min_load_replica_num' = '1'); +``` + +You can use `SHOW CREATE TABLE` to view the table property `min_load_replica_num`. + +```SQL +SHOW CREATE TABLE test_table1; +``` + +The PROPERTIES section of the output will contain `min_load_replica_num`, e.g. + +```text +Create Table: CREATE TABLE `test_table1` ( + `k1` int(11) NULL, + `k2` int(11) NULL +) ENGINE=OLAP +DUPLICATE KEY(`k1`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`k1`) BUCKETS 5 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 2", +"min_load_replica_num" = "1", +"storage_format" = "V2", +"light_schema_change" = "true", +"disable_auto_compaction" = "false", +"enable_single_replica_compaction" = "false" +); +``` + +### Global min load replica num for all tables + +You can set the FE configuration item `min_load_replica_num` for all olap tables. A valid value of this configuration item must be greater than 0. Its default value is -1, which means that the global minimum number of write replicas is not enabled. + +For a table, if the table property `min_load_replica_num` is valid (>0), then the table will ignore the global configuration `min_load_replica_num`. Otherwise, if the global configuration `min_load_replica_num` is valid (>0), then the minimum number of write replicas for the table will be equal to `min(FE.conf.min_load_replica_num, table.replication_num/2 + 1)`. + +For viewing and modifying FE configuration items, you can refer to [here](../../../admin-manual/config/fe-config.md).
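Since `min_load_replica_num` is defined as a mutable, master-only FE config (see the `Config.java` change later in this patch), one way to inspect and adjust the global value at runtime is from a MySQL client with ADMIN privileges; the following is only a sketch of that workflow:

```sql
-- view the current global value
ADMIN SHOW FRONTEND CONFIG LIKE "min_load_replica_num";

-- e.g. let a 3-replica table succeed once min(1, 3/2 + 1) = 1 replica is written
ADMIN SET FRONTEND CONFIG ("min_load_replica_num" = "1");
```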
+ +### Other cases + +If the table property `min_load_replica_num` is not enabled (<=0), and the global configuration `min_load_replica_num` is not enabled(<=0), then the data import still needs to be successfully written to the majority replica. At this point, the minimum number of write replicas for the table is equal to `table.replication_num/2 + 1`. diff --git a/docs/sidebars.json b/docs/sidebars.json index 0caf967961..b0015f1a4f 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -93,6 +93,13 @@ "data-operate/import/import-way/insert-into-manual", "data-operate/import/import-way/load-json-format" ] + }, + { + "type": "category", + "label": "Import Advanced", + "items": [ + "data-operate/import/import-advanced/min-load-replica-num" + ] } ] }, diff --git a/docs/zh-CN/docs/data-operate/import/import-advanced/min-load-replica-num.md b/docs/zh-CN/docs/data-operate/import/import-advanced/min-load-replica-num.md new file mode 100644 index 0000000000..827f6baf9f --- /dev/null +++ b/docs/zh-CN/docs/data-operate/import/import-advanced/min-load-replica-num.md @@ -0,0 +1,102 @@ +--- +{ + "title": "Min Load Replica Num", + "language": "zh-CN" +} +--- + + + +# Min Load Replica Num + +默认情况下,数据导入要求至少有超过半数的副本写入成功,导入才算成功。然而,这种方式不够灵活,在某些场景会带来不便。 + +举个例子,对于两副本情况,按上面的多数派原则,要想导入数据,则需要这两个副本都写入成功。这意味着,在导入数据过程中,不允许任意一个副本不可用。这极大影响了集群的可用性。 + +为了解决以上问题,Doris允许用户设置最小写入副本数(Min Load Replica Num)。对导入数据任务,当它成功写入的副本数大于或等于最小写入副本数时,导入即成功。 + +## 用法 + +### 单个表的最小写入副本数 + +可以对单个olap表,设置最小写入副本数,并用表属性`min_load_replica_num`来表示。该属性的有效值要求大于0且不超过表的副本数。其默认值为-1,表示不启用该属性。 + +可以在创建表时设置表的`min_load_replica_num`。 + +```sql +CREATE TABLE test_table1 +( + k1 INT, + k2 INT +) +DUPLICATE KEY(k1) +DISTRIBUTED BY HASH(k1) BUCKETS 5 +PROPERTIES +( + 'replication_num' = '2', + 'min_load_replica_num' = '1' +); +``` + +对一个已存在的表,可以使用语句`ALTER TABLE`来修改它的`min_load_replica_num`。 + +```sql +ALTER TABLE test_table1 +SET ( 'min_load_replica_num' = '1'); +``` + +可以使用语句`SHOW CREATE TABLE`来查看表的属性`min_load_replica_num`。 + +```SQL +SHOW CREATE TABLE test_table1; +``` + +输出结果的PROPERTIES中将包含`min_load_replica_num`。例如: + +```text +Create Table: CREATE TABLE `test_table1` ( + `k1` int(11) NULL, + `k2` int(11) NULL +) ENGINE=OLAP +DUPLICATE KEY(`k1`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`k1`) BUCKETS 5 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 2", +"min_load_replica_num" = "1", +"storage_format" = "V2", +"light_schema_change" = "true", +"disable_auto_compaction" = "false", +"enable_single_replica_compaction" = "false" +); +``` + +### 全局最小写入副本数 + +可以对所有olap表,设置全局最小写入副本数,并用FE的配置项`min_load_replica_num`来表示。该配置项的有效值要求大于0。其默认值为-1,表示不开启全局最小写入副本数。 + +对一个表,如果表属性`min_load_replica_num`有效(即大于0),那么该表将会忽略全局配置`min_load_replica_num`。否则,如果全局配置`min_load_replica_num`有效(即大于0),那么该表的最小写入副本数将等于`min(FE.conf.min_load_replica_num,table.replication_num/2 + 1)`。 + +对于FE配置项的查看和修改,可以参考[这里](../../../admin-manual/config/fe-config.md)。 + +### 其余情况 + +如果没有开启表属性`min_load_replica_num`(即小于或者等于0),也没有设置全局配置`min_load_replica_num`(即小于或等于0),那么数据的导入仍需多数派副本写入成功才算成功。此时,表的最小写入副本数等于`table.replicatition_num/2 + 1`。 + diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d3c404c957..1560334c11 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -458,6 +458,10 @@ public class Config extends ConfigBase { "Maximal timeout for delete job, in seconds."}) public static int delete_job_max_timeout_second 
= 300; + @ConfField(mutable = true, masterOnly = true, description = {"Load 成功所需的最小写入副本数。", + "Minimal number of write successful replicas for load job."}) + public static short min_load_replica_num = -1; + @ConfField(description = {"load job 调度器的执行间隔,单位是秒。", "The interval of load job scheduler, in seconds."}) public static int load_checker_interval_second = 5; @@ -950,6 +954,17 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int schedule_batch_size = 50; + /** + * tablet health check interval. Do not modify it in production environment. + */ + @ConfField(mutable = false, masterOnly = true) + public static long tablet_checker_interval_ms = 20 * 1000; + + /** + * tablet scheduled interval. Do not modify it in production environment. + */ + @ConfField(mutable = false, masterOnly = true) + public static long tablet_schedule_interval_ms = 1000; /** * Deprecated after 0.10 diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 800788b7ca..e9e28a155c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -210,6 +210,10 @@ public class Alter { } else if (currentAlterOps.checkIsBeingSynced(alterClauses)) { olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses)); needProcessOutsideTableLock = true; + } else if (currentAlterOps.checkMinLoadReplicaNum(alterClauses)) { + Preconditions.checkState(alterClauses.size() == 1); + AlterClause alterClause = alterClauses.get(0); + processModifyMinLoadReplicaNum(db, olapTable, alterClause); } else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) { if (!Config.enable_feature_binlog) { throw new DdlException("Binlog feature is not enabled"); @@ -886,6 +890,37 @@ public class Alter { } } + private void processModifyMinLoadReplicaNum(Database db, OlapTable olapTable, AlterClause alterClause) + throws DdlException { + Map properties = alterClause.getProperties(); + short minLoadReplicaNum = -1; + try { + minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation(); + if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) { + throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= " + + "default replica num [" + replicaAlloc.getTotalReplicaNum() + "]"); + } + if (olapTable.dynamicPartitionExists()) { + replicaAlloc = olapTable.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation(); + if (!replicaAlloc.isNotSet() && minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) { + throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= " + + "dynamic partition replica num [" + replicaAlloc.getTotalReplicaNum() + "]"); + } + } + properties.put(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, Short.toString(minLoadReplicaNum)); + olapTable.setMinLoadReplicaNum(minLoadReplicaNum); + olapTable.writeLockOrDdlException(); + try { + Env.getCurrentEnv().modifyTableProperties(db, olapTable, properties); + } finally { + olapTable.writeUnlock(); + } + } + public AlterHandler getSchemaChangeHandler() { return schemaChangeHandler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java 
index 22104522ed..5742c631de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOperations.java @@ -84,6 +84,12 @@ public class AlterOperations { ).anyMatch(clause -> clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)); } + public boolean checkMinLoadReplicaNum(List alterClauses) { + return alterClauses.stream().filter(clause -> + clause instanceof ModifyTablePropertiesClause + ).anyMatch(clause -> clause.getProperties().containsKey(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM)); + } + public boolean checkBinlogConfigChange(List alterClauses) { return alterClauses.stream().filter(clause -> clause instanceof ModifyTablePropertiesClause diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 0925154439..59259ba37d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -46,6 +46,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause { private boolean isBeingSynced = false; + private short minLoadReplicaNum = -1; + public void setIsBeingSynced(boolean isBeingSynced) { this.isBeingSynced = isBeingSynced; } @@ -117,6 +119,9 @@ public class ModifyTablePropertiesClause extends AlterTableClause { this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TABLET_TYPE)) { throw new AnalysisException("Alter tablet type not supported"); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM)) { + // do nothing, will be alter in Alter.processAlterOlapTable + this.needTableStable = false; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) { this.needTableStable = false; String storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, ""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e706ea6385..35c8c0e8e1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3111,6 +3111,10 @@ public class Env { sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\" = \""); sb.append(replicaAlloc.toCreateStmt()).append("\""); + // min load replica num + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\" = \""); + sb.append(olapTable.getMinLoadReplicaNum()).append("\""); + // bloom filter Set bfColumnNames = olapTable.getCopiedBfColumns(); if (bfColumnNames != null) { @@ -4646,6 +4650,7 @@ public class Env { tableProperty.modifyTableProperties(properties); } tableProperty.buildInMemory() + .buildMinLoadReplicaNum() .buildStoragePolicy() .buildIsBeingSynced() .buildCompactionPolicy() diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2daa5c3ee4..598392d391 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1854,6 +1854,36 @@ public class OlapTable extends Table { tableProperty.buildEnableLightSchemaChange(); } + public short 
getMinLoadReplicaNum() { + if (tableProperty != null) { + return tableProperty.getMinLoadReplicaNum(); + } + + return -1; + } + + public void setMinLoadReplicaNum(short minLoadReplicaNum) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, + Short.valueOf(minLoadReplicaNum).toString()); + tableProperty.buildMinLoadReplicaNum(); + } + + public int getLoadRequiredReplicaNum(long partitionId) { + int totalReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum(); + int minLoadReplicaNum = getMinLoadReplicaNum(); + if (minLoadReplicaNum > 0) { + return Math.min(minLoadReplicaNum, totalReplicaNum); + } + + int quorum = totalReplicaNum / 2 + 1; + if (Config.min_load_replica_num > 0) { + return Math.min(quorum, Config.min_load_replica_num); + } + + return quorum; + } + public void setStoragePolicy(String storagePolicy) throws UserException { if (!Config.enable_storage_policy && !Strings.isNullOrEmpty(storagePolicy)) { throw new UserException("storage policy feature is disabled by default. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index ae8f85dc0c..07c04a907e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -58,6 +58,7 @@ public class TableProperty implements Writable { private DynamicPartitionProperty dynamicPartitionProperty = new DynamicPartitionProperty(Maps.newHashMap()); private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private boolean isInMemory = false; + private short minLoadReplicaNum = -1; private String storagePolicy = ""; private Boolean isBeingSynced = null; @@ -121,6 +122,7 @@ public class TableProperty implements Writable { break; case OperationType.OP_MODIFY_IN_MEMORY: buildInMemory(); + buildMinLoadReplicaNum(); buildStoragePolicy(); buildIsBeingSynced(); buildCompactionPolicy(); @@ -275,6 +277,16 @@ public class TableProperty implements Writable { return timeSeriesCompactionTimeThresholdSeconds; } + public TableProperty buildMinLoadReplicaNum() { + minLoadReplicaNum = Short.parseShort( + properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, "-1")); + return this; + } + + public short getMinLoadReplicaNum() { + return minLoadReplicaNum; + } + public TableProperty buildStoragePolicy() { storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, ""); return this; @@ -487,6 +499,7 @@ public class TableProperty implements Writable { TableProperty tableProperty = GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class) .executeBuildDynamicProperty() .buildInMemory() + .buildMinLoadReplicaNum() .buildStorageFormat() .buildDataSortInfo() .buildCompressionType() diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 4ec8993be0..fce07ed2a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -34,7 +34,6 @@ import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.AddResult; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; 
-import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.persist.ColocatePersistInfo; @@ -73,7 +72,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { if (INSTANCE == null) { synchronized (ColocateTableCheckerAndBalancer.class) { if (INSTANCE == null) { - INSTANCE = new ColocateTableCheckerAndBalancer(FeConstants.tablet_checker_interval_ms); + INSTANCE = new ColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index 09e61ec59b..c6f6be3d46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -32,7 +32,6 @@ import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.clone.TabletScheduler.AddResult; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.metric.GaugeMetric; @@ -127,7 +126,7 @@ public class TabletChecker extends MasterDaemon { public TabletChecker(Env env, SystemInfoService infoService, TabletScheduler tabletScheduler, TabletSchedulerStat stat) { - super("tablet checker", FeConstants.tablet_checker_interval_ms); + super("tablet checker", Config.tablet_checker_interval_ms); this.env = env; this.infoService = infoService; this.tabletScheduler = tabletScheduler; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index b4667f8069..c9671cd2db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -309,7 +309,7 @@ public class TabletSchedCtx implements Comparable { } else { decommissionTime = -1; if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) { - return failedSchedCounter > 30 * 1000 / FeConstants.tablet_schedule_interval_ms; + return failedSchedCounter > 30 * 1000 / Config.tablet_schedule_interval_ms; } else { return failedSchedCounter > 10; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index ee9da3ac10..510f039333 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -151,7 +151,7 @@ public class TabletScheduler extends MasterDaemon { public TabletScheduler(Env env, SystemInfoService infoService, TabletInvertedIndex invertedIndex, TabletSchedulerStat stat, String rebalancerType) { - super("tablet scheduler", FeConstants.tablet_schedule_interval_ms); + super("tablet scheduler", Config.tablet_schedule_interval_ms); this.env = env; this.infoService = infoService; this.invertedIndex = invertedIndex; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index b369234898..9d264b0c0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -64,9 +64,6 @@ public class FeConstants { // use 
\N to indicate NULL public static String null_string = "\\N"; - public static long tablet_checker_interval_ms = 20 * 1000L; - public static long tablet_schedule_interval_ms = 1000L; - public static String FS_PREFIX_S3 = "s3"; public static String FS_PREFIX_S3A = "s3a"; public static String FS_PREFIX_S3N = "s3n"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java index aab9b8f2ba..d9cedb2d53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java @@ -19,6 +19,8 @@ package org.apache.doris.common.util; import org.apache.doris.common.Config; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,39 +36,82 @@ public class DebugPointUtil { private static final Map<String, DebugPoint> debugPoints = new ConcurrentHashMap<>(); - private static class DebugPoint { + public static class DebugPoint { public AtomicInteger executeNum = new AtomicInteger(0); public int executeLimit = -1; public long expireTime = -1; + + // params + public Map<String, String> params = Maps.newHashMap(); + + public <E> E param(String key, E defaultValue) { + Preconditions.checkState(defaultValue != null); + + String value = params.get(key); + if (value == null) { + return defaultValue; + } + if (defaultValue instanceof Boolean) { + return (E) Boolean.valueOf(value); + } + if (defaultValue instanceof Byte) { + return (E) Byte.valueOf(value); + } + if (defaultValue instanceof Character) { + Preconditions.checkState(value.length() == 1); + return (E) Character.valueOf(value.charAt(0)); + } + if (defaultValue instanceof Short) { + return (E) Short.valueOf(value); + } + if (defaultValue instanceof Integer) { + return (E) Integer.valueOf(value); + } + if (defaultValue instanceof Long) { + return (E) Long.valueOf(value); + } + if (defaultValue instanceof Float) { + return (E) Float.valueOf(value); + } + if (defaultValue instanceof Double) { + return (E) Double.valueOf(value); + } + if (defaultValue instanceof String) { + return (E) value; + } + + Preconditions.checkState(false, "Can not convert with default value=" + defaultValue); + + return defaultValue; + } } public static boolean isEnable(String debugPointName) { + return getDebugPoint(debugPointName) != null; + } + + public static DebugPoint getDebugPoint(String debugPointName) { if (!Config.enable_debug_points) { - return false; + return null; } DebugPoint debugPoint = debugPoints.get(debugPointName); if (debugPoint == null) { - return false; + return null; } if ((debugPoint.expireTime > 0 && System.currentTimeMillis() >= debugPoint.expireTime) || (debugPoint.executeLimit > 0 && debugPoint.executeNum.incrementAndGet() > debugPoint.executeLimit)) { debugPoints.remove(debugPointName); - return false; + return null; } - return true; + return debugPoint; } - public static void addDebugPoint(String name, int executeLimit, long timeoutSecond) { - DebugPoint debugPoint = new DebugPoint(); - debugPoint.executeLimit = executeLimit; - if (timeoutSecond > 0) { - debugPoint.expireTime = System.currentTimeMillis() + timeoutSecond * 1000; - } + public static void addDebugPoint(String name, DebugPoint debugPoint) { debugPoints.put(name, debugPoint); - LOG.info("add debug point: name={}, execute={}, timeout seconds={}", name, executeLimit, timeoutSecond); + LOG.info("add debug point: 
name={}, params={}", name, debugPoint.params); } public static void removeDebugPoint(String name) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java index fcd578d9bc..de6101181f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -639,6 +639,10 @@ public class DynamicPartitionUtil { } else { replicaAlloc = olapTable.getDefaultReplicaAllocation(); } + if (olapTable.getMinLoadReplicaNum() > replicaAlloc.getTotalReplicaNum()) { + throw new DdlException("Failed to check min load replica num [" + olapTable.getMinLoadReplicaNum() + + "] <= dynamic partition replica num [" + replicaAlloc.getTotalReplicaNum() + "]"); + } checkReplicaAllocation(replicaAlloc, hotPartitionNum, db); if (properties.containsKey(DynamicPartitionProperty.RESERVED_HISTORY_PERIODS)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index d983c688f9..294a9d6c6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -61,6 +61,7 @@ public class PropertyAnalyzer { public static final String PROPERTIES_SHORT_KEY = "short_key"; public static final String PROPERTIES_REPLICATION_NUM = "replication_num"; public static final String PROPERTIES_REPLICATION_ALLOCATION = "replication_allocation"; + public static final String PROPERTIES_MIN_LOAD_REPLICA_NUM = "min_load_replica_num"; public static final String PROPERTIES_STORAGE_TYPE = "storage_type"; public static final String PROPERTIES_STORAGE_MEDIUM = "storage_medium"; public static final String PROPERTIES_STORAGE_COOLDOWN_TIME = "storage_cooldown_time"; @@ -345,6 +346,24 @@ public class PropertyAnalyzer { return replicationNum; } + public static short analyzeMinLoadReplicaNum(Map properties) throws AnalysisException { + short minLoadReplicaNum = -1; + if (properties != null && properties.containsKey(PROPERTIES_MIN_LOAD_REPLICA_NUM)) { + try { + minLoadReplicaNum = Short.parseShort(properties.get(PROPERTIES_MIN_LOAD_REPLICA_NUM)); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); + } + + if (minLoadReplicaNum <= 0 && minLoadReplicaNum != -1) { + throw new AnalysisException("min_load_replica_num should > 0 or =-1"); + } + + properties.remove(PROPERTIES_MIN_LOAD_REPLICA_NUM); + } + return minLoadReplicaNum; + } + public static String analyzeColumnSeparator(Map properties, String oldColumnSeparator) { String columnSeparator = oldColumnSeparator; if (properties != null && properties.containsKey(PROPERTIES_COLUMN_SEPARATOR)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 75a212cb1f..8ecdf643e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2062,6 +2062,18 @@ public class InternalCatalog implements CatalogIf { // this should be done before create partition. 
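/* Descriptive note: the new hunk below reads min_load_replica_num from the CREATE TABLE properties and rejects values larger than the table's total replica num before any partition is created. */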
Map properties = stmt.getProperties(); + short minLoadReplicaNum = -1; + try { + minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) { + throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= " + + "default replica num [" + replicaAlloc.getTotalReplicaNum() + "]"); + } + olapTable.setMinLoadReplicaNum(minLoadReplicaNum); + // get use light schema change Boolean enableLightSchemaChange; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java index 25ee7a5d0a..8c102fd0ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java @@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest; import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -40,7 +41,7 @@ import javax.servlet.http.HttpServletResponse; public class DebugPointAction extends RestBaseController { @RequestMapping(path = "/api/debug_point/add/{debugPoint}", method = RequestMethod.POST) - protected Object addDebugPoint(@PathVariable("debugPoint") String debugPoint, + protected Object addDebugPoint(@PathVariable("debugPoint") String name, @RequestParam(name = "execute", required = false, defaultValue = "") String execute, @RequestParam(name = "timeout", required = false, defaultValue = "") String timeout, HttpServletRequest request, HttpServletResponse response) { @@ -50,28 +51,38 @@ public class DebugPointAction extends RestBaseController { } executeCheckPassword(request, response); checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - if (Strings.isNullOrEmpty(debugPoint)) { + if (Strings.isNullOrEmpty(name)) { return ResponseEntityBuilder.badRequest("Empty debug point name."); } - int executeLimit = -1; + + DebugPoint debugPoint = new DebugPoint(); if (!Strings.isNullOrEmpty(execute)) { try { - executeLimit = Integer.valueOf(execute); + debugPoint.executeLimit = Integer.valueOf(execute); } catch (Exception e) { return ResponseEntityBuilder.badRequest( "Invalid execute format: " + execute + ", err " + e.getMessage()); } } - long timeoutSeconds = -1; if (!Strings.isNullOrEmpty(timeout)) { try { - timeoutSeconds = Long.valueOf(timeout); + long timeoutSeconds = Long.valueOf(timeout); + if (timeoutSeconds > 0) { + debugPoint.expireTime = System.currentTimeMillis() + timeoutSeconds * 1000; + } } catch (Exception e) { return ResponseEntityBuilder.badRequest( "Invalid timeout format: " + timeout + ", err " + e.getMessage()); } } - DebugPointUtil.addDebugPoint(debugPoint, executeLimit, timeoutSeconds); + request.getParameterMap().forEach((key, values) -> { + if (values != null && values.length > 0) { + debugPoint.params.put(key, values[0]); + } + }); + + DebugPointUtil.addDebugPoint(name, debugPoint); + return ResponseEntityBuilder.ok(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 0f328e31e7..6df438bb25 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -38,6 +38,7 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; @@ -46,6 +47,8 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TColumn; @@ -80,6 +83,7 @@ import org.apache.logging.log4j.Logger; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -429,19 +433,25 @@ public class OlapTableSink extends DataSink { Multimap<Long, Long> allBePathsMap = HashMultimap.create(); for (Long partitionId : partitionIds) { Partition partition = table.getPartition(partitionId); - int quorum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1; + int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId()); for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { // we should ensure the replica backend is alive // otherwise, there will be a 'unknown node id, id=xxx' error for stream load for (Tablet tablet : index.getTablets()) { Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap(); - if (bePathsMap.keySet().size() < quorum) { + if (bePathsMap.keySet().size() < loadRequiredReplicaNum) { throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size() - + " < quorum replica num " + quorum + + " < load required replica num " + loadRequiredReplicaNum + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]"); } + debugWriteRandomChooseSink(tablet, partition.getVisibleVersion(), bePathsMap); + if (bePathsMap.keySet().isEmpty()) { + throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, + "tablet " + tablet.getId() + " no available replica"); + } + if (singleReplicaLoad) { Long[] nodes = bePathsMap.keySet().toArray(new Long[0]); Random random = new SecureRandom(); @@ -474,6 +484,39 @@ public class OlapTableSink extends DataSink { return Arrays.asList(locationParam, slaveLocationParam); } + private void debugWriteRandomChooseSink(Tablet tablet, long version, Multimap<Long, Long> bePathsMap) { + DebugPoint debugPoint = DebugPointUtil.getDebugPoint("OlapTableSink.write_random_choose_sink"); + if (debugPoint == null) { + return; + } + + boolean needCatchup = debugPoint.param("needCatchUp", false); + int sinkNum = debugPoint.param("sinkNum", 0); + if (sinkNum == 0) { + sinkNum = new SecureRandom().nextInt() % bePathsMap.size() + 1; + } + List<Long> candidatePaths = tablet.getReplicas().stream() + .filter(replica -> !needCatchup || replica.getVersion() >= version) + .map(Replica::getPathHash) + .collect(Collectors.toList()); + if (sinkNum > 0 && sinkNum < candidatePaths.size()) {
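/* Randomly keep only sinkNum of the candidate replica paths: shuffle, then drop entries from the tail. */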
Collections.shuffle(candidatePaths); + while (candidatePaths.size() > sinkNum) { + candidatePaths.remove(candidatePaths.size() - 1); + } + } + + Multimap<Long, Long> result = HashMultimap.create(); + bePathsMap.forEach((tabletId, pathHash) -> { + if (candidatePaths.contains(pathHash)) { + result.put(tabletId, pathHash); + } + }); + + bePathsMap.clear(); + bePathsMap.putAll(result); + } + private TPaloNodesInfo createPaloNodesInfo() { TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 1921402cfd..4c03f49470 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -535,8 +535,7 @@ public class DatabaseTransactionMgr { transactionState.prolongPublishTimeout(); } - int quorumReplicaNum = table.getPartitionInfo() - .getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1; + int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId()); for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { tabletSuccReplicas.clear(); @@ -574,13 +573,13 @@ } int successReplicaNum = tabletSuccReplicas.size(); - if (successReplicaNum < quorumReplicaNum) { + if (successReplicaNum < loadRequiredReplicaNum) { String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas); - String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica " - + "num %s < quorum replica num %s. table %s, partition %s, this tablet detail: %s", - transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId, + String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s" + + " < load required replica num %s. table %s, partition %s, this tablet detail: %s", + transactionId, tablet.getId(), successReplicaNum, loadRequiredReplicaNum, tableId, partition.getId(), writeDetail); LOG.info(errMsg); @@ -957,7 +956,7 @@ transactionState); continue; } - PartitionInfo partitionInfo = table.getPartitionInfo(); + Iterator<PartitionCommitInfo> partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); while (partitionCommitInfoIterator.hasNext()) { @@ -983,8 +982,8 @@ transactionState.setErrorMsg(errMsg); return; } - int quorumReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1; + int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId); List<MaterializedIndex> allIndices; if (transactionState.getLoadedTblIndexes().isEmpty()) { allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); @@ -1017,15 +1016,15 @@ } int healthReplicaNum = tabletSuccReplicas.size(); - if (healthReplicaNum >= quorumReplicaNum) { + if (healthReplicaNum >= loadRequiredReplicaNum) { if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) { String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas); LOG.info("publish version quorum succ for transaction {} on tablet {} with version" - + " {}, and has failed replicas, quorum num {}. 
table {}, partition {}," - + " tablet detail: {}", + + " {}, and has failed replicas, load required replica num {}. table {}, " + + "partition {}, tablet detail: {}", transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - quorumReplicaNum, tableId, partitionId, writeDetail); + loadRequiredReplicaNum, tableId, partitionId, writeDetail); } continue; } @@ -1050,23 +1049,23 @@ // ahead, otherwise data may be lost and the // publish task hangs forever. LOG.info("publish version timeout succ for transaction {} on tablet {} with version" - + " {}, and has failed replicas, quorum num {}. table {}, partition {}," - + " tablet detail: {}", + + " {}, and has failed replicas, load required replica num {}. table {}, " + + "partition {}, tablet detail: {}", transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - quorumReplicaNum, tableId, partitionId, writeDetail); + loadRequiredReplicaNum, tableId, partitionId, writeDetail); } else { publishResult = PublishResult.FAILED; String errMsg = String.format("publish on tablet %d failed." - + " succeed replica num %d less than quorum %d." + + " succeed replica num %d < load required replica num %d." + " table: %d, partition: %d, publish version: %d", - tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId, + tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId, partitionId, partition.getVisibleVersion() + 1); transactionState.setErrorMsg(errMsg); LOG.info("publish version failed for transaction {} on tablet {} with version" - + " {}, and has failed replicas, quorum num {}. table {}, partition {}," - + " tablet detail: {}", + + " {}, and has failed replicas, load required replica num {}. table {}, " + + "partition {}, tablet detail: {}", transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - quorumReplicaNum, tableId, partitionId, writeDetail); + loadRequiredReplicaNum, tableId, partitionId, writeDetail); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 747508d211..922e8645d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -19,6 +19,7 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.SystemInfoService; @@ -59,6 +60,9 @@ public class PublishVersionDaemon extends MasterDaemon { } private void publishVersion() { + if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) { + return; + } GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); List<TransactionState> readyTransactionStates = globalTransactionMgr.getReadyToPublishTransactions(); if (readyTransactionStates.isEmpty()) { @@ -147,7 +151,8 @@ .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows); - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout(); + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() + || 
DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); if (shouldFinishTxn) { try { // one transaction exception should not affect other transaction diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java index 4a78ef4de2..da9891c4e3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -84,8 +84,7 @@ public class AlterTest { public static void beforeClass() throws Exception { FeConstants.runningUnitTest = true; FeConstants.default_scheduler_interval_millisecond = 100; - FeConstants.tablet_checker_interval_ms = 100; - FeConstants.tablet_checker_interval_ms = 100; + Config.tablet_checker_interval_ms = 100; Config.dynamic_partition_check_interval_seconds = 1; Config.disable_storage_medium_check = true; Config.enable_storage_policy = true; diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java index 854225c742..664b5e5514 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableAsSelectStmtTest.java @@ -92,6 +92,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -113,6 +114,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`__sum_0`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -130,6 +132,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`__sum_0`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -165,6 +168,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -189,6 +193,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`__count_0`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -215,6 +220,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`__sum_0`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + 
"\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -238,6 +244,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`amount`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -257,6 +264,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`alias_name`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -283,6 +291,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -306,6 +315,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId1`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -333,6 +343,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`user`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -357,6 +368,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -380,6 +392,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -399,6 +412,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`id`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -425,6 +439,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -450,6 +465,7 @@ public class CreateTableAsSelectStmtTest extends 
TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -474,6 +490,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`username`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -499,6 +516,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`userId`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -548,6 +566,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -565,6 +584,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" @@ -596,6 +616,7 @@ public class CreateTableAsSelectStmtTest extends TestWithFeService { + "DISTRIBUTED BY HASH(`__literal_0`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"min_load_replica_num\" = \"-1\",\n" + "\"is_being_synced\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"light_schema_change\" = \"true\",\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index 3e98d8e285..f8b3797321 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -48,7 +48,7 @@ public class CreateTableTest { @BeforeClass public static void beforeClass() throws Exception { Config.disable_storage_medium_check = true; - UtFrameUtils.createDorisCluster(runningDir); + UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 3); // create connect context connectContext = UtFrameUtils.createDefaultCtx(); @@ -278,10 +278,10 @@ public class CreateTableTest { + "properties('replication_num' = '1', 'short_key' = '4');")); ExceptionChecker - .expectThrowsWithMsg(DdlException.class, "replication num should be less than the number of available backends. replication num is 3, available backend num is 1", + .expectThrowsWithMsg(DdlException.class, "replication num should be less than the number of available backends. 
replication num is 4, available backend num is 3", () -> createTable("create table test.atbl5\n" + "(k1 int, k2 int, k3 int)\n" + "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n" - + "properties('replication_num' = '3');")); + + "properties('replication_num' = '4');")); ExceptionChecker.expectThrowsNoException( () -> createTable("create table test.atbl6\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n" @@ -749,24 +749,160 @@ public class CreateTableTest { @Test public void testCreateTableWithForceReplica() throws DdlException { - Config.force_olap_table_replication_num = 1; - // no need to specify replication_num, the table can still be created. - ExceptionChecker.expectThrowsNoException(() -> { - createTable("create table test.test_replica\n" + "(k1 int, k2 int) partition by range(k1)\n" + "(\n" - + "partition p1 values less than(\"10\"),\n" + "partition p2 values less than(\"20\")\n" + ")\n" - + "distributed by hash(k2) buckets 1;"); - }); + try { + Config.force_olap_table_replication_num = 1; + // no need to specify replication_num, the table can still be created. + ExceptionChecker.expectThrowsNoException(() -> { + createTable("create table test.test_replica\n" + "(k1 int, k2 int) partition by range(k1)\n" + "(\n" + + "partition p1 values less than(\"10\"),\n" + "partition p2 values less than(\"20\")\n" + ")\n" + + "distributed by hash(k2) buckets 1 \n properties ('replication_num' = '4') ;"); + }); - // can still set replication_num manually. - ExceptionChecker.expectThrowsWithMsg(UserException.class, "Failed to find enough host with tag", - () -> { - alterTable("alter table test.test_replica modify partition p1 set ('replication_num' = '3')"); - }); + // can still set replication_num manually. + ExceptionChecker.expectThrowsWithMsg(UserException.class, "Failed to find enough host with tag", + () -> { + alterTable("alter table test.test_replica modify partition p1 set ('replication_num' = '4')"); + }); + + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException("default_cluster:test"); + OlapTable tb = (OlapTable) db.getTableOrDdlException("test_replica"); + Partition p1 = tb.getPartition("p1"); + Assert.assertEquals(1, tb.getPartitionInfo().getReplicaAllocation(p1.getId()).getTotalReplicaNum()); + Assert.assertEquals(1, tb.getTableProperty().getReplicaAllocation().getTotalReplicaNum()); + } finally { + Config.force_olap_table_replication_num = -1; + } + } + + @Test + public void testCreateTableWithMinLoadReplicaNum() throws Exception { + ExceptionChecker.expectThrowsNoException( + () -> createTable("create table test.tbl_min_load_replica_num_1\n" + + "(k1 int, k2 int)\n" + + "duplicate key(k1)\n" + + "distributed by hash(k1) buckets 1\n" + + "properties(\n" + + " 'replication_num' = '2',\n" + + " 'min_load_replica_num' = '1'\n" + + ");")); Database db = Env.getCurrentInternalCatalog().getDbOrDdlException("default_cluster:test"); - OlapTable tb = (OlapTable) db.getTableOrDdlException("test_replica"); - Partition p1 = tb.getPartition("p1"); - Assert.assertEquals(1, tb.getPartitionInfo().getReplicaAllocation(p1.getId()).getTotalReplicaNum()); - Assert.assertEquals(1, tb.getTableProperty().getReplicaAllocation().getTotalReplicaNum()); + OlapTable tbl1 = (OlapTable) db.getTableOrDdlException("tbl_min_load_replica_num_1"); + Assert.assertEquals(1, tbl1.getMinLoadReplicaNum()); + Assert.assertEquals(2, (int) tbl1.getDefaultReplicaAllocation().getTotalReplicaNum()); + + ExceptionChecker.expectThrowsNoException( + () -> alterTable("alter table 
test.tbl_min_load_replica_num_1\n" + + " set ( 'min_load_replica_num' = '2');")); + Assert.assertEquals(2, tbl1.getMinLoadReplicaNum()); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to check min load replica num", + () -> alterTable("alter table test.tbl_min_load_replica_num_1\n" + + " set ( 'min_load_replica_num' = '3');")); + Assert.assertEquals(2, tbl1.getMinLoadReplicaNum()); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "min_load_replica_num should > 0 or =-1", + () -> alterTable("alter table test.tbl_min_load_replica_num_1\n" + + " set ( 'min_load_replica_num' = '-3');")); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to check min load replica num", + () -> createTable("create table test.tbl_min_load_replica_num_2\n" + + "(k1 int, k2 int)\n" + + "duplicate key(k1)\n" + + "distributed by hash(k1) buckets 1\n" + + "properties(\n" + + " 'replication_num' = '2',\n" + + " 'min_load_replica_num' = '3'\n" + + ");")); + + ExceptionChecker.expectThrowsNoException( + () -> createTable("create table test.tbl_min_load_replica_num_3\n" + + "(k1 date, k2 int, k3 int)\n" + + "unique key(k1, k2)\n" + + "partition by range(k1)()\n" + + "distributed by hash(k2) buckets 2\n" + + "properties(\n" + + " 'dynamic_partition.enable' = 'true',\n" + + " 'dynamic_partition.time_unit' = 'DAY',\n" + + " 'dynamic_partition.prefix' = 'p',\n" + + " 'dynamic_partition.replication_allocation' = 'tag.location.default: 2',\n" + + " 'dynamic_partition.end' = '4',\n" + + " 'dynamic_partition.buckets' = '3',\n" + + " 'min_load_replica_num' = '1'\n" + + ");")); + + OlapTable tbl3 = (OlapTable) db.getTableOrDdlException("tbl_min_load_replica_num_3"); + Assert.assertEquals(1, tbl3.getMinLoadReplicaNum()); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to check min load replica num", + () -> createTable("create table test.tbl_min_load_replica_num_4\n" + + "(k1 date, k2 int, k3 int)\n" + + "unique key(k1, k2)\n" + + "partition by range(k1)()\n" + + "distributed by hash(k2) buckets 2\n" + + "properties(\n" + + " 'dynamic_partition.enable' = 'true',\n" + + " 'dynamic_partition.time_unit' = 'DAY',\n" + + " 'dynamic_partition.prefix' = 'p',\n" + + " 'dynamic_partition.replication_allocation' = 'tag.location.default: 2',\n" + + " 'dynamic_partition.end' = '4',\n" + + " 'dynamic_partition.buckets' = '3',\n" + + " 'min_load_replica_num' = '3'\n" + + ");")); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "min_load_replica_num should > 0 or =-1", + () -> createTable("create table test.tbl_min_load_replica_num_5\n" + + "(k1 int, k2 int)\n" + + "duplicate key(k1)\n" + + "distributed by hash(k1) buckets 1\n" + + "properties(\n" + + " 'replication_num' = '2',\n" + + " 'min_load_replica_num' = '-2'\n" + + ");")); + + ExceptionChecker.expectThrowsNoException( + () -> createTable("create table test.tbl_min_load_replica_num_6\n" + + "(`uuid` varchar(255) NULL,\n" + + "`action_datetime` date NULL\n" + + ")\n" + + "DUPLICATE KEY(uuid)\n" + + "PARTITION BY RANGE(action_datetime)()\n" + + "DISTRIBUTED BY HASH(uuid) BUCKETS 3\n" + + "PROPERTIES\n" + + "(\n" + + "\"dynamic_partition.enable\" = \"true\",\n" + + "\"dynamic_partition.time_unit\" = \"DAY\",\n" + + "\"dynamic_partition.end\" = \"3\",\n" + + "\"dynamic_partition.prefix\" = \"p\",\n" + + "\"dynamic_partition.buckets\" = \"32\",\n" + + "\"dynamic_partition.replication_num\" = \"2\",\n" + + "\"dynamic_partition.create_history_partition\"=\"true\",\n" + + "\"dynamic_partition.start\" = \"-3\"\n" + 
+ ");\n")); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to check min load replica num", + () -> alterTable("alter table test.tbl_min_load_replica_num_6\n" + + " set ( 'min_load_replica_num' = '3');")); + + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to check min load replica num", + () -> createTable("create table test.tbl_min_load_replica_num_7\n" + + "(`uuid` varchar(255) NULL,\n" + + "`action_datetime` date NULL\n" + + ")\n" + + "DUPLICATE KEY(uuid)\n" + + "PARTITION BY RANGE(action_datetime)()\n" + + "DISTRIBUTED BY HASH(uuid) BUCKETS 3\n" + + "PROPERTIES\n" + + "(\n" + + "\"min_load_replica_num\" = \"3\",\n" + + "\"dynamic_partition.enable\" = \"true\",\n" + + "\"dynamic_partition.time_unit\" = \"DAY\",\n" + + "\"dynamic_partition.end\" = \"3\",\n" + + "\"dynamic_partition.prefix\" = \"p\",\n" + + "\"dynamic_partition.buckets\" = \"32\",\n" + + "\"dynamic_partition.replication_num\" = \"2\",\n" + + "\"dynamic_partition.create_history_partition\"=\"true\",\n" + + "\"dynamic_partition.start\" = \"-3\"\n" + + ");\n")); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index 43ea5340bc..1764573d0f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -66,8 +66,8 @@ public class DecommissionTest { FeConstants.runningUnitTest = true; System.out.println(runningDir); FeConstants.runningUnitTest = true; - FeConstants.tablet_checker_interval_ms = 200; - FeConstants.tablet_schedule_interval_ms = 2000; + Config.tablet_schedule_interval_ms = 2000; + Config.tablet_checker_interval_ms = 200; Config.tablet_repair_delay_factor_second = 1; Config.enable_round_robin_create_tablet = true; Config.schedule_slot_num_per_hdd_path = 10000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index 1524ec3a88..3e178ef090 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -105,7 +105,7 @@ public class TabletRepairAndBalanceTest { FeConstants.runningUnitTest = true; System.out.println(runningDir); FeConstants.runningUnitTest = true; - FeConstants.tablet_checker_interval_ms = 1000; + Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; Config.colocate_group_relocate_delay_second = 1; // 5 backends: diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index f76c6e6bbf..6a38985b73 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -74,7 +74,7 @@ public class TabletReplicaTooSlowTest { FeConstants.runningUnitTest = true; System.out.println(runningDir); FeConstants.runningUnitTest = true; - FeConstants.tablet_checker_interval_ms = 1000; + Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; Config.repair_slow_replica = true; // 5 backends: diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 209f8a1127..568b886c35 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -52,7 +52,7 @@ public class DecommissionBackendTest extends TestWithFeService { @Override protected void beforeCreatingConnectContext() throws Exception { FeConstants.default_scheduler_interval_millisecond = 1000; - FeConstants.tablet_checker_interval_ms = 1000; + Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; Config.allow_replica_on_same_host = true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java index 984d5e9c71..3a23928734 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java @@ -133,8 +133,8 @@ public class AutoBucketUtilsTest { @Before public void setUp() throws Exception { FeConstants.runningUnitTest = true; - FeConstants.tablet_checker_interval_ms = 1000; FeConstants.default_scheduler_interval_millisecond = 100; + Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; connectContext = UtFrameUtils.createDefaultCtx(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java index 2845cec922..5c6492b063 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java @@ -18,6 +18,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.http.DorisHttpTestCase; import okhttp3.Request; @@ -54,6 +55,17 @@ public class DebugPointUtilTest extends DorisHttpTestCase { Assert.assertTrue(DebugPointUtil.isEnable("dbug4")); Thread.sleep(1000); Assert.assertFalse(DebugPointUtil.isEnable("dbug4")); + + sendRequest("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false"); + Assert.assertTrue(DebugPointUtil.isEnable("dbug5")); + DebugPoint debugPoint = DebugPointUtil.getDebugPoint("dbug5"); + Assert.assertNotNull(debugPoint); + Assert.assertEquals(1, (int) debugPoint.param("v1", 0)); + Assert.assertEquals("a", debugPoint.param("v2", "")); + Assert.assertEquals(1.2, debugPoint.param("v3", 0.0), 1e-6); + Assert.assertTrue(debugPoint.param("v4", false)); + Assert.assertFalse(debugPoint.param("v5", false)); + Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L)); } private void sendRequest(String uri) throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index ef4a58e534..29aa558860 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -90,7 +90,7 @@ public class ResourceTagQueryTest { public static void beforeClass() throws Exception { System.out.println(runningDir); FeConstants.runningUnitTest = true; - FeConstants.tablet_checker_interval_ms = 1000; + Config.tablet_checker_interval_ms = 1000; 
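
The DebugPointUtilTest hunk above also pins down the HTTP contract for typed debug-point parameters: the test adds dbug5 with plain key=value query parameters and reads each one back through param() with a typed default. A minimal Groovy sketch of driving that contract directly, assuming an FE HTTP endpoint at 127.0.0.1:8030 (host, port and the dbug5 name are illustrative, not values from this PR):

    // Add a debug point carrying typed params, then remove it again. The
    // Authorization token is the root-with-empty-password value that
    // Http.groovy (added later in this diff) uses for the same purpose.
    def fe = 'http://127.0.0.1:8030'
    ['add/dbug5?v1=1&v2=a&v3=1.2&v4=true', 'remove/dbug5'].each { path ->
        def conn = new URL("${fe}/api/debug_point/${path}").openConnection()
        conn.setRequestMethod('POST')
        conn.setRequestProperty('Authorization', 'Basic cm9vdDo=')
        assert conn.responseCode == 200
    }
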
Config.tablet_repair_delay_factor_second = 1; // 5 backends: // 127.0.0.1 diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 10f94adeb8..af9b98f126 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -82,8 +82,8 @@ public class DemoMultiBackendsTest { public static void beforeClass() throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { FeConstants.runningUnitTest = true; - FeConstants.tablet_checker_interval_ms = 1000; FeConstants.default_scheduler_interval_millisecond = 100; + Config.tablet_checker_interval_ms = 1000; Config.tablet_repair_delay_factor_second = 1; UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 3); diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index b5114f9dce..e21e513718 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -47,6 +47,24 @@ pluginPath = "${DORIS_HOME}/regression-test/plugins" realDataPath = "${DORIS_HOME}/regression-test/realdata" sslCertificatePath = "${DORIS_HOME}/regression-test/ssl_default_certificate" +// suite configs +suites = { + + //// equals to: + //// suites.test_suite_1.key1 = "val1" + //// suites.test_suite_1.key2 = "val2" + //// + //test_suite_1 { + // key1 = "val1" + // key2 = "val2" + //} + + //test_suite_2 { + // key3 = "val1" + // key4 = "val2" + //} +} + // docker image image = "" dockerEndDeleteFiles = false diff --git a/regression-test/data/load_p0/insert/test_min_load_replica_num_simple.out b/regression-test/data/load_p0/insert/test_min_load_replica_num_simple.out new file mode 100644 index 0000000000..fc0bd7f8e7 --- /dev/null +++ b/regression-test/data/load_p0/insert/test_min_load_replica_num_simple.out @@ -0,0 +1,33 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +1 a +2 b +3 c +4 e + +-- !select_2 -- +1 a +2 b +3 c +4 e +7 h +8 i + +-- !select_3 -- +1 a +2 b +3 c +4 e +7 h +8 i + +-- !select_4 -- +1 a +2 b +3 c +4 e +7 h +8 i +11 l +12 m + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy index 7ad0720bd1..36648dbe61 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy @@ -171,7 +171,7 @@ class ConfigOptions { .type(String.class) .longOpt("sslCertificatePath") .desc("the sslCertificate path") - .build() + .build() imageOpt = Option.builder("image") .argName("image") diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 23ae47aa38..f7460d95be 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -82,6 +82,10 @@ class Suite implements GroovyInterceptable { return value == null ? 
defaultValue : value } + String getSuiteConf(String key, String defaultValue = null) { + return getConf("suites." + name + "." + key, defaultValue) + } + Properties getConfs(String prefix) { Properties p = new Properties() for (String name : context.config.otherConfigs.stringPropertyNames()) { @@ -154,11 +158,20 @@ class Suite implements GroovyInterceptable { } public ListenableFuture thread(String threadName = null, Closure actionSupplier) { + def connInfo = context.threadLocalConn.get() return MoreExecutors.listeningDecorator(context.actionExecutors).submit((Callable) { long startTime = System.currentTimeMillis() def originThreadName = Thread.currentThread().name try { Thread.currentThread().setName(threadName == null ? originThreadName : threadName) + if (connInfo != null) { + def newConnInfo = new ConnectionInfo() + newConnInfo.conn = DriverManager.getConnection(connInfo.conn.getMetaData().getURL(), + connInfo.username, connInfo.password) + newConnInfo.username = connInfo.username + newConnInfo.password = connInfo.password + context.threadLocalConn.set(newConnInfo) + } context.scriptContext.eventListeners.each { it.onThreadStarted(context) } return actionSupplier.call() diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 85dbea17b5..f6f5002365 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -14,10 +14,10 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - package org.apache.doris.regression.suite import org.apache.doris.regression.Config +import org.apache.doris.regression.util.Http import com.google.common.collect.Maps import org.slf4j.Logger @@ -29,15 +29,23 @@ import groovy.util.logging.Slf4j import java.util.stream.Collectors class ClusterOptions { + int feNum = 1 int beNum = 3 int beDiskNum = 1 - List feConfigs - List beConfigs + List feConfigs = [] + List beConfigs = [] + + void enableDebugPoints() { + feConfigs.add('enable_debug_points=true') + beConfigs.add('enable_debug_points=true') + } + } class ListHeader { - Map fields; + + Map fields ListHeader(List fieldList) { this.fields = Maps.newHashMap() @@ -48,50 +56,115 @@ class ListHeader { int indexOf(String field) { def index = fields.get(field) - assert index != null : "Not found field: " + field + assert index != null : 'Not found field: ' + field return index } + } -class Frontend { +enum NodeType { + + FE, + BE, + +} + +class ServerNode { + int index String host - int queryPort + int httpPort boolean alive + + static void fromCompose(ServerNode node, ListHeader header, int index, List fields) { + node.index = index + node.host = (String) fields.get(header.indexOf('IP')) + node.httpPort = (Integer) fields.get(header.indexOf('http_port')) + node.alive = fields.get(header.indexOf('alive')) == 'true' + } + + String getHttpAddress() { + return 'http://' + host + ':' + httpPort + } + + void enableDebugPoint(String name, Map params = null) { + def url = getHttpAddress() + '/api/debug_point/add/' + name + if (params != null && params.size() > 0) { + url += '?' 
+ params.collect((k, v) -> k + '=' + v).join('&') + } + def result = Http.http_post(url, null, true) + checkHttpResult(result) + } + + void disableDebugPoint(String name) { + def url = getHttpAddress() + '/api/debug_point/remove/' + name + def result = Http.http_post(url, null, true) + checkHttpResult(result) + } + + void clearDebugPoints() { + def url = getHttpAddress() + '/api/debug_point/clear' + def result = Http.http_post(url, null, true) + checkHttpResult(result) + } + + private void checkHttpResult(Object result) { + def type = getNodeType() + if (type == NodeType.FE) { + assert result.code == 0 : result.toString() + } else if (type == NodeType.BE) { + assert result.status == 'OK' : result.toString() + } + } + + NodeType getNodeType() { + assert false : 'Unknown node type' + } + +} + +class Frontend extends ServerNode { + + int queryPort boolean isMaster static Frontend fromCompose(ListHeader header, int index, List fields) { Frontend fe = new Frontend() - fe.index = index - fe.host = (String) fields.get(header.indexOf("IP")) - fe.queryPort = (Integer) fields.get(header.indexOf("query_port")) - fe.alive = fields.get(header.indexOf("alive")) == "true" - fe.isMaster = fields.get(header.indexOf("is_master")) == "true" + ServerNode.fromCompose(fe, header, index, fields) + fe.queryPort = (Integer) fields.get(header.indexOf('query_port')) + fe.isMaster = fields.get(header.indexOf('is_master')) == 'true' return fe } + + NodeType getNodeType() { + return NodeType.FE + } + } -class Backend { - int index +class Backend extends ServerNode { + long backendId - String host - boolean alive int tabletNum static Backend fromCompose(ListHeader header, int index, List fields) { Backend be = new Backend() - be.index = index - be.backendId = (long) fields.get(header.indexOf("backend_id")) - be.host = (String) fields.get(header.indexOf("IP")) - be.alive = fields.get(header.indexOf("alive")) == "true" - be.tabletNum = (int) fields.get(header.indexOf("tablet_num")) + ServerNode.fromCompose(be, header, index, fields) + be.backendId = (long) fields.get(header.indexOf('backend_id')) + be.tabletNum = (int) fields.get(header.indexOf('tablet_num')) return be } + + NodeType getNodeType() { + return NodeType.BE + } + } @Slf4j @CompileStatic class SuiteCluster { + static final Logger logger = LoggerFactory.getLogger(this.class) final String name @@ -106,40 +179,69 @@ class SuiteCluster { void init(ClusterOptions options) { if (inited) { - return; + return } - assert name != null && name != "" + assert name != null && name != '' assert options.feNum > 0 || options.beNum > 0 - assert config.image != null && config.image != "" + assert config.image != null && config.image != '' def sb = new StringBuilder() - sb.append("up " + name + " ") - sb.append(config.image + " ") + sb.append('up ' + name + ' ') + sb.append(config.image + ' ') if (options.feNum > 0) { - sb.append("--add-fe-num " + options.feNum + " ") + sb.append('--add-fe-num ' + options.feNum + ' ') } if (options.beNum > 0) { - sb.append("--add-be-num " + options.beNum + " ") + sb.append('--add-be-num ' + options.beNum + ' ') } // TODO: need escape white space in config if (options.feConfigs != null && options.feConfigs.size() > 0) { - sb.append("--fe-config ") - options.feConfigs.forEach(item -> sb.append(" " + item + " ")) + sb.append('--fe-config ') + options.feConfigs.forEach(item -> sb.append(' ' + item + ' ')) } if (options.beConfigs != null && options.beConfigs.size() > 0) { - sb.append("--be-config ") - options.beConfigs.forEach(item -> sb.append(" " + 
item + " ")) + sb.append('--be-config ') + options.beConfigs.forEach(item -> sb.append(' ' + item + ' ')) } - sb.append("--be-disk-num " + options.beDiskNum + " ") - sb.append("--wait-timeout 180") + sb.append('--be-disk-num ' + options.beDiskNum + ' ') + sb.append('--wait-timeout 180') runCmd(sb.toString(), -1) // wait be report disk Thread.sleep(5000) - inited = true; + inited = true + } + + void injectDebugPoints(NodeType type, Map> injectPoints) { + if (injectPoints == null || injectPoints.isEmpty()) { + return + } + + List servers = [] + if (type == NodeType.FE) { + servers.addAll(getFrontends()) + } else if (type == NodeType.BE) { + servers.addAll(getBackends()) + } else { + throw new Exception('Unknown node type: ' + type) + } + + servers.each { server -> + injectPoints.each { name, params -> + server.enableDebugPoint(name, params) + } + } + } + + void clearFrontendDebugPoints() { + getFrontends().each { it.clearDebugPoints() } + } + + void clearBackendDebugPoints() { + getBackends().each { it.clearDebugPoints() } } Frontend getMasterFe() { @@ -181,32 +283,32 @@ class SuiteCluster { } private void getAllNodes(List frontends, List backends) { - def cmd = "ls " + name + " --detail" + def cmd = 'ls ' + name + ' --detail' def data = runCmd(cmd) assert data instanceof List def rows = (List>) data def header = new ListHeader(rows.get(0)) for (int i = 1; i < rows.size(); i++) { def row = (List) rows.get(i) - def name = (String) row.get(header.indexOf("NAME")) - if (name.startsWith("be-")) { - int index = name.substring("be-".length()) as int + def name = (String) row.get(header.indexOf('NAME')) + if (name.startsWith('be-')) { + int index = name.substring('be-'.length()) as int backends.add(Backend.fromCompose(header, index, row)) - } else if (name.startsWith("fe-")) { - int index = name.substring("fe-".length()) as int + } else if (name.startsWith('fe-')) { + int index = name.substring('fe-'.length()) as int frontends.add(Frontend.fromCompose(header, index, row)) } else { - assert false : "Unknown node type with name: " + name + assert false : 'Unknown node type with name: ' + name } } } - List addFe(int num) throws Exception { + List addFrontend(int num) throws Exception { def result = add(num, 0) return result.first } - List addBe(int num) throws Exception { + List addBackend(int num) throws Exception { def result = add(0, num) return result.second } @@ -215,18 +317,18 @@ class SuiteCluster { assert feNum > 0 || beNum > 0 def sb = new StringBuilder() - sb.append("up " + name + " ") + sb.append('up ' + name + ' ') if (feNum > 0) { - sb.append("--add-fe-num " + feNum + " ") + sb.append('--add-fe-num ' + feNum + ' ') } if (beNum > 0) { - sb.append("--add-be-num " + beNum + " ") + sb.append('--add-be-num ' + beNum + ' ') } - sb.append("--wait-timeout 60") + sb.append('--wait-timeout 60') def data = (Map>) runCmd(sb.toString(), -1) - def newFrontends = (List) data.get("fe").get("add_list") - def newBackends = (List) data.get("be").get("add_list") + def newFrontends = (List) data.get('fe').get('add_list') + def newBackends = (List) data.get('be').get('add_list') // wait be report disk Thread.sleep(5000) @@ -235,9 +337,9 @@ class SuiteCluster { } void destroy(boolean clean) throws Exception { - def cmd = "down " + name + def cmd = 'down ' + name if (clean) { - cmd += " --clean" + cmd += ' --clean' } runCmd(cmd) inited = false @@ -245,99 +347,120 @@ class SuiteCluster { // if not specific fe indices, then start all frontends void startFrontends(int... 
indices) { - runFrontendsCmd("start", indices) + runFrontendsCmd('start', indices) + waitHbChanged() } // if not specific be indices, then start all backends void startBackends(int... indices) { - runBackendsCmd("start", indices) + runBackendsCmd('start', indices) + waitHbChanged() } // if not specific fe indices, then stop all frontends void stopFrontends(int... indices) { - runFrontendsCmd("stop", indices) + runFrontendsCmd('stop', indices) + waitHbChanged() } // if not specific be indices, then stop all backends void stopBackends(int... indices) { - runBackendsCmd("stop", indices) + runBackendsCmd('stop', indices) + waitHbChanged() } // if not specific fe indices, then restart all frontends void restartFrontends(int... indices) { - runFrontendsCmd("restart", indices) + runFrontendsCmd('restart', indices) + waitHbChanged() } // if not specific be indices, then restart all backends void restartBackends(int... indices) { - runBackendsCmd("restart", indices) + runBackendsCmd('restart', indices) + waitHbChanged() } // if not specific fe indices, then drop all frontends - void dropFrontends(int... indices) { - runFrontendsCmd("down", indices) + void dropFrontends(boolean clean=false, int... indices) { + def cmd = 'down' + if (clean) { + cmd += ' --clean' + } + runFrontendsCmd(cmd, indices) } // if not specific be indices, then decommission all backends - void decommissionBackends(int... indices) { - runBackendsCmdWithTimeout(300, "down", indices) + void decommissionBackends(boolean clean=false, int... indices) { + def cmd = 'down' + if (clean) { + cmd += ' --clean' + } + runBackendsCmd(300, cmd, indices) } // if not specific be indices, then drop force all backends - void dropForceBackends(int... indices) { - runBackendsCmd("down --drop-force", indices) + void dropForceBackends(boolean clean=false, int... indices) { + def cmd = 'down --drop-force' + if (clean) { + cmd += ' --clean' + } + runBackendsCmd(cmd, indices) } void checkFeIsAlive(int index, boolean isAlive) { def fe = getFeByIndex(index) - assert fe != null : "frontend with index " + index + " not exists!" - assert fe.alive == isAlive : (fe.alive ? "frontend with index " + index + " still alive" - : "frontend with index " + index + " dead") + assert fe != null : 'frontend with index ' + index + ' not exists!' + assert fe.alive == isAlive : (fe.alive ? 'frontend with index ' + index + ' still alive' + : 'frontend with index ' + index + ' dead') } void checkBeIsAlive(int index, boolean isAlive) { def be = getBeByIndex(index) - assert be != null : "backend with index " + index + " not exists!" - assert be.alive == isAlive : (be.alive ? "backend with index " + index + " still alive" - : "backend with index " + index + " dead") + assert be != null : 'backend with index ' + index + ' not exists!' + assert be.alive == isAlive : (be.alive ? 'backend with index ' + index + ' still alive' + : 'backend with index ' + index + ' dead') } void checkFeIsExists(int index, boolean isExists) { def fe = getFeByIndex(index) if (isExists) { - assert fe != null : "frontend with index " + index + " not exists!" + assert fe != null : 'frontend with index ' + index + ' not exists!' } else { - assert fe == null : "frontend with index " + index + " exists!" + assert fe == null : 'frontend with index ' + index + ' exists!' } } void checkBeIsExists(int index, boolean isExists) { def be = getBeByIndex(index) if (isExists) { - assert be != null : "backend with index " + index + " not exists!" + assert be != null : 'backend with index ' + index + ' not exists!' 
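
The node lifecycle helpers in this hunk now self-synchronize: start/stop/restart call waitHbChanged() (defined just below) so suites no longer need a hand-rolled sleep after toggling a node, and the down-style commands gain an optional clean flag. A hedged usage sketch inside a docker block (node indices are illustrative):

    // Stop FE 2, verify the master notices, then bring it back; no manual
    // sleep is needed because stop/startFrontends now wait out a heartbeat
    // cycle before returning.
    cluster.stopFrontends(2)
    cluster.checkFeIsAlive(2, false)
    cluster.startFrontends(2)
    cluster.checkFeIsAlive(2, true)

    // 'clean = true' is an assignment expression passed positionally, the
    // same idiom test_min_load_replica_num_complicate.groovy uses below.
    cluster.decommissionBackends(clean = true, 1)
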
} else { - assert be == null : "backend with index " + index + " exists!" + assert be == null : 'backend with index ' + index + ' exists!' } } + private void waitHbChanged() { + Thread.sleep(6000) + } + private void runFrontendsCmd(String op, int... indices) { - def cmd = op + " " + name + " --fe-id " + indices.join(" ") + def cmd = op + ' ' + name + ' --fe-id ' + indices.join(' ') runCmd(cmd) } - private void runBackendsCmd(String op, int... indices) { - def cmd = op + " " + name + " --be-id " + indices.join(" ") - runCmd(cmd) - } - - private void runBackendsCmdWithTimeout(int timeoutSecond, String op, int... indices) { - def cmd = op + " " + name + " --be-id " + indices.join(" ") - runCmd(cmd, timeoutSecond) + private void runBackendsCmd(Integer timeoutSecond = null, String op, int... indices) { + def cmd = op + ' ' + name + ' --be-id ' + indices.join(' ') + if (timeoutSecond == null) { + runCmd(cmd) + } else { + runCmd(cmd, timeoutSecond) + } } private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception { - def fullCmd = String.format("python %s %s --output-json", config.dorisComposePath, cmd) - logger.info("Run doris compose cmd: {}", fullCmd) + def fullCmd = String.format('python %s %s --output-json', config.dorisComposePath, cmd) + logger.info('Run doris compose cmd: {}', fullCmd) def proc = fullCmd.execute() def outBuf = new StringBuilder() def errBuf = new StringBuilder() @@ -350,14 +473,15 @@ class SuiteCluster { def out = outBuf.toString() def err = errBuf.toString() if (proc.exitValue()) { - throw new Exception(String.format("Exit value: %s != 0, stdout: %s, stderr: %s", + throw new Exception(String.format('Exit value: %s != 0, stdout: %s, stderr: %s', proc.exitValue(), out, err)) } def parser = new JsonSlurper() def object = (Map) parser.parseText(out) - if (object.get("code") != 0) { - throw new Exception(String.format("Code: %s != 0, err: %s", object.get("code"), object.get("err"))) + if (object.get('code') != 0) { + throw new Exception(String.format('Code: %s != 0, err: %s', object.get('code'), object.get('err'))) } - return object.get("data") + return object.get('data') } + } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy index 9cc21faae0..f0f5cce3f7 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy @@ -29,6 +29,12 @@ import java.sql.DriverManager import java.util.concurrent.ExecutorService import java.util.function.Function +class ConnectionInfo { + Connection conn + String username + String password +} + @Slf4j @CompileStatic class SuiteContext implements Closeable { @@ -36,7 +42,7 @@ class SuiteContext implements Closeable { public final String suiteName public final String group public final String dbName - public final ThreadLocal threadLocalConn = new ThreadLocal<>() + public final ThreadLocal threadLocalConn = new ThreadLocal<>() public final ThreadLocal threadHiveDockerConn = new ThreadLocal<>() public final ThreadLocal threadHiveRemoteConn = new ThreadLocal<>() private final ThreadLocal syncer = new ThreadLocal<>() @@ -121,12 +127,15 @@ class SuiteContext implements Closeable { } Connection getConnection() { - def threadConn = threadLocalConn.get() - if (threadConn == null) { - threadConn = config.getConnectionByDbName(dbName) - 
threadLocalConn.set(threadConn) + def threadConnInfo = threadLocalConn.get() + if (threadConnInfo == null) { + threadConnInfo = new ConnectionInfo() + threadConnInfo.conn = config.getConnectionByDbName(dbName) + threadConnInfo.username = config.jdbcUser + threadConnInfo.password = config.jdbcPassword + threadLocalConn.set(threadConnInfo) } - return threadConn + return threadConnInfo.conn } Connection getHiveDockerConnection(){ @@ -213,7 +222,11 @@ class SuiteContext implements Closeable { try { log.info("Create new connection for user '${user}'") return DriverManager.getConnection(url, user, password).withCloseable { newConn -> - threadLocalConn.set(newConn) + def newConnInfo = new ConnectionInfo() + newConnInfo.conn = newConn + newConnInfo.username = user + newConnInfo.password = password + threadLocalConn.set(newConnInfo) return actionSupplier.call() } } finally { @@ -312,11 +325,11 @@ class SuiteContext implements Closeable { } } - Connection conn = threadLocalConn.get() + ConnectionInfo conn = threadLocalConn.get() if (conn != null) { threadLocalConn.remove() try { - conn.close() + conn.conn.close() } catch (Throwable t) { log.warn("Close connection failed", t) } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy new file mode 100644 index 0000000000..1daabf469b --- /dev/null +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This file is copy from selectdb-core. 
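
The ConnectionInfo wrapper introduced in the SuiteContext hunk above exists because Suite.thread (earlier in this diff) must reopen an equivalent JDBC connection on its worker thread: the URL is recoverable from connection metadata, but the credentials are not, so they travel alongside the Connection. A minimal sketch of that reopen step, assuming standard JDBC (reopen() is a hypothetical helper, not part of this PR):

    import java.sql.Connection
    import java.sql.DriverManager

    // java.sql.Connection exposes its URL via DatabaseMetaData but never its
    // credentials, hence the extra username/password fields on ConnectionInfo.
    Connection reopen(ConnectionInfo parentInfo) {
        def url = parentInfo.conn.getMetaData().getURL()
        return DriverManager.getConnection(url, parentInfo.username, parentInfo.password)
    }
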
+package org.apache.doris.regression.util + +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.util.EntityUtils +import org.junit.Assert +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.util.logging.Slf4j + +import java.nio.charset.Charset + +@Slf4j +class Http { + + final static Logger logger = LoggerFactory.getLogger(this.class) + + static Object http_post(url, data = null, isJson = false) { + def conn = new URL(url).openConnection() + conn.setRequestMethod('POST') + conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root + if (data) { + if (isJson) { + conn.setRequestProperty('Content-Type', 'application/json') + data = JsonOutput.toJson(data) + } + // Output request parameters + conn.doOutput = true + def writer = new OutputStreamWriter(conn.outputStream) + writer.write(data) + writer.flush() + writer.close() + } + def code = conn.responseCode + def text = conn.content.text + logger.info("http post url=${url}, data=${data}, isJson=${isJson}, response code=${code}, text=${text}") + Assert.assertEquals(200, code) + if (isJson) { + def json = new JsonSlurper() + def result = json.parseText(text) + return result + } else { + return text + } + } + + public static String httpJson(int ms, String url, String json) throws Exception{ + String err = '00', line = null + StringBuilder sb = new StringBuilder() + HttpURLConnection conn = null + BufferedWriter out = null + BufferedReader inB = null + try { + conn = (HttpURLConnection) (new URL(url.replaceAll('/','/'))).openConnection() + conn.setRequestMethod('POST') + conn.setDoOutput(true) + conn.setDoInput(true) + conn.setUseCaches(false) + conn.setConnectTimeout(ms) + conn.setReadTimeout(ms) + conn.setRequestProperty('Content-Type', 'application/json;charset=utf-8') + conn.connect() + out = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(), 'utf-8')) + out.write(new String(json.getBytes(), 'utf-8')) + out.flush() + int code = conn.getResponseCode() + if (conn.getResponseCode() == 200) { + inB = new BufferedReader(new InputStreamReader(conn.getInputStream(), 'UTF-8')) + while ((line = inB.readLine()) != null) { + sb.append(line) + } + } + } catch (Exception ex) { + err = ex.getMessage() + } + try { if (out != null) { out.close() } } catch (Exception ex) { } + try { if (inB != null) { inB.close() } } catch (Exception ex) { } + try { if (conn != null) { conn.disconnect() } } catch (Exception ex) { } + if (!err.equals('00')) { throw new Exception(err) } + return sb.toString() + } + + public static String sendPost(String url, String param) { + OutputStreamWriter out = null + BufferedReader inB = null + StringBuilder result = new StringBuilder('') + try { + URL realUrl = new URL(url) + URLConnection conn = realUrl.openConnection() + conn.setRequestProperty('Content-Type', 'application/json;charset=UTF-8') + conn.setRequestProperty('accept', '*/*') + conn.setDoOutput(true) + conn.setDoInput(true) + out = new OutputStreamWriter(conn.getOutputStream(), 'UTF-8') + out.write(param) + out.flush() + inB = new BufferedReader(new InputStreamReader(conn.getInputStream(), 'UTF-8')) + String line + while ((line = inB.readLine()) != null) { + result.append(line) + } + } catch (Exception e) { + System.out.println('post 
exception' + e) + e.printStackTrace() + } finally { + if (out != null) { try { out.close() } catch (Exception ex) { } } + if (inB != null) { try { inB.close() } catch (Exception ex) { } } + } + return result.toString() + } + + public static String httpPostJson(String url, String json) throws Exception{ + String data = '' + CloseableHttpClient httpClient = null + CloseableHttpResponse response = null + try { + httpClient = HttpClients.createDefault() + HttpPost httppost = new HttpPost(url) + httppost.setHeader('Content-Type', 'application/json;charset=UTF-8') + StringEntity se = new StringEntity(json, Charset.forName('UTF-8')) + se.setContentType('text/json') + se.setContentEncoding('UTF-8') + httppost.setEntity(se) + response = httpClient.execute(httppost) + int code = response.getStatusLine().getStatusCode() + System.out.println('res statusCode:' + code) + data = EntityUtils.toString(response.getEntity(), 'utf-8') + EntityUtils.consume(response.getEntity()) + } catch (Exception e) { + e.printStackTrace() + } finally { + if (response != null) { try { response.close() } catch (IOException e) { } } + if (httpClient != null) { try { httpClient.close() } catch (IOException e) { } } + } + return data + } + +} diff --git a/regression-test/suites/chaos/test_docker_example.groovy b/regression-test/suites/demo_p0/docker_action.groovy similarity index 71% rename from regression-test/suites/chaos/test_docker_example.groovy rename to regression-test/suites/demo_p0/docker_action.groovy index 5cb7238607..0dbb818e98 100644 --- a/regression-test/suites/chaos/test_docker_example.groovy +++ b/regression-test/suites/demo_p0/docker_action.groovy @@ -17,37 +17,35 @@ import org.apache.doris.regression.suite.ClusterOptions -suite("test_docker_example") { +suite('docker_action') { docker { - sql """create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10""" - sql """insert into tb1 values (1),(2),(3)""" + sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10''' + sql '''insert into tb1 values (1),(2),(3)''' cluster.checkBeIsAlive(2, true) // stop backend 2, 3 cluster.stopBackends(2, 3) - // wait be lost heartbeat - Thread.sleep(6000) cluster.checkBeIsAlive(2, false) test { - sql """insert into tb1 values (4),(5),(6)""" + sql '''insert into tb1 values (4),(5),(6)''' // REPLICA_FEW_ERR - exception "errCode = 3," + exception 'errCode = 3,' } } // run another docker def options = new ClusterOptions() // add fe config items - options.feConfigs = ["example_conf_k1=v1", "example_conf_k2=v2"] + options.feConfigs = ['example_conf_k1=v1', 'example_conf_k2=v2'] // contains 5 backends options.beNum = 5 // each backend has 3 disks options.beDiskNum = 3 - docker (options) { - sql """create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="5")""" + docker(options) { + sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="5")''' } } diff --git a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy new file mode 100644 index 0000000000..81fd2f2661 --- /dev/null +++ b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.suite.NodeType +import org.apache.doris.regression.suite.SuiteCluster + +class InjectCase { + + boolean writeMayFailed + Map> fePoints + Map> bePoints + + InjectCase(boolean writeMayFailed, Map> fePoints, + Map> bePoints) { + this.writeMayFailed = writeMayFailed + this.fePoints = fePoints + this.bePoints = bePoints + } + +} + +suite('test_min_load_replica_num_complicate') { + def beCloneCostMs = 3000 + + def random = new Random() + def insertThreadNum = getSuiteConf('insertThreadNum', '3') as int + def iterationNum = getSuiteConf('iterationNum', '20') as int + def injectProb = getSuiteConf('injectProb', '50') as int // inject perc = injectProb / 100 + def insertBatchSize = 4 + def bucketNum = 2 + + def run = { int replicaNum, int minLoadNum -> + logger.info("start run test_min_load_replica_num_complicate with replica num ${replicaNum}, " + + "min load replica num ${minLoadNum}, iterationNum ${iterationNum}, injectProb ${injectProb} %") + + def allInjectCases = [ + new InjectCase(true, + ['OlapTableSink.write_random_choose_sink' : [needCatchUp:false, sinkNum: minLoadNum]], + ['EngineCloneTask.wait_clone' : [duration:beCloneCostMs]] + ), + new InjectCase(false, + ['PublishVersionDaemon.stop_publish' : [timeout:10]], + ['EngineCloneTask.wait_clone' : [duration:beCloneCostMs]] + ), + new InjectCase(false, + ['PublishVersionDaemon.not_wait_unfinished_tasks' : null], + ['EngineCloneTask.wait_clone' : [duration:beCloneCostMs]] + ), + new InjectCase(true, + [:], + ['TxnManager.commit_txn_random_failed' : null] + ), + ] + + def options = new ClusterOptions() + options.enableDebugPoints() + options.feConfigs.add('disable_balance=true') + options.feConfigs.add('tablet_checker_interval_ms=1000') + options.beNum = replicaNum + 1 + + def tbl = "test_min_load_replica_num_complicate_tbl_replica_num_${replicaNum}_min_load_num_${minLoadNum}" + tbl = tbl.replaceAll('-', 'm') + + docker(options) { + sql """ + CREATE TABLE ${tbl} + ( + k1 int + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS ${bucketNum} + PROPERTIES ( + "replication_num" = "${replicaNum}", + "min_load_replica_num" = "${minLoadNum}" + ); + """ + + // wait visible 10 min + sql "SET GLOBAL insert_visible_timeout_ms = ${1000 * 60 * 10}" + + def rowIndex = 0 + def getNextBatch = { + def rows = [] + for (def k = 0; k < insertBatchSize; k++) { + rows.add('(' + rowIndex + ')') + rowIndex++ + } + return rows.join(', ') + } + + def insertSuccNum = 0 + for (def i = 0; i < iterationNum; i++) { + def futures = [] + boolean writeMayFailed = false + if (random.nextInt(100) < injectProb) { + sql '''admin set frontend config ("disable_tablet_scheduler" = "true")''' + + def originBackends = cluster.getBackends() + cluster.addBackend(1) + + def injectCase = allInjectCases[random.nextInt(allInjectCases.size())] + writeMayFailed = injectCase.writeMayFailed + cluster.injectDebugPoints(NodeType.FE, 
injectCase.fePoints) + cluster.injectDebugPoints(NodeType.BE, injectCase.bePoints) + + futures.add(thread { + sql '''admin set frontend config ("disable_tablet_scheduler" = "false")''' + cluster.decommissionBackends(clean = true, originBackends.get(0).index) + cluster.clearFrontendDebugPoints() + cluster.clearBackendDebugPoints() + + return false + }) + } + + for (def j = 0; j < insertThreadNum; j++) { + def values = getNextBatch() + futures.add(thread { + try { + sql "INSERT INTO ${tbl} VALUES ${values}" + } catch (Exception e) { + if (!writeMayFailed) { + throw e + } + return false + } + return true + }) + } + + futures.each { if (it.get()) { insertSuccNum++ } } + } + + sql """ ALTER TABLE ${tbl} SET ( "min_load_replica_num" = "-1" ) """ + + for (def i = 0; i < 5; i++) { + sql "INSERT INTO ${tbl} VALUES ${getNextBatch()}" + insertSuccNum++ + } + + def rowNum = insertSuccNum * insertBatchSize + for (def i = 0; i < replicaNum; i++) { + sql "set use_fix_replica = ${i}" + def result = sql "select count(*) from ${tbl}" + assertEquals(rowNum, result[0][0] as int) + } + + sql 'set use_fix_replica = -1' + def versionNum = insertSuccNum + 1 + def result = sql "show tablets from ${tbl}" + assertEquals(bucketNum * replicaNum, result.size()) + for (def tablet : result) { + assertEquals(versionNum, tablet[5] as int) + } + } + } + + run(2, 1) + run(2, -1) + run(3, 1) + run(3, -1) +} diff --git a/regression-test/suites/load_p0/insert/test_min_load_replica_num_simple.groovy b/regression-test/suites/load_p0/insert/test_min_load_replica_num_simple.groovy new file mode 100644 index 0000000000..75d7155c3d --- /dev/null +++ b/regression-test/suites/load_p0/insert/test_min_load_replica_num_simple.groovy @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
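
In the complicate test that just ended, each inject case maps a debug-point name to an optional parameter map; SuiteCluster.injectDebugPoints (earlier in this diff) fans every entry out to all nodes of the given type through ServerNode.enableDebugPoint, which serializes the map as the add URL's query string. A hedged sketch of one expansion, meant to run inside a docker block (host and port are placeholders):

    // ['EngineCloneTask.wait_clone': [duration: 3000]] becomes, per backend:
    //   POST http://<be_host>:<be_http_port>/api/debug_point/add/EngineCloneTask.wait_clone?duration=3000
    // A null params map (as with 'TxnManager.commit_txn_random_failed') posts
    // the bare add URL, so the point keeps its server-side defaults.
    cluster.injectDebugPoints(NodeType.BE, ['EngineCloneTask.wait_clone': [duration: 3000]])
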
+ +import org.apache.doris.regression.suite.ClusterOptions + +suite('test_min_load_replica_num_simple') { + def options = new ClusterOptions() + options.feConfigs.add('tablet_checker_interval_ms=1000') + docker(options) { + def tbl = 'test_min_load_replica_num_simple_tbl' + + sql """ DROP TABLE IF EXISTS ${tbl} """ + + sql """ + CREATE TABLE ${tbl} ( + `k1` int(11) NULL, + `k2` char(5) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="3" + ); + """ + + sql """ + INSERT INTO ${tbl} (k1, k2) + VALUES (1, "a"), (2, "b"), (3, "c"), (4, "e"); + """ + + qt_select_1 """ SELECT * from ${tbl} ORDER BY k1; """ + + cluster.stopBackends(2, 3) + cluster.checkBeIsAlive(2, false) + cluster.checkBeIsAlive(3, false) + + def be1 = cluster.getBeByIndex(1) + assert be1 != null + assert be1.alive + + test { + sql """ INSERT INTO ${tbl} (k1, k2) VALUES (5, "f"), (6, "g") """ + + // REPLICA_FEW_ERR + exception 'errCode = 3,' + } + + sql """ ALTER TABLE ${tbl} SET ( "min_load_replica_num" = "1" ) """ + + sql """ INSERT INTO ${tbl} (k1, k2) VALUES (7, "h"), (8, "i") """ + + qt_select_2 """ SELECT * from ${tbl} ORDER BY k1; """ + + sql """ ALTER TABLE ${tbl} SET ( "min_load_replica_num" = "-1" ) """ + + test { + sql """ INSERT INTO ${tbl} (k1, k2) VALUES (9, "j"), (10, "k") """ + + // REPLICA_FEW_ERR + exception 'errCode = 3,' + } + + qt_select_3 """ SELECT * from ${tbl} ORDER BY k1; """ + + cluster.startBackends(2, 3) + cluster.checkBeIsAlive(2, true) + cluster.checkBeIsAlive(3, true) + + sql """ ADMIN REPAIR TABLE ${tbl} """ + + // wait clone finish + sleep(5000) + + cluster.stopBackends(1) + cluster.checkBeIsAlive(1, false) + + sql """ INSERT INTO ${tbl} (k1, k2) VALUES (11, "l"), (12, "m") """ + + qt_select_4 """ SELECT * from ${tbl} ORDER BY k1; """ + + sql """ DROP TABLE IF EXISTS ${tbl} """ + } +}
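
The suites block added to regression-conf.groovy pairs with Suite.getSuiteConf earlier in this diff: test_min_load_replica_num_complicate reads insertThreadNum, iterationNum and injectProb through it and falls back to the defaults baked into the test. A hedged sketch of dialing those knobs up for a longer local soak (the values are illustrative):

    // regression-conf.groovy
    suites = {
        test_min_load_replica_num_complicate {
            insertThreadNum = "5"
            iterationNum    = "50"
            injectProb      = "80"   // inject a failure case in ~80% of iterations
        }
    }
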