diff --git a/be/src/common/status.h b/be/src/common/status.h
index 295e21b77c..acb15eeb70 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -13,6 +13,7 @@
 
 #include "common/compiler_util.h"
 #include "gen_cpp/Status_types.h" // for TStatus
+#include "service/backend_options.h"
 #ifdef ENABLE_STACKTRACE
 #include "util/stack_util.h"
 #endif
@@ -299,6 +300,7 @@ public:
         if (rhs._err_msg) {
             _err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
         }
+        _be_ip = rhs._be_ip;
         return *this;
     }
 
@@ -477,10 +479,13 @@ private:
 #endif
     };
     std::unique_ptr<ErrMsg> _err_msg;
+    std::string _be_ip = BackendOptions::get_localhost();
 };
 
 inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
-    ostr << '[' << status.code_as_string() << ']' << (status._err_msg ? status._err_msg->_msg : "");
+    ostr << '[' << status.code_as_string() << ']';
+    ostr << '[' << status._be_ip << ']';
+    ostr << (status._err_msg ? status._err_msg->_msg : "");
 #ifdef ENABLE_STACKTRACE
     if (status._err_msg && !status._err_msg->_stack.empty()) {
         ostr << '\n' << status._err_msg->_stack;
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index d623a8b245..8f36600298 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -230,10 +230,8 @@ Path HdfsFileSystem::_covert_path(const Path& path) const {
 int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
 
 Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) {
-    HDFSCommonBuilder builder = createHDFSBuilder(hdfs_params);
-    if (builder.is_need_kinit()) {
-        RETURN_IF_ERROR(builder.run_kinit());
-    }
+    HDFSCommonBuilder builder;
+    RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
     hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
     if (hdfs_fs == nullptr) {
         return Status::InternalError("connect to hdfs failed. error: {}", hdfsGetLastError());
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index bb58b3c11e..b08b973860 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -28,6 +28,16 @@
 #include "util/url_coding.h"
 
 namespace doris {
+Status HDFSCommonBuilder::init_hdfs_builder() {
+    hdfs_builder = hdfsNewBuilder();
+    if (hdfs_builder == nullptr) {
+        LOG(INFO) << "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml";
+        return Status::InternalError(
+                "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml");
+    }
+    return Status::OK();
+}
+
 Status HDFSCommonBuilder::run_kinit() {
     if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
         return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
@@ -79,36 +89,41 @@ THdfsParams parse_properties(const std::map<std::string, std::string>& properties) {
     return hdfsParams;
 }
 
-HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) {
-    HDFSCommonBuilder builder;
-    hdfsBuilderSetNameNode(builder.get(), hdfsParams.fs_name.c_str());
+Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder) {
+    RETURN_IF_ERROR(builder->init_hdfs_builder());
+    hdfsBuilderSetNameNode(builder->get(), hdfsParams.fs_name.c_str());
     // set hdfs user
     if (hdfsParams.__isset.user) {
-        hdfsBuilderSetUserName(builder.get(), hdfsParams.user.c_str());
+        hdfsBuilderSetUserName(builder->get(), hdfsParams.user.c_str());
     }
     // set kerberos conf
     if (hdfsParams.__isset.hdfs_kerberos_principal) {
-        builder.need_kinit = true;
-        builder.hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
-        hdfsBuilderSetPrincipal(builder.get(), hdfsParams.hdfs_kerberos_principal.c_str());
+        builder->need_kinit = true;
+        builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
+        hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
     }
     if (hdfsParams.__isset.hdfs_kerberos_keytab) {
-        builder.need_kinit = true;
-        builder.hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
+        builder->need_kinit = true;
+        builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
     }
     // set other conf
     if (hdfsParams.__isset.hdfs_conf) {
         for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
-            hdfsBuilderConfSetStr(builder.get(), conf.key.c_str(), conf.value.c_str());
+            hdfsBuilderConfSetStr(builder->get(), conf.key.c_str(), conf.value.c_str());
         }
     }
 
-    return builder;
+    if (builder->is_need_kinit()) {
+        RETURN_IF_ERROR(builder->run_kinit());
+    }
+
+    return Status::OK();
 }
 
-HDFSCommonBuilder createHDFSBuilder(const std::map<std::string, std::string>& properties) {
+Status createHDFSBuilder(const std::map<std::string, std::string>& properties,
+                         HDFSCommonBuilder* builder) {
     THdfsParams hdfsParams = parse_properties(properties);
-    return createHDFSBuilder(hdfsParams);
+    return createHDFSBuilder(hdfsParams, builder);
 }
 
 } // namespace doris
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index 8f147f5453..ecc08d5a71 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -31,13 +31,20 @@ const std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
 const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
 
 class HDFSCommonBuilder {
-    friend HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
-    friend HDFSCommonBuilder createHDFSBuilder(
-            const std::map<std::string, std::string>& properties);
+    friend Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder);
+    friend Status createHDFSBuilder(const std::map<std::string, std::string>& properties,
+                                    HDFSCommonBuilder* builder);
 
 public:
-    HDFSCommonBuilder() : hdfs_builder(hdfsNewBuilder()) {}
-    ~HDFSCommonBuilder() { hdfsFreeBuilder(hdfs_builder); }
+    HDFSCommonBuilder() {}
+    ~HDFSCommonBuilder() {
+        if (hdfs_builder != nullptr) {
+            hdfsFreeBuilder(hdfs_builder);
+        }
+    }
+
+    // Must call this to init hdfs_builder first.
+    Status init_hdfs_builder();
 
     hdfsBuilder* get() { return hdfs_builder; }
     bool is_need_kinit() const { return need_kinit; }
@@ -52,7 +59,8 @@ private:
 
 THdfsParams parse_properties(const std::map<std::string, std::string>& properties);
 
-HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
-HDFSCommonBuilder createHDFSBuilder(const std::map<std::string, std::string>& properties);
+Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder);
+Status createHDFSBuilder(const std::map<std::string, std::string>& properties,
+                         HDFSCommonBuilder* builder);
 
 } // namespace doris
diff --git a/be/src/io/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp
index 46349015c7..e6814438f1 100644
--- a/be/src/io/hdfs_writer.cpp
+++ b/be/src/io/hdfs_writer.cpp
@@ -26,10 +26,7 @@ namespace doris {
 HDFSWriter::HDFSWriter(const std::map<std::string, std::string>& properties, const std::string& path)
-        : _properties(properties),
-          _path(path),
-          _hdfs_fs(nullptr),
-          _builder(createHDFSBuilder(_properties)) {
+        : _properties(properties), _path(path), _hdfs_fs(nullptr) {
     _parse_properties(_properties);
 }
 
@@ -137,10 +134,9 @@ Status HDFSWriter::close() {
 }
 
 Status HDFSWriter::_connect() {
-    if (_builder.is_need_kinit()) {
-        RETURN_IF_ERROR(_builder.run_kinit());
-    }
-    _hdfs_fs = hdfsBuilderConnect(_builder.get());
+    HDFSCommonBuilder builder;
+    RETURN_IF_ERROR(createHDFSBuilder(_properties, &builder));
+    _hdfs_fs = hdfsBuilderConnect(builder.get());
     if (_hdfs_fs == nullptr) {
         return Status::InternalError("connect to hdfs failed. namenode address:{}, error {}",
                                      _namenode, hdfsGetLastError());
diff --git a/be/src/io/hdfs_writer.h b/be/src/io/hdfs_writer.h
index ffb38aad67..f5ef32a238 100644
--- a/be/src/io/hdfs_writer.h
+++ b/be/src/io/hdfs_writer.h
@@ -48,7 +48,6 @@ private:
     hdfsFS _hdfs_fs = nullptr;
     hdfsFile _hdfs_file = nullptr;
    bool _closed = false;
-    HDFSCommonBuilder _builder;
 };
 
 } // namespace doris
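The signature change is what lets failures propagate: `createHDFSBuilder()` fills a caller-owned out-parameter and returns a `Status`, so a failed `hdfsNewBuilder()` or kinit no longer yields a half-initialized builder, and the null check added to the destructor keeps the never-initialized case safe. A sketch of the failure path, with made-up parameter values:

```cpp
// Illustrative only: a principal without a keytab makes run_kinit() (invoked
// from createHDFSBuilder()) return InvalidArgument, so the caller never
// reaches hdfsBuilderConnect(). The destructor still frees the handle safely.
THdfsParams params;
params.__set_fs_name("hdfs://nameservice1");
params.__set_hdfs_kerberos_principal("doris/_HOST@EXAMPLE.COM"); // no keytab set
HDFSCommonBuilder builder;
Status st = createHDFSBuilder(params, &builder); // !st.ok(): InvalidArgument
```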
diff --git a/be/src/util/hdfs_storage_backend.cpp b/be/src/util/hdfs_storage_backend.cpp
index 30c1b608cc..6f021a4b73 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -37,9 +37,16 @@ namespace doris {
 static const std::string hdfs_file_prefix = "hdfs://";
 
 HDFSStorageBackend::HDFSStorageBackend(const std::map<std::string, std::string>& prop)
-        : _properties(prop), _builder(createHDFSBuilder(_properties)) {
-    _hdfs_fs = HDFSHandle::instance().create_hdfs_fs(_builder);
-    DCHECK(_hdfs_fs) << "init hdfs client error.";
+        : _properties(prop) {
+    HDFSCommonBuilder builder;
+    Status st = createHDFSBuilder(_properties, &builder);
+    if (st.ok()) {
+        _hdfs_fs = HDFSHandle::instance().create_hdfs_fs(builder);
+        DCHECK(_hdfs_fs) << "init hdfs client error.";
+    }
+    // If createHDFSBuilder() failed, _hdfs_fs will be null,
+    // and CHECK_HDFS_CLIENT will return an error.
+    // TODO: refactor StorageBackend, unify it into the file system layer
 }
 
 HDFSStorageBackend::~HDFSStorageBackend() {
@@ -325,4 +332,4 @@ Status HDFSStorageBackend::rmdir(const std::string& remote) {
     return rm(remote);
 }
 
-} // end namespace doris
\ No newline at end of file
+} // end namespace doris
diff --git a/be/src/util/hdfs_storage_backend.h b/be/src/util/hdfs_storage_backend.h
index a80fad486f..acbf18d2d0 100644
--- a/be/src/util/hdfs_storage_backend.h
+++ b/be/src/util/hdfs_storage_backend.h
@@ -56,8 +56,7 @@ private:
 
 private:
     std::map<std::string, std::string> _properties;
-    HDFSCommonBuilder _builder;
     hdfsFS _hdfs_fs = nullptr;
 };
 
-} // end namespace doris
\ No newline at end of file
+} // end namespace doris
diff --git a/be/src/util/hdfs_util.cpp b/be/src/util/hdfs_util.cpp
index fcfaf2c5fc..b58fc75e46 100644
--- a/be/src/util/hdfs_util.cpp
+++ b/be/src/util/hdfs_util.cpp
@@ -30,13 +30,6 @@ HDFSHandle& HDFSHandle::instance() {
 }
 
 hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& hdfs_builder) {
-    if (hdfs_builder.is_need_kinit()) {
-        Status status = hdfs_builder.run_kinit();
-        if (!status.ok()) {
-            LOG(WARNING) << status;
-            return nullptr;
-        }
-    }
     hdfsFS hdfs_fs = hdfsBuilderConnect(hdfs_builder.get());
     if (hdfs_fs == nullptr) {
         LOG(WARNING) << "connect to hdfs failed."
@@ -46,4 +39,4 @@ hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& hdfs_builder) {
     return hdfs_fs;
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/common/config_test.cpp b/be/test/common/config_test.cpp
index 4176fedcfd..bc122dc7f1 100644
--- a/be/test/common/config_test.cpp
+++ b/be/test/common/config_test.cpp
@@ -99,37 +99,45 @@ TEST_F(ConfigTest, UpdateConfigs) {
     // not exist
     Status s = config::set_config("cfg_not_exist", "123");
     EXPECT_FALSE(s.ok());
-    EXPECT_EQ(s.to_string(), "[NOT_FOUND]'cfg_not_exist' is not found");
+    EXPECT_TRUE(s.to_string().find("[NOT_FOUND]") != std::string::npos);
+    EXPECT_TRUE(s.to_string().find("'cfg_not_exist' is not found") != std::string::npos);
 
     // immutable
     EXPECT_TRUE(cfg_bool_immutable);
     s = config::set_config("cfg_bool_immutable", "false");
     EXPECT_FALSE(s.ok());
-    EXPECT_EQ(s.to_string(),
-              "[NOT_IMPLEMENTED_ERROR]'cfg_bool_immutable' is not support to modify");
+    EXPECT_TRUE(s.to_string().find("NOT_IMPLEMENTED_ERROR") != std::string::npos);
+    EXPECT_TRUE(s.to_string().find("'cfg_bool_immutable' is not support to modify") !=
+                std::string::npos);
     EXPECT_TRUE(cfg_bool_immutable);
 
     // convert error
     s = config::set_config("cfg_bool", "falseeee");
     EXPECT_FALSE(s.ok());
-    EXPECT_EQ(s.to_string(), "[INVALID_ARGUMENT]convert 'falseeee' as bool failed");
+    EXPECT_TRUE(s.to_string().find("INVALID_ARGUMENT") != std::string::npos);
+    EXPECT_TRUE(s.to_string().find("convert 'falseeee' as bool failed") != std::string::npos);
    EXPECT_TRUE(cfg_bool);
 
     s = config::set_config("cfg_double", "");
     EXPECT_FALSE(s.ok());
-    EXPECT_EQ(s.to_string(), "[INVALID_ARGUMENT]convert '' as double failed");
+    EXPECT_TRUE(s.to_string().find("INVALID_ARGUMENT") != std::string::npos);
+    EXPECT_TRUE(s.to_string().find("convert '' as double failed") != std::string::npos);
     EXPECT_EQ(cfg_double, 654.321);
 
     // convert error
     s = config::set_config("cfg_int32_t", "4294967296124");
     EXPECT_FALSE(s.ok());
-    EXPECT_EQ(s.to_string(), "[INVALID_ARGUMENT]convert '4294967296124' as int32_t failed");
+    EXPECT_TRUE(s.to_string().find("INVALID_ARGUMENT") != std::string::npos);
+    EXPECT_TRUE(s.to_string().find("convert '4294967296124' as int32_t failed") !=
+                std::string::npos);
     EXPECT_EQ(cfg_int32_t, 65536124);
 
     // not support
     s = config::set_config("cfg_std_string", "test");
     EXPECT_FALSE(s.ok());
-    EXPECT_EQ(s.to_string(), "[NOT_IMPLEMENTED_ERROR]'cfg_std_string' is not support to modify");
+    EXPECT_TRUE(s.to_string().find("NOT_IMPLEMENTED_ERROR") != std::string::npos);
+    EXPECT_TRUE(s.to_string().find("'cfg_std_string' is not support to modify") !=
+                std::string::npos);
     EXPECT_EQ(cfg_std_string, "doris_config_test_string");
 }
diff --git a/be/test/common/status_test.cpp b/be/test/common/status_test.cpp
index ee7cb910b1..f5d9a2f225 100644
--- a/be/test/common/status_test.cpp
+++ b/be/test/common/status_test.cpp
@@ -29,7 +29,7 @@ TEST_F(StatusTest, OK) {
     // default
     Status st;
     EXPECT_TRUE(st.ok());
-    EXPECT_EQ("[OK]", st.to_string());
+    EXPECT_TRUE(st.to_string().find("[OK]") != std::string::npos);
     // copy
     {
         Status other = st;
@@ -49,22 +49,26 @@ TEST_F(StatusTest, Error) {
     // default
     Status st = Status::InternalError("123");
     EXPECT_FALSE(st.ok());
-    EXPECT_EQ("[INTERNAL_ERROR]123", st.to_string());
+    EXPECT_TRUE(st.to_string().find("[INTERNAL_ERROR]") != std::string::npos);
+    EXPECT_TRUE(st.to_string().find("123") != std::string::npos);
     // copy
     {
         Status other = st;
         EXPECT_FALSE(other.ok());
-        EXPECT_EQ("[INTERNAL_ERROR]123", other.to_string());
+        EXPECT_TRUE(other.to_string().find("[INTERNAL_ERROR]") != std::string::npos);
+        EXPECT_TRUE(other.to_string().find("123") != std::string::npos);
     }
     // move assign
     st = Status::InternalError("456");
     EXPECT_FALSE(st.ok());
-    EXPECT_EQ("[INTERNAL_ERROR]456", st.to_string());
+    EXPECT_TRUE(st.to_string().find("[INTERNAL_ERROR]") != std::string::npos);
+    EXPECT_TRUE(st.to_string().find("456") != std::string::npos);
     // move construct
     {
         Status other = std::move(st);
         EXPECT_FALSE(other.ok());
-        EXPECT_EQ("[INTERNAL_ERROR]456", other.to_string());
+        EXPECT_TRUE(other.to_string().find("[INTERNAL_ERROR]") != std::string::npos);
+        EXPECT_TRUE(other.to_string().find("456") != std::string::npos);
     }
 }
diff --git a/bin/start_be.sh b/bin/start_be.sh
index f34973af3c..d383c25163 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -231,7 +231,9 @@ set_tcmalloc_heap_limit() {
 # set_tcmalloc_heap_limit || exit 1
 
 ## set hdfs conf
-export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
+if [[ -f "${DORIS_HOME}/conf/hdfs-site.xml" ]]; then
+    export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
+fi
 
 # see https://github.com/jemalloc/jemalloc/issues/2366
 export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof:true,prof_prefix:jeprof.out"
diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index 1a2c91915f..f85d3fd900 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2614,4 +2614,25 @@ View configuration
 show data (Detail:HELP SHOW DATA)
 
 ```
 
+#### `prefer_compute_node_for_external_table`
+
+Default:false
+
+IsMutable:true
+
+MasterOnly:false
+
+If set to true, queries on external tables will be preferentially assigned to compute nodes, and the number of nodes to use is controlled by `min_backend_num_for_external_table`.
+If set to false, queries on external tables may be assigned to any node.
+
+#### `min_backend_num_for_external_table`
+
+Default:3
+
+IsMutable:true
+
+MasterOnly:false
+
+Only takes effect when `prefer_compute_node_for_external_table` is true. If the number of compute nodes is less than this value, queries on external tables will also be assigned to some mix nodes so that the total number of nodes reaches this value.
+If the number of compute nodes is larger than this value, queries on external tables will be assigned to compute nodes only.
diff --git a/docs/en/docs/advanced/compute_node.md b/docs/en/docs/advanced/compute_node.md
index 7f57d88ea1..5d8b671b60 100644
--- a/docs/en/docs/advanced/compute_node.md
+++ b/docs/en/docs/advanced/compute_node.md
@@ -96,18 +96,24 @@ HeartbeatFailureCounter: 0
 ```
 
 ### Usage
-When using the [MultiCatalog](../lakehouse/multi-catalog/multi-catalog) , the query will be preferentially scheduled to the compute node.
-In order to balance task scheduling, FE has a `backend_num_for_federation` configuration item, which defaults to 3.
-When executing a federated query, the optimizer will select `backend_num_for_federation` as an alternative to the scheduler,
-and the scheduler will decide which node to execute on to prevent the task from being skewed.
-If the number of compute nodes is less than `backend_num_for_federation`, the mixed nodes will be randomly selected to meet the number.
-If the compute node is greater than `backend_num_for_federation`, the federated query task will only be scheduled on the compute node.
+
+Add the following configuration items to fe.conf:
+
+```
+prefer_compute_node_for_external_table=true
+min_backend_num_for_external_table=3
+```
+
+> For parameter descriptions, please refer to: [FE configuration items](../admin-manual/config/fe-config.md)
+
+When querying with the [MultiCatalog](../lakehouse/multi-catalog/multi-catalog) feature, the query will be preferentially dispatched to compute nodes.
 
 ### some restrictions
-- The compute node currently only supports the query for Hive MetaStore of MultiCatalog and the others are still on the hybrid node.
+
 - Compute nodes are controlled by a configuration item, so do not turn a mixed-type node into a compute node by simply modifying its configuration.
 
-## Unfinished business
+## TODO
+
 - Computation spillover: for queries on Doris internal tables, operators above the TableScan can be scheduled to compute nodes when the cluster load is high.
 - Graceful offline:
     - When a compute node goes offline, new tasks are automatically scheduled to the remaining online nodes
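To make the scheduling rule in the two documents above concrete (illustrative numbers, not from the patch): with `prefer_compute_node_for_external_table=true` and `min_backend_num_for_external_table=3`, a cluster with 2 compute nodes and 4 mix nodes would run an external-table query on the 2 compute nodes plus 1 mix node, while a cluster with 5 compute nodes would use compute nodes only.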
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 2a3d499db9..d995ecf664 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2614,3 +2614,23 @@ ALTER DATABASE db_name SET TRANSACTION QUOTA quota;
 show data (其他用法:HELP SHOW DATA)
 ```
 
+#### `prefer_compute_node_for_external_table`
+
+默认值:false
+
+是否可以动态配置:true
+
+是否为 Master FE 节点独有的配置项:false
+
+如果设置为 true,对外部表的查询将优先分配给计算节点。计算节点的最大数量由 `min_backend_num_for_external_table` 控制。如果设置为 false,对外部表的查询将分配给任何节点。
+
+#### `min_backend_num_for_external_table`
+
+默认值:3
+
+是否可以动态配置:true
+
+是否为 Master FE 节点独有的配置项:false
+
+仅在 `prefer_compute_node_for_external_table` 为 true 时生效。如果计算节点数小于此值,则对外部表的查询将尝试使用一些混合节点,让节点总数达到这个值。
+如果计算节点数大于这个值,外部表的查询将只分配给计算节点。
diff --git a/docs/zh-CN/docs/advanced/compute_node.md b/docs/zh-CN/docs/advanced/compute_node.md
index 8b2825034f..9a302ed186 100644
--- a/docs/zh-CN/docs/advanced/compute_node.md
+++ b/docs/zh-CN/docs/advanced/compute_node.md
@@ -89,16 +89,23 @@ HeartbeatFailureCounter: 0
 ```
 
 ### 使用
-当查询时使用[MultiCatalog](../lakehouse/multi-catalog/multi-catalog)功能时, 查询会优先调度到计算节点, 为了均衡任务调度, FE有一个`backend_num_for_federation`配置项, 默认是3.
-当执行联邦查询时, 优化器会选取`backend_num_for_federation`给调度器备选, 由调取器决定具体在哪个节点执行, 防止查询任务倾斜.
-当计算节点个数小于`backend_num_for_federation`时, 会随机选择混合节点补齐个数;当计算节点大于`backend_num_for_federation`, 那么联邦查询任务只会在计算节点执行.
+在 fe.conf 中添加配置项
+
+```
+prefer_compute_node_for_external_table=true
+min_backend_num_for_external_table=3
+```
+
+> 参数说明请参阅:[FE 配置项](../admin-manual/config/fe-config.md)
+
+当查询时使用[MultiCatalog](../lakehouse/multi-catalog/multi-catalog)功能时, 查询会优先调度到计算节点。
 
 ### 一些限制
-- 计算节点目前只支持[MultiCatalog](../lakehouse/multi-catalog/multi-catalog)对应的Hive MetaStore表类型查询语法, 普通外表的计算依然在混合节点上.
+
 - 计算节点由配置项控制, 但不要将混合类型节点, 修改配置为计算节点.
-
 ## 未尽事项
+
 - 计算外溢: Doris内表查询, 当集群负载高的时候, 上层(TableScan之外)算子调度到计算节点中.
 - 优雅下线: 当节点下线的时候, 任务新任务自动调度到其他节点; 等待老任务后全部完成后节点再下线; 老任务无法按时结束时, 能够让任务能够自己结束.
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 826ae98543..840f1237f0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1828,12 +1828,20 @@ public class Config extends ConfigBase {
     public static boolean keep_scheduler_mtmv_task_when_job_deleted = false;
 
     /**
-     * The candidate of the backend node for federation query such as hive table and es table query.
-     * If the backend of computation role is less than this value, it will acquire some mix backend.
-     * If the computation backend is enough, federation query will only assign to computation backend.
+     * If set to true, queries on external tables will be preferentially assigned to compute nodes,
+     * and the number of nodes to use is controlled by min_backend_num_for_external_table.
+     * If set to false, queries on external tables may be assigned to any node.
      */
     @ConfField(mutable = true, masterOnly = false)
-    public static int backend_num_for_federation = 3;
+    public static boolean prefer_compute_node_for_external_table = false;
+    /**
+     * Only takes effect when prefer_compute_node_for_external_table is true.
+     * If the number of compute nodes is less than this value, queries on external tables will also
+     * be assigned to some mix nodes so that the total number of nodes reaches this value.
+     * If the number of compute nodes is larger than this value, queries on external tables will be assigned to compute nodes only.
+     */
+    @ConfField(mutable = true, masterOnly = false)
+    public static int min_backend_num_for_external_table = 3;
 
     /**
      * Max query profile num.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
index a7a7ba8cc3..3291b31741 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java
@@ -57,8 +57,8 @@ public class BackendPolicy {
                 .needQueryAvailable()
                 .needLoadAvailable()
                 .addTags(tags)
-                .preferComputeNode()
-                .assignExpectBeNum(Config.backend_num_for_federation)
+                .preferComputeNode(Config.prefer_compute_node_for_external_table)
+                .assignExpectBeNum(Config.min_backend_num_for_external_table)
                 .build();
         backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
         if (backends.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 32d35828b8..6a9bbdd8f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -57,6 +57,7 @@ import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanNode;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFileScanSlotInfo;
@@ -714,6 +715,24 @@ public class ExternalFileScanNode extends ExternalScanNode {
         output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
                 .append("\n");
 
+        if (detailLevel == TExplainLevel.VERBOSE) {
+            output.append(prefix).append("backends:").append("\n");
+            for (TScanRangeLocations locations : scanRangeLocations) {
+                output.append(prefix).append("  ").append(locations.getLocations().get(0).backend_id).append("\n");
+                List<TFileRangeDesc> files = locations.getScanRange().getExtScanRange().getFileScanRange().getRanges();
+                for (int i = 0; i < 3; i++) {
+                    if (i >= files.size()) {
+                        break;
+                    }
+                    TFileRangeDesc file = files.get(i);
+                    output.append(prefix).append("    ").append(file.getPath())
+                            .append(" start: ").append(file.getStartOffset())
+                            .append(" length: ").append(file.getFileSize())
+                            .append("\n");
+                }
+            }
+        }
+
         output.append(prefix);
         if (cardinality > 0) {
             output.append(String.format("cardinality=%s, ", cardinality));
@@ -727,5 +746,3 @@ public class ExternalFileScanNode extends ExternalScanNode {
 
     }
 }
-
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index d0767caff6..3f3db6d78c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -100,8 +100,8 @@ public class BeSelectionPolicy {
             return this;
         }
 
-        public Builder preferComputeNode() {
-            policy.preferComputeNode = true;
+        public Builder preferComputeNode(boolean prefer) {
+            policy.preferComputeNode = prefer;
             return this;
         }
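For reference, the VERBOSE explain section added to ExternalFileScanNode above prints one backend id per scan range followed by up to three of its file ranges; with made-up ids, paths, and sizes it would render roughly like:

```
backends:
  10003
    hdfs://nn:8020/user/hive/warehouse/t/part-00000.orc start: 0 length: 134217728
    hdfs://nn:8020/user/hive/warehouse/t/part-00001.orc start: 0 length: 98566144
```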
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 0afa019310..ec9ce6301d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -235,15 +235,15 @@ public class SystemInfoServiceTest {
         Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy01, 1).size());
 
         BeSelectionPolicy policy02 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
-                .setStorageMedium(TStorageMedium.HDD).preferComputeNode().build();
+                .setStorageMedium(TStorageMedium.HDD).preferComputeNode(true).build();
         Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy02, 1).size());
 
         BeSelectionPolicy policy03 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
-                .setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignExpectBeNum(0).build();
+                .setStorageMedium(TStorageMedium.HDD).preferComputeNode(true).assignExpectBeNum(0).build();
         Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy03, 1).size());
 
         BeSelectionPolicy policy04 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
-                .setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignExpectBeNum(1).build();
+                .setStorageMedium(TStorageMedium.HDD).preferComputeNode(true).assignExpectBeNum(1).build();
         Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy04, 1).size());
 
         // one compute node and two mix node
@@ -264,11 +264,11 @@ public class SystemInfoServiceTest {
         Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy05, 3).size());
 
         BeSelectionPolicy policy06 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
-                .setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignExpectBeNum(2).build();
+                .setStorageMedium(TStorageMedium.HDD).preferComputeNode(true).assignExpectBeNum(2).build();
         Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy06, 2).size());
 
         BeSelectionPolicy policy07 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
-                .setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignExpectBeNum(3).build();
+                .setStorageMedium(TStorageMedium.HDD).preferComputeNode(true).assignExpectBeNum(3).build();
         Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 3).size());
     }
diff --git a/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy b/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
index 07ea8a781a..e52210b37d 100644
--- a/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
+++ b/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
@@ -61,7 +61,7 @@ suite("test_jsonb_load_and_function", "p0") {
                 log.info("error result: " + out)
 
                 assertEquals("fail", json.Status.toLowerCase())
-                assertEquals("[INTERNAL_ERROR]too many filtered rows", json.Message)
+                assertTrue(json.Message.contains("too many filtered rows"))
                 assertEquals(25, json.NumberTotalRows)
                 assertEquals(18, json.NumberLoadedRows)
                 assertEquals(7, json.NumberFilteredRows)
diff --git a/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy b/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
index 15f5e9842b..7efec3dcbf 100644
--- a/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
+++ b/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
@@ -50,7 +50,7 @@ suite("test_jsonb_unique_load_and_function", "p0") {
                 log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("fail", json.Status.toLowerCase())
-                assertEquals("[INTERNAL_ERROR]too many filtered rows", json.Message)
+                assertTrue(json.Message.contains("too many filtered rows"))
                 assertEquals(75, json.NumberTotalRows)
                 assertEquals(54, json.NumberLoadedRows)
                 assertEquals(21, json.NumberFilteredRows)
diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
index d320d2861a..611adf9e3a 100644
--- a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
@@ -81,7 +81,7 @@ suite("test_mysql_load", "p0") {
             (k1, k2, v2, v10, v11)
             PROPERTIES ("strict_mode"="true");
         """
-        exception "errCode = 2, detailMessage = [INTERNAL_ERROR]too many filtered rows"
+        exception "too many filtered rows"
     }
 
     // test_line_delimiter
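A note on the test relaxations above and throughout this patch: `Status` now renders as `[CODE][BE IP]message` (see `be/src/common/status.h` at the top of the diff), so exact-match assertions on status strings became host-dependent and were changed to substring checks. An illustrative snippet, with a made-up address:

```cpp
Status st = Status::InternalError("too many filtered rows");
std::stringstream ss;
ss << st; // e.g. "[INTERNAL_ERROR][127.0.0.1]too many filtered rows"
```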