From 1dec592e917905ab6c4a253fee54c2f22d8fdaf9 Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Mon, 26 Jun 2023 11:37:14 +0800
Subject: [PATCH] [improvement](fs_bench) optimize the usage of fs benchmark tool for hdfs (#21154)

Optimize the usage of the fs benchmark tool:

1. Remove the `Open` benchmark, it is useless.
2. Remove the `Delete` benchmark, it is dangerous.
3. Add a `SingleRead` benchmark, so the user can specify an existing file to test the read operation:
   `sh bin/run-fs-benchmark.sh --conf=conf/hdfs_read.conf --fs_type=hdfs --operation=single_read`
4. Modify `run-fs-benchmark.sh`: remove the `OPTS` parsing section and pass options to `fs_benchmark_tool` directly.
5. Add some custom counters to the benchmark result, e.g.:

```
--------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                        Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------------------------------------------------
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1                6864 ms         2385 ms            1 ReadRate=200.936M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1                3919 ms         1828 ms            1 ReadRate=351.96M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1                3839 ms         1819 ms            1 ReadRate=359.265M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_mean           4874 ms         2011 ms            3 ReadRate=304.054M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_median         3919 ms         1828 ms            3 ReadRate=351.96M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_stddev         1724 ms          324 ms            3 ReadRate=89.3768M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_cv            35.37 %         16.11 %             3 ReadRate=29.40%
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_max            6864 ms         2385 ms            3 ReadRate=359.265M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_min            3839 ms         1819 ms            3 ReadRate=200.936M/s
```

- For `open_read` and `single_read`, add `ReadRate` as `bytes per second`.
- For `create_write`, add `WriteRate` as `bytes per second`.
- For `exists` and `rename`, add `ExistsCost` and `RenameCost` as `time cost per one operation`.
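For reference, a minimal config for the `single_read` case might look like the sketch below. The connection keys (`fs.defaultFS`, `hadoop.username`) and all paths are illustrative placeholders, not taken from this patch; the tool itself only requires whatever HDFS properties `parse_properties()` accepts, plus `file_path` for `single_read`, `base_dir` for the other operations, and an optional `buffer_size`:

```
# hdfs_read.conf -- illustrative sketch only; adjust keys and paths to your cluster
fs.defaultFS=hdfs://127.0.0.1:8020
hadoop.username=hadoop
# file read by --operation=single_read
file_path=/user/doris/fs_benchmark/test_file
# directory used by create_write/open_read/rename/exists
base_dir=/user/doris/fs_benchmark
# optional read buffer size in bytes (defaults to 1000000 in hdfs_benchmark.hpp)
buffer_size=1048576
```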
---
 be/src/io/fs/benchmark/base_benchmark.h      |   2 +-
 be/src/io/fs/benchmark/benchmark_factory.hpp |  12 +-
 be/src/io/fs/benchmark/fs_benchmark_tool.cpp |  24 ++--
 be/src/io/fs/benchmark/hdfs_benchmark.hpp    | 126 +++++++------------
 bin/run-fs-benchmark.sh                      |  67 +---------
 build.sh                                     |   4 +
 6 files changed, 68 insertions(+), 167 deletions(-)

diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h
index bcc8ed284c..c28ad02de5 100644
--- a/be/src/io/fs/benchmark/base_benchmark.h
+++ b/be/src/io/fs/benchmark/base_benchmark.h
@@ -97,7 +97,7 @@ protected:
     int _threads;
     int _iterations;
     size_t _file_size;
-    int _repetitions = 3;
+    int _repetitions = 1;
 
     std::map<std::string, std::string> _conf_map;
 };
diff --git a/be/src/io/fs/benchmark/benchmark_factory.hpp b/be/src/io/fs/benchmark/benchmark_factory.hpp
index bc73f904be..3e8c9314ca 100644
--- a/be/src/io/fs/benchmark/benchmark_factory.hpp
+++ b/be/src/io/fs/benchmark/benchmark_factory.hpp
@@ -50,12 +50,10 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
         *bm = new HdfsCreateWriteBenchmark(threads, iterations, file_size, conf_map);
     } else if (op_type == "open_read") {
         *bm = new HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map);
-    } else if (op_type == "open") {
-        *bm = new HdfsOpenBenchmark(threads, iterations, file_size, conf_map);
+    } else if (op_type == "single_read") {
+        *bm = new HdfsSingleReadBenchmark(threads, iterations, file_size, conf_map);
     } else if (op_type == "rename") {
         *bm = new HdfsRenameBenchmark(threads, iterations, file_size, conf_map);
-    } else if (op_type == "delete") {
-        *bm = new HdfsDeleteBenchmark(threads, iterations, file_size, conf_map);
     } else if (op_type == "exists") {
         *bm = new HdfsExistsBenchmark(threads, iterations, file_size, conf_map);
     } else {
@@ -80,7 +78,7 @@ public:
               _conf_map(conf_map) {}
 
     ~MultiBenchmark() {
-        for (auto bm : benchmarks) {
+        for (auto bm : _benchmarks) {
             delete bm;
         }
     }
@@ -113,12 +111,12 @@ public:
             return st;
         }
         bm->register_bm();
-        benchmarks.emplace_back(bm);
+        _benchmarks.emplace_back(bm);
         return Status::OK();
     }
 
 private:
-    std::vector<BaseBenchmark*> benchmarks;
+    std::vector<BaseBenchmark*> _benchmarks;
     std::string _type;
     std::string _operation;
     int64_t _threads;
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index 4002ea84ac..a5be5db80a 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -21,12 +21,13 @@
 
 #include "io/fs/benchmark/benchmark_factory.hpp"
 
-DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs, local");
+DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs");
 DEFINE_string(operation, "create_write",
               "Supported Operations: create_write, open_read, open, rename, delete, exists");
-DEFINE_string(threads, "10", "Number of threads");
-DEFINE_string(iterations, "10", "Number of runs");
-DEFINE_string(file_size, "104857600", "File size");
+DEFINE_string(threads, "1", "Number of threads");
+DEFINE_string(iterations, "1", "Number of runs of each thread");
+DEFINE_string(repetitions, "1", "Number of iterations");
+DEFINE_string(file_size, "0", "File size for read/write opertions");
 DEFINE_string(conf, "", "config file");
 
 std::string get_usage(const std::string& progname) {
@@ -35,8 +36,9 @@ std::string get_usage(const std::string& progname) {
 
     ss << "Usage:\n";
     ss << progname
-       << " --fs_type=[fs_type] --operation=[op_type] --threads=10 --iterations=10 "
-          "--file_size=104857600\n";
+       << " --fs_type=[fs_type] --operation=[op_type] --threads=[num] --iterations=[num] "
+          "--repetitions=[num] "
+          "--file_size=[num]\n";
     ss << "\nfs_type:\n";
     ss << "    hdfs\n";
     ss << "    s3\n";
@@ -46,13 +48,15 @@ std::string get_usage(const std::string& progname) {
 
     ss << "\nthreads:\n";
     ss << "    num of threads\n";
     ss << "\niterations:\n";
-    ss << "    num of run\n";
+    ss << "    Number of runs of each thread\n";
+    ss << "\nrepetitions:\n";
+    ss << "    Number of iterations\n";
     ss << "\nfile_size:\n";
-    ss << "    file size\n";
+    ss << "    File size for read/write opertions\n";
     ss << "\nExample:\n";
     ss << progname
-       << " --conf my.conf --fs_type=s3 --operation=read --threads=10 --iterations=100 "
-          "--file_size=104857600\n";
+       << " --conf my.conf --fs_type=hdfs --operation=create_write --threads=2 --iterations=100 "
+          "--file_size=1048576\n";
     return ss.str();
 }
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index 637f7a614a..1307ddc95a 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -38,17 +38,23 @@ public:
 
     Status init() override { return Status::OK(); }
 
+    virtual std::string get_file_path(benchmark::State& state) {
+        std::string base_dir = _conf_map["base_dir"];
+        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+        bm_log("file_path: {}", file_path);
+        return file_path;
+    }
+
     Status run(benchmark::State& state) override {
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileReaderSPtr reader;
         bm_log("begin to init {}", _name);
-        std::string base_dir = _conf_map["baseDir"];
         size_t buffer_size =
                 _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
         io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
+
+        auto file_path = get_file_path(state);
         RETURN_IF_ERROR(
                 FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
         bm_log("finish to init {}", _name);
@@ -61,10 +67,13 @@ public:
         size_t offset = 0;
         size_t bytes_read = 0;
 
-        auto start = std::chrono::high_resolution_clock::now();
-        size_t read_size = _file_size;
+        size_t read_size = reader->size();
+        if (_file_size > 0) {
+            read_size = std::min(read_size, _file_size);
+        }
         long remaining_size = read_size;
 
+        auto start = std::chrono::high_resolution_clock::now();
         while (remaining_size > 0) {
             bytes_read = 0;
             size_t size = std::min(buffer_size, (size_t)remaining_size);
@@ -81,13 +90,13 @@ public:
             remaining_size -= bytes_read;
         }
         bm_log("finish to run {}", _name);
-
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
 
         state.SetIterationTime(elapsed_seconds.count());
+        state.counters["ReadRate"] = benchmark::Counter(read_size, benchmark::Counter::kIsRate);
 
         if (reader != nullptr) {
             reader->close();
@@ -96,41 +105,19 @@ public:
     }
 };
 
-class HdfsOpenBenchmark : public BaseBenchmark {
+// Read a single specified file
+class HdfsSingleReadBenchmark : public HdfsOpenReadBenchmark {
 public:
-    HdfsOpenBenchmark(int threads, int iterations, size_t file_size,
-                      const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsOpenBenchmark", threads, iterations, file_size, 3, conf_map) {}
-    virtual ~HdfsOpenBenchmark() = default;
+    HdfsSingleReadBenchmark(int threads, int iterations, size_t file_size,
+                            const std::map<std::string, std::string>& conf_map)
+            : HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map) {}
+    virtual ~HdfsSingleReadBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
-    Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        auto start = std::chrono::high_resolution_clock::now();
-        std::string base_dir = _conf_map["baseDir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+    virtual std::string get_file_path(benchmark::State& state) override {
+        std::string file_path = _conf_map["file_path"];
         bm_log("file_path: {}", file_path);
-        std::shared_ptr<io::HdfsFileSystem> fs;
-        io::FileReaderSPtr reader;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
-        RETURN_IF_ERROR(fs->open_file(file_path, reader_opts, &reader));
-        auto end = std::chrono::high_resolution_clock::now();
-        auto elapsed_seconds =
-                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
-        state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
-        if (reader != nullptr) {
-            reader->close();
-        }
-        return Status::OK();
+        return file_path;
     }
-
-private:
 };
 
 class HdfsCreateWriteBenchmark : public BaseBenchmark {
 public:
@@ -141,24 +128,17 @@ public:
                                 conf_map) {}
     virtual ~HdfsCreateWriteBenchmark() = default;
 
-    Status init() override {
-        std::string base_dir = _conf_map["baseDir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
-        RETURN_IF_ERROR(fs->delete_directory(base_dir));
-        return Status::OK();
-    }
+    Status init() override { return Status::OK(); }
 
     Status run(benchmark::State& state) override {
         bm_log("begin to run {}", _name);
-        auto start = std::chrono::high_resolution_clock::now();
-        std::string base_dir = _conf_map["baseDir"];
+        std::string base_dir = _conf_map["base_dir"];
         io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
         auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
         bm_log("file_path: {}", file_path);
+
+        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileWriterPtr writer;
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
@@ -188,6 +168,8 @@ public:
         state.SetIterationTime(elapsed_seconds.count());
         bm_log("finish to run {}", _name);
 
+        state.counters["WriteRate"] = benchmark::Counter(write_size, benchmark::Counter::kIsRate);
+
         if (writer != nullptr) {
             writer->close();
         }
@@ -206,13 +188,14 @@ public:
 
     Status run(benchmark::State& state) override {
         bm_log("begin to run {}", _name);
-        auto start = std::chrono::high_resolution_clock::now();
-        std::string base_dir = _conf_map["baseDir"];
+        std::string base_dir = _conf_map["base_dir"];
         io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
         auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
         auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index());
         bm_log("file_path: {}", file_path);
+
+        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileWriterPtr writer;
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
@@ -224,6 +207,9 @@ public:
         state.SetIterationTime(elapsed_seconds.count());
         bm_log("finish to run {}", _name);
 
+        state.counters["RenameCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
         if (writer != nullptr) {
             writer->close();
         }
@@ -233,38 +219,6 @@ public:
 private:
 };
 
-class HdfsDeleteBenchmark : public BaseBenchmark {
-public:
-    HdfsDeleteBenchmark(int threads, int iterations, size_t file_size,
-                        const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsDeleteBenchmark", threads, 1, file_size, 1, conf_map) {}
-    virtual ~HdfsDeleteBenchmark() = default;
-
-    Status init() override { return Status::OK(); }
-
-    Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        auto start = std::chrono::high_resolution_clock::now();
-        std::string base_dir = _conf_map["baseDir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-        std::shared_ptr<io::HdfsFileSystem> fs;
-        RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
-        RETURN_IF_ERROR(fs->delete_file(file_path));
-        auto end = std::chrono::high_resolution_clock::now();
-        auto elapsed_seconds =
-                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
-        state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-        return Status::OK();
-    }
-
-private:
-};
-
 class HdfsExistsBenchmark : public BaseBenchmark {
 public:
     HdfsExistsBenchmark(int threads, int iterations, size_t file_size,
@@ -276,12 +230,13 @@ public:
 
     Status run(benchmark::State& state) override {
         bm_log("begin to run {}", _name);
-        auto start = std::chrono::high_resolution_clock::now();
-        std::string base_dir = _conf_map["baseDir"];
+        std::string base_dir = _conf_map["base_dir"];
        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
         auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
         bm_log("file_path: {}", file_path);
+
+        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
         bool res = false;
@@ -292,6 +247,9 @@ public:
 
         state.SetIterationTime(elapsed_seconds.count());
         bm_log("finish to run {}", _name);
+
+        state.counters["ExistsCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
         return Status::OK();
     }
 };
diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh
index bf433ef178..9eb47d3ee8 100755
--- a/bin/run-fs-benchmark.sh
+++ b/bin/run-fs-benchmark.sh
@@ -26,58 +26,6 @@ if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then
     export PATH
 fi
 
-OPTS="$(getopt \
-    -n "$0" \
-    -o '' \
-    -l 'conf:,fs_type:,operation:,threads:,iterations:,file_size:' \
-    -- "$@")"
-
-eval set -- "${OPTS}"
-
-while true; do
-    case "$1" in
-    --conf)
-        CONF="$2"
-        shift 2
-        ;;
-    --fs_type)
-        FS_TYPE="$2"
-        shift 2
-        ;;
-    --operation)
-        OPERATION="$2"
-        shift 2
-        ;;
-    --threads)
-        THREADS="$2"
-        shift 2
-        ;;
-    --iterations)
-        ITERATIONS="$2"
-        shift 2
-        ;;
-    --file_size)
-        FILE_SIZE="$2"
-        shift 2
-        ;;
-    --)
-        shift
-        break
-        ;;
-    *)
-        echo "Internal error"
-        exit 1
-        ;;
-    esac
-done
-
-echo "CONF: ${CONF}"
-echo "FS_TYPE: ${FS_TYPE}"
-echo "OPERATION: ${OPERATION}"
-echo "THREADS: ${THREADS}"
-echo "ITERATIONS: ${ITERATIONS}"
-echo "FILE_SIZE: ${FILE_SIZE}"
-
 DORIS_HOME="$(
     cd "${curdir}/.."
     pwd
@@ -316,16 +264,5 @@ export LIBHDFS_OPTS="${final_java_opt}"
 # see https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
 export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out"
 
-${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/fs_benchmark_tool" --conf "${CONF}" --fs_type="${FS_TYPE}" --operation="${OPERATION}" --threads="${THREADS}" --iterations="${ITERATIONS}" --file_size="${FILE_SIZE}" 2>&1 | tee "${LOG_DIR}/fs_benchmark_tool.log"
-
-qps="0MB/s"
-latency="0ms"
-
-eval "$(grep "median" "${LOG_DIR}/fs_benchmark_tool.log" | awk '{printf("qps=%sMB/s latency=%sms", "'"${FILE_SIZE}"'" / 1024 / 1024 / ($2 * "'"${THREADS}"'" / 1000), $2 * "'"${THREADS}"'")}')"
-
-echo "------------------------------"
-echo "    Benchmark Result    "
-echo "------------------------------"
-echo "thread_num: ${THREADS}."
-echo "qps: ${qps}."
-echo "latency: ${latency}."
+echo "$@"
+${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/fs_benchmark_tool" "$@" 2>&1 | tee "${LOG_DIR}/fs_benchmark_tool.log"
diff --git a/build.sh b/build.sh
index 44f69fc88f..dfc16b7936 100755
--- a/build.sh
+++ b/build.sh
@@ -637,6 +637,10 @@ EOF
         cp -r -p "${DORIS_HOME}/be/output/lib/debug_info" "${DORIS_OUTPUT}/be/lib"/
     fi
 
+    if [[ "${BUILD_FS_BENCHMARK}" = "ON" ]]; then
+        cp -r -p "${DORIS_HOME}/bin/run-fs-benchmark.sh" "${DORIS_OUTPUT}/be/bin/"/
+    fi
+
     extensions_modules=("")
     extensions_modules+=("java-udf")
     extensions_modules+=("jdbc-scanner")