diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c6b0e08e19..2f8445e73f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1046,6 +1046,10 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9");
 // enable window_funnel_function with different modes
 DEFINE_mBool(enable_window_funnel_function_v2, "false");
 
+DEFINE_Bool(enable_hdfs_hedged_read, "false");
+DEFINE_Int32(hdfs_hedged_read_thread_num, "128");
+DEFINE_Int32(hdfs_hedged_read_threshold_time, "500");
+
 #ifdef BE_TEST
 // test s3
 DEFINE_String(test_s3_resource, "resource");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 958b17aa45..a1e2afdfe5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1089,6 +1089,16 @@ DECLARE_mInt64(LZ4_HC_compression_level);
 // enable window_funnel_function with different modes
 DECLARE_mBool(enable_window_funnel_function_v2);
 
+// whether to enable hdfs hedged read.
+// If set to true, hedged read is enabled even if the user did not enable it when creating the catalog.
+DECLARE_Bool(enable_hdfs_hedged_read);
+// hdfs hedged read thread pool size, for "dfs.client.hedged.read.threadpool.size"
+// May be overwritten by the value specified when creating the catalog.
+DECLARE_Int32(hdfs_hedged_read_thread_num);
+// the threshold (in milliseconds) for triggering a hedged read, for "dfs.client.hedged.read.threshold.millis"
+// May be overwritten by the value specified when creating the catalog.
+DECLARE_Int32(hdfs_hedged_read_threshold_time);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index cf3a2b6563..04c9ee2150 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -57,6 +57,13 @@ HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node,
                 _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
         _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
                 _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);
+
+        _hdfs_profile.total_hedged_read =
+                ADD_CHILD_COUNTER(_profile, "TotalHedgedRead", TUnit::UNIT, hdfs_profile_name);
+        _hdfs_profile.hedged_read_in_cur_thread = ADD_CHILD_COUNTER(
+                _profile, "HedgedReadInCurThread", TUnit::UNIT, hdfs_profile_name);
+        _hdfs_profile.hedged_read_wins =
+                ADD_CHILD_COUNTER(_profile, "HedgedReadWins", TUnit::UNIT, hdfs_profile_name);
 #endif
     }
 }
@@ -85,6 +92,22 @@ Status HdfsFileReader::close() {
         COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
                        hdfs_statistics->totalZeroCopyBytesRead);
         hdfsFileFreeReadStatistics(hdfs_statistics);
+
+        struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
+        r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics);
+        if (r != 0) {
+            return Status::InternalError(
+                    fmt::format("Failed to run hdfsGetHedgedReadMetrics(): {}", r));
+        }
+
+        COUNTER_UPDATE(_hdfs_profile.total_hedged_read,
+                       hdfs_hedged_read_statistics->hedgedReadOps);
+        COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
+                       hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
+        COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
+                       hdfs_hedged_read_statistics->hedgedReadOpsWin);
+
+        hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
         hdfsFileClearReadStatistics(_handle->file());
 #endif
     }
@@ -92,6 +115,36 @@ Status HdfsFileReader::close() {
     return Status::OK();
 }
 
+#ifdef USE_HADOOP_HDFS
+Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                                    const IOContext* /*io_ctx*/) {
+    DCHECK(!closed());
+    if (offset > _handle->file_size()) {
+        return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
+                               offset, _handle->file_size(), _path.native());
+    }
+
+    size_t bytes_req = result.size;
+    char* to = result.data;
+    bytes_req = std::min(bytes_req, (size_t)(_handle->file_size() - offset));
+    *bytes_read = 0;
+    if (UNLIKELY(bytes_req == 0)) {
+        return Status::OK();
+    }
+
+    tSize r = hdfsPread(_handle->fs(), _handle->file(), offset, to, bytes_req);
+    if (r == -1) {
+        return Status::InternalError(
+                "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
+                BackendOptions::get_localhost(), _name_node, _path.string(), hdfs_error());
+    }
+    *bytes_read = bytes_req;
+    return Status::OK();
+}
+
+#else
+// Hedged read only supports hdfsPread().
+// TODO: rethink this to check whether there are any differences between hdfsPread() and hdfsRead()
 Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                     const IOContext* /*io_ctx*/) {
     DCHECK(!closed());
@@ -131,5 +184,6 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
     *bytes_read = has_read;
     return Status::OK();
 }
+#endif
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 864a55bc41..4e093e1c9b 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -64,6 +64,10 @@ private:
         RuntimeProfile::Counter* total_local_bytes_read;
         RuntimeProfile::Counter* total_short_circuit_bytes_read;
         RuntimeProfile::Counter* total_total_zero_copy_bytes_read;
+
+        RuntimeProfile::Counter* total_hedged_read;
+        RuntimeProfile::Counter* hedged_read_in_cur_thread;
+        RuntimeProfile::Counter* hedged_read_wins;
     };
 
 #endif
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index 19986f76e4..b420c84e13 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -26,6 +26,7 @@
 #include 
 
 #include "agent/utils.h"
+#include "common/config.h"
 #include "common/logging.h"
 #include "io/fs/hdfs.h"
 #include "util/string_util.h"
@@ -134,6 +135,7 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* build
     if (hdfsParams.__isset.hdfs_conf) {
         for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
             hdfsBuilderConfSetStr(builder->get(), conf.key.c_str(), conf.value.c_str());
+            LOG(INFO) << "set hdfs config: " << conf.key << ", value: " << conf.value;
 #ifdef USE_HADOOP_HDFS
             // Set krb5.conf, we should define java.security.krb5.conf in catalog properties
             if (strcmp(conf.key.c_str(), "java.security.krb5.conf") == 0) {
@@ -143,6 +145,17 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* build
         }
     }
 
+#ifdef USE_HADOOP_HDFS
+    if (config::enable_hdfs_hedged_read) {
+        hdfsBuilderConfSetStr(builder->get(), "dfs.client.hedged.read.threadpool.size",
+                              std::to_string(config::hdfs_hedged_read_thread_num).c_str());
+        hdfsBuilderConfSetStr(builder->get(), "dfs.client.hedged.read.threshold.millis",
+                              std::to_string(config::hdfs_hedged_read_threshold_time).c_str());
+        LOG(INFO) << "set hdfs hedged read config: " << config::hdfs_hedged_read_thread_num << ", "
+                  << config::hdfs_hedged_read_threshold_time;
+    }
+#endif
+
     hdfsBuilderConfSetStr(builder->get(), "ipc.client.fallback-to-simple-auth-allowed", "true");
 
     if (builder->is_need_kinit()) {
diff --git a/docs/en/docs/lakehouse/faq.md b/docs/en/docs/lakehouse/faq.md
index e73a0a5bf4..9a756bf830 100644
--- a/docs/en/docs/lakehouse/faq.md
+++ b/docs/en/docs/lakehouse/faq.md
@@ -189,3 +189,43 @@ under the License.
         'hive.version' = '2.x.x'
     );
     ```
+19. Use Hedged Read to mitigate slow HDFS reads.
+
+    In some cases, high load on HDFS can make reading data from it take a long time, which slows down the overall query. The HDFS Client provides a Hedged Read feature for this situation.
+    When a read request has not returned within a certain threshold, this feature starts another read thread to read the same data, and whichever returns first is used as the result.
+
+    This feature can be enabled in two ways:
+
+    - Specify it in the properties when creating the Catalog:
+
+    ```
+    create catalog regression properties (
+        'type'='hms',
+        'hive.metastore.uris' = 'thrift://172.21.16.47:7004',
+        'dfs.client.hedged.read.threadpool.size' = '128',
+        'dfs.client.hedged.read.threshold.millis' = "500"
+    );
+    ```
+
+    `dfs.client.hedged.read.threadpool.size` is the number of threads used for Hedged Read; they are shared by one HDFS Client. Usually, a BE node shares one HDFS Client per HDFS cluster.
+
+    `dfs.client.hedged.read.threshold.millis` is the read threshold in milliseconds. When a read request has not returned within this threshold, a Hedged Read is triggered.
+
+    - Configure the parameters in be.conf:
+
+    ```
+    enable_hdfs_hedged_read = true
+    hdfs_hedged_read_thread_num = 128
+    hdfs_hedged_read_threshold_time = 500
+    ```
+
+    This method enables Hedged Read globally on the BE nodes (it is disabled by default) and ignores the Hedged Read properties set when creating the Catalog.
+
+    After enabling it, you can see the related counters in the Query Profile:
+
+    `TotalHedgedRead`: The number of Hedged Reads initiated.
+
+    `HedgedReadWins`: The number of successful Hedged Reads (hedged requests that were issued and returned faster than the original request).
+
+    Note that these values are cumulative for a single HDFS Client, not for a single query, because the same HDFS Client is reused by multiple queries.
+
diff --git a/docs/zh-CN/docs/lakehouse/faq.md b/docs/zh-CN/docs/lakehouse/faq.md
index 177f8e3b77..11e2cc937f 100644
--- a/docs/zh-CN/docs/lakehouse/faq.md
+++ b/docs/zh-CN/docs/lakehouse/faq.md
@@ -169,11 +169,13 @@ under the License.
     ```
 
 16. 在Catalog中配置Kerberos时,报错`Unable to obtain password from user`的解决方法:
+
     - 用到的principal必须在klist中存在,使用`klist -kt your.keytab`检查。
     - 检查catalog配置是否正确,比如漏配`yarn.resourcemanager.principal`。
     - 若上述检查没问题,则当前系统yum或者其他包管理软件安装的JDK版本存在不支持的加密算法,建议自行安装JDK并设置`JAVA_HOME`环境变量。
 
 17. 查询配置了Kerberos的外表,遇到该报错:`GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos Ticket)`,一般重启FE和BE能够解决该问题。
+
     - 重启所有节点前可在`"${DORIS_HOME}/be/conf/be.conf"`中的JAVA_OPTS参数里配置`-Djavax.security.auth.useSubjectCredsOnly=false`,通过底层机制去获取JAAS credentials信息,而不是应用程序。
     - 在[JAAS Troubleshooting](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/Troubleshooting.html)中可获取更多常见JAAS报错的解决方法。
 
@@ -184,3 +186,49 @@ under the License.
         'hive.version' = '1.x.x'
     );
     ```
+
+19. 使用 Hedged Read 优化 HDFS 读取慢的问题。
+
+    在某些情况下,HDFS 的负载较高可能导致读取某个 HDFS 上的数据副本的时间较长,从而拖慢整体的查询效率。HDFS Client 提供了 Hedged Read 功能。
+    该功能可以在一个读请求超过一定阈值未返回时,启动另一个读线程读取同一份数据,哪个先返回就使用哪个结果。
+
+    注意:该功能可能会增加 HDFS 集群的负载,请酌情使用。
+
+    可以通过以下两种方式开启这个功能:
+
+    - 在创建 Catalog 的参数中指定:
+
+    ```
+    create catalog regression properties (
+        'type'='hms',
+        'hive.metastore.uris' = 'thrift://172.21.16.47:7004',
+        'dfs.client.hedged.read.threadpool.size' = '128',
+        'dfs.client.hedged.read.threshold.millis' = "500"
+    );
+    ```
+
+    `dfs.client.hedged.read.threadpool.size` 表示用于 Hedged Read 的线程数,这些线程由一个 HDFS Client 共享。通常情况下,针对一个 HDFS 集群,BE 节点会共享一个 HDFS Client。
+
+    `dfs.client.hedged.read.threshold.millis` 是读取阈值,单位毫秒。当一个读请求超过这个阈值未返回时,会触发 Hedged Read。
+
+    - 在 be.conf 中配置参数
+
+    ```
+    enable_hdfs_hedged_read = true
+    hdfs_hedged_read_thread_num = 128
+    hdfs_hedged_read_threshold_time = 500
+    ```
+
+    这种方式会在 BE 节点全局开启 Hedged Read(默认不开启),并忽略在创建 Catalog 时设置的 Hedged Read 属性。
+
+    开启后,可以在 Query Profile 中看到相关参数:
+
+    `TotalHedgedRead`:发起 Hedged Read 的次数。
+
+    `HedgedReadWins`:Hedged Read 成功的次数(发起并且比原请求更快返回的次数)。
+
+    注意,这里的值是单个 HDFS Client 的累计值,而不是单个查询的数值。同一个 HDFS Client 会被多个查询复用。
+
+
+
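
For reference, the libhdfs flow this patch relies on can also be exercised outside Doris. The sketch below is illustrative only and is not part of the patch: it assumes a Hadoop libhdfs build where hedged reads and `hdfsGetHedgedReadMetrics()` are available (the same `USE_HADOOP_HDFS` case the patch targets), and the include path, namenode URI, and file path are placeholders.

```cpp
// Minimal standalone sketch: configure hedged reads on an HDFS client, do a
// positional read, then dump the hedged-read metrics that the patch exposes as
// TotalHedgedRead / HedgedReadInCurThread / HedgedReadWins.
#include <hdfs/hdfs.h> // include path depends on the Hadoop installation

#include <fcntl.h>

#include <cstdio>
#include <vector>

int main() {
    struct hdfsBuilder* builder = hdfsNewBuilder();
    hdfsBuilderSetNameNode(builder, "hdfs://namenode-host:8020"); // placeholder
    // Same properties the patch sets from be.conf when enable_hdfs_hedged_read is true.
    hdfsBuilderConfSetStr(builder, "dfs.client.hedged.read.threadpool.size", "128");
    hdfsBuilderConfSetStr(builder, "dfs.client.hedged.read.threshold.millis", "500");
    hdfsFS fs = hdfsBuilderConnect(builder);
    if (fs == nullptr) {
        fprintf(stderr, "connect failed\n");
        return 1;
    }

    const char* path = "/user/doris/demo_file"; // placeholder
    hdfsFile file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
    if (file == nullptr) {
        fprintf(stderr, "open failed\n");
        hdfsDisconnect(fs);
        return 1;
    }

    // Hedged read only takes effect on the positional read path, which is why the
    // patch adds a dedicated hdfsPread()-based read_at_impl() under USE_HADOOP_HDFS.
    std::vector<char> buf(1024 * 1024);
    tSize n = hdfsPread(fs, file, /*position=*/0, buf.data(), (tSize)buf.size());
    printf("read %d bytes\n", (int)n);

    // Per-client cumulative counters, matching the ones added to the query profile.
    struct hdfsHedgedReadMetrics* metrics = nullptr;
    if (hdfsGetHedgedReadMetrics(fs, &metrics) == 0) {
        printf("hedgedReadOps=%llu, inCurThread=%llu, wins=%llu\n",
               (unsigned long long)metrics->hedgedReadOps,
               (unsigned long long)metrics->hedgedReadOpsInCurThread,
               (unsigned long long)metrics->hedgedReadOpsWin);
        hdfsFreeHedgedReadMetrics(metrics);
    }

    hdfsCloseFile(fs, file);
    hdfsDisconnect(fs);
    return 0;
}
```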