[improvement](fs_bench) optimize the usage of fs benchmark tool for hdfs (#21154)
Optimize the usage of fs benchmark tool:
1. Remove `Open` benchmark, it is useless.
2. Remove `Delete` benchmark, it is dangerous.
3. Add `SingleRead` benchmark, user can specify an exist file to test read operation:
`sh bin/run-fs-benchmark.sh --conf=conf/hdfs_read.conf --fs_type=hdfs --operation=single_read`
4. Modify the `run-fs-benchmark.sh`, remove `OPTS` section, use options in `fs_benchmark_tool` directly
5. Add some custom counters in the benchmark result, eg:
```
--------------------------------------------------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
--------------------------------------------------------------------------------------------------------------------------------
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1 6864 ms 2385 ms 1 ReadRate=200.936M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1 3919 ms 1828 ms 1 ReadRate=351.96M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1 3839 ms 1819 ms 1 ReadRate=359.265M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_mean 4874 ms 2011 ms 3 ReadRate=304.054M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_median 3919 ms 1828 ms 3 ReadRate=351.96M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_stddev 1724 ms 324 ms 3 ReadRate=89.3768M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_cv 35.37 % 16.11 % 3 ReadRate=29.40%
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_max 6864 ms 2385 ms 3 ReadRate=359.265M/s
HdfsReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_min 3839 ms 1819 ms 3 ReadRate=200.936M/s
```
- For `open_read` and `single_read`, add `ReadRate` as `bytes per second`.
- For `create_write`, add `WriteRate` as `bytes per second`.
- For `exists` and `rename`, add `ExistsCost` and `RenameCost` as `time cost per one operation`.
This commit is contained in:
@ -97,7 +97,7 @@ protected:
|
||||
int _threads;
|
||||
int _iterations;
|
||||
size_t _file_size;
|
||||
int _repetitions = 3;
|
||||
int _repetitions = 1;
|
||||
std::map<std::string, std::string> _conf_map;
|
||||
};
|
||||
|
||||
|
||||
@ -50,12 +50,10 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
|
||||
*bm = new HdfsCreateWriteBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else if (op_type == "open_read") {
|
||||
*bm = new HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else if (op_type == "open") {
|
||||
*bm = new HdfsOpenBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else if (op_type == "single_read") {
|
||||
*bm = new HdfsSingleReadBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else if (op_type == "rename") {
|
||||
*bm = new HdfsRenameBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else if (op_type == "delete") {
|
||||
*bm = new HdfsDeleteBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else if (op_type == "exists") {
|
||||
*bm = new HdfsExistsBenchmark(threads, iterations, file_size, conf_map);
|
||||
} else {
|
||||
@ -80,7 +78,7 @@ public:
|
||||
_conf_map(conf_map) {}
|
||||
|
||||
~MultiBenchmark() {
|
||||
for (auto bm : benchmarks) {
|
||||
for (auto bm : _benchmarks) {
|
||||
delete bm;
|
||||
}
|
||||
}
|
||||
@ -113,12 +111,12 @@ public:
|
||||
return st;
|
||||
}
|
||||
bm->register_bm();
|
||||
benchmarks.emplace_back(bm);
|
||||
_benchmarks.emplace_back(bm);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<BaseBenchmark*> benchmarks;
|
||||
std::vector<BaseBenchmark*> _benchmarks;
|
||||
std::string _type;
|
||||
std::string _operation;
|
||||
int64_t _threads;
|
||||
|
||||
@ -21,12 +21,13 @@
|
||||
|
||||
#include "io/fs/benchmark/benchmark_factory.hpp"
|
||||
|
||||
DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs, local");
|
||||
DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs");
|
||||
DEFINE_string(operation, "create_write",
|
||||
"Supported Operations: create_write, open_read, open, rename, delete, exists");
|
||||
DEFINE_string(threads, "10", "Number of threads");
|
||||
DEFINE_string(iterations, "10", "Number of runs");
|
||||
DEFINE_string(file_size, "104857600", "File size");
|
||||
DEFINE_string(threads, "1", "Number of threads");
|
||||
DEFINE_string(iterations, "1", "Number of runs of each thread");
|
||||
DEFINE_string(repetitions, "1", "Number of iterations");
|
||||
DEFINE_string(file_size, "0", "File size for read/write opertions");
|
||||
DEFINE_string(conf, "", "config file");
|
||||
|
||||
std::string get_usage(const std::string& progname) {
|
||||
@ -35,8 +36,9 @@ std::string get_usage(const std::string& progname) {
|
||||
|
||||
ss << "Usage:\n";
|
||||
ss << progname
|
||||
<< " --fs_type=[fs_type] --operation=[op_type] --threads=10 --iterations=10 "
|
||||
"--file_size=104857600\n";
|
||||
<< " --fs_type=[fs_type] --operation=[op_type] --threads=[num] --iterations=[num] "
|
||||
"--repetitions=[num] "
|
||||
"--file_size=[num]\n";
|
||||
ss << "\nfs_type:\n";
|
||||
ss << " hdfs\n";
|
||||
ss << " s3\n";
|
||||
@ -46,13 +48,15 @@ std::string get_usage(const std::string& progname) {
|
||||
ss << "\nthreads:\n";
|
||||
ss << " num of threads\n";
|
||||
ss << "\niterations:\n";
|
||||
ss << " num of run\n";
|
||||
ss << " Number of runs of each thread\n";
|
||||
ss << "\nrepetitions:\n";
|
||||
ss << " Number of iterations\n";
|
||||
ss << "\nfile_size:\n";
|
||||
ss << " file size\n";
|
||||
ss << " File size for read/write opertions\n";
|
||||
ss << "\nExample:\n";
|
||||
ss << progname
|
||||
<< " --conf my.conf --fs_type=s3 --operation=read --threads=10 --iterations=100 "
|
||||
"--file_size=104857600\n";
|
||||
<< " --conf my.conf --fs_type=hdfs --operation=create_write --threads=2 --iterations=100 "
|
||||
"--file_size=1048576\n";
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
||||
@ -38,17 +38,23 @@ public:
|
||||
|
||||
Status init() override { return Status::OK(); }
|
||||
|
||||
virtual std::string get_file_path(benchmark::State& state) {
|
||||
std::string base_dir = _conf_map["base_dir"];
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
bm_log("file_path: {}", file_path);
|
||||
return file_path;
|
||||
}
|
||||
|
||||
Status run(benchmark::State& state) override {
|
||||
std::shared_ptr<io::FileSystem> fs;
|
||||
io::FileReaderSPtr reader;
|
||||
bm_log("begin to init {}", _name);
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
size_t buffer_size =
|
||||
_conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
bm_log("file_path: {}", file_path);
|
||||
|
||||
auto file_path = get_file_path(state);
|
||||
RETURN_IF_ERROR(
|
||||
FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
|
||||
bm_log("finish to init {}", _name);
|
||||
@ -61,10 +67,13 @@ public:
|
||||
size_t offset = 0;
|
||||
size_t bytes_read = 0;
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
size_t read_size = _file_size;
|
||||
size_t read_size = reader->size();
|
||||
if (_file_size > 0) {
|
||||
read_size = std::min(read_size, _file_size);
|
||||
}
|
||||
long remaining_size = read_size;
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
while (remaining_size > 0) {
|
||||
bytes_read = 0;
|
||||
size_t size = std::min(buffer_size, (size_t)remaining_size);
|
||||
@ -81,13 +90,13 @@ public:
|
||||
remaining_size -= bytes_read;
|
||||
}
|
||||
bm_log("finish to run {}", _name);
|
||||
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
|
||||
auto elapsed_seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
|
||||
|
||||
state.SetIterationTime(elapsed_seconds.count());
|
||||
state.counters["ReadRate"] = benchmark::Counter(read_size, benchmark::Counter::kIsRate);
|
||||
|
||||
if (reader != nullptr) {
|
||||
reader->close();
|
||||
@ -96,41 +105,19 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class HdfsOpenBenchmark : public BaseBenchmark {
|
||||
// Read a single specified file
|
||||
class HdfsSingleReadBenchmark : public HdfsOpenReadBenchmark {
|
||||
public:
|
||||
HdfsOpenBenchmark(int threads, int iterations, size_t file_size,
|
||||
const std::map<std::string, std::string>& conf_map)
|
||||
: BaseBenchmark("HdfsOpenBenchmark", threads, iterations, file_size, 3, conf_map) {}
|
||||
virtual ~HdfsOpenBenchmark() = default;
|
||||
HdfsSingleReadBenchmark(int threads, int iterations, size_t file_size,
|
||||
const std::map<std::string, std::string>& conf_map)
|
||||
: HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map) {}
|
||||
virtual ~HdfsSingleReadBenchmark() = default;
|
||||
|
||||
Status init() override { return Status::OK(); }
|
||||
|
||||
Status run(benchmark::State& state) override {
|
||||
bm_log("begin to run {}", _name);
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
virtual std::string get_file_path(benchmark::State& state) override {
|
||||
std::string file_path = _conf_map["file_path"];
|
||||
bm_log("file_path: {}", file_path);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
io::FileReaderSPtr reader;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(fs->open_file(file_path, reader_opts, &reader));
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto elapsed_seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
|
||||
|
||||
state.SetIterationTime(elapsed_seconds.count());
|
||||
bm_log("finish to run {}", _name);
|
||||
|
||||
if (reader != nullptr) {
|
||||
reader->close();
|
||||
}
|
||||
return Status::OK();
|
||||
return file_path;
|
||||
}
|
||||
|
||||
private:
|
||||
};
|
||||
|
||||
class HdfsCreateWriteBenchmark : public BaseBenchmark {
|
||||
@ -141,24 +128,17 @@ public:
|
||||
conf_map) {}
|
||||
virtual ~HdfsCreateWriteBenchmark() = default;
|
||||
|
||||
Status init() override {
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(fs->delete_directory(base_dir));
|
||||
return Status::OK();
|
||||
}
|
||||
Status init() override { return Status::OK(); }
|
||||
|
||||
Status run(benchmark::State& state) override {
|
||||
bm_log("begin to run {}", _name);
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
std::string base_dir = _conf_map["base_dir"];
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
bm_log("file_path: {}", file_path);
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
io::FileWriterPtr writer;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
@ -188,6 +168,8 @@ public:
|
||||
state.SetIterationTime(elapsed_seconds.count());
|
||||
bm_log("finish to run {}", _name);
|
||||
|
||||
state.counters["WriteRate"] = benchmark::Counter(write_size, benchmark::Counter::kIsRate);
|
||||
|
||||
if (writer != nullptr) {
|
||||
writer->close();
|
||||
}
|
||||
@ -206,13 +188,14 @@ public:
|
||||
|
||||
Status run(benchmark::State& state) override {
|
||||
bm_log("begin to run {}", _name);
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
std::string base_dir = _conf_map["base_dir"];
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index());
|
||||
bm_log("file_path: {}", file_path);
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
io::FileWriterPtr writer;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
@ -224,6 +207,9 @@ public:
|
||||
state.SetIterationTime(elapsed_seconds.count());
|
||||
bm_log("finish to run {}", _name);
|
||||
|
||||
state.counters["RenameCost"] =
|
||||
benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
|
||||
|
||||
if (writer != nullptr) {
|
||||
writer->close();
|
||||
}
|
||||
@ -233,38 +219,6 @@ public:
|
||||
private:
|
||||
};
|
||||
|
||||
class HdfsDeleteBenchmark : public BaseBenchmark {
|
||||
public:
|
||||
HdfsDeleteBenchmark(int threads, int iterations, size_t file_size,
|
||||
const std::map<std::string, std::string>& conf_map)
|
||||
: BaseBenchmark("HdfsDeleteBenchmark", threads, 1, file_size, 1, conf_map) {}
|
||||
virtual ~HdfsDeleteBenchmark() = default;
|
||||
|
||||
Status init() override { return Status::OK(); }
|
||||
|
||||
Status run(benchmark::State& state) override {
|
||||
bm_log("begin to run {}", _name);
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
bm_log("file_path: {}", file_path);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(fs->delete_file(file_path));
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto elapsed_seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
|
||||
|
||||
state.SetIterationTime(elapsed_seconds.count());
|
||||
bm_log("finish to run {}", _name);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
};
|
||||
|
||||
class HdfsExistsBenchmark : public BaseBenchmark {
|
||||
public:
|
||||
HdfsExistsBenchmark(int threads, int iterations, size_t file_size,
|
||||
@ -276,12 +230,13 @@ public:
|
||||
|
||||
Status run(benchmark::State& state) override {
|
||||
bm_log("begin to run {}", _name);
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::string base_dir = _conf_map["baseDir"];
|
||||
std::string base_dir = _conf_map["base_dir"];
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
|
||||
bm_log("file_path: {}", file_path);
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
bool res = false;
|
||||
@ -292,6 +247,9 @@ public:
|
||||
|
||||
state.SetIterationTime(elapsed_seconds.count());
|
||||
bm_log("finish to run {}", _name);
|
||||
|
||||
state.counters["ExistsCost"] =
|
||||
benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
@ -26,58 +26,6 @@ if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then
|
||||
export PATH
|
||||
fi
|
||||
|
||||
OPTS="$(getopt \
|
||||
-n "$0" \
|
||||
-o '' \
|
||||
-l 'conf:,fs_type:,operation:,threads:,iterations:,file_size:' \
|
||||
-- "$@")"
|
||||
|
||||
eval set -- "${OPTS}"
|
||||
|
||||
while true; do
|
||||
case "$1" in
|
||||
--conf)
|
||||
CONF="$2"
|
||||
shift 2
|
||||
;;
|
||||
--fs_type)
|
||||
FS_TYPE="$2"
|
||||
shift 2
|
||||
;;
|
||||
--operation)
|
||||
OPERATION="$2"
|
||||
shift 2
|
||||
;;
|
||||
--threads)
|
||||
THREADS="$2"
|
||||
shift 2
|
||||
;;
|
||||
--iterations)
|
||||
ITERATIONS="$2"
|
||||
shift 2
|
||||
;;
|
||||
--file_size)
|
||||
FILE_SIZE="$2"
|
||||
shift 2
|
||||
;;
|
||||
--)
|
||||
shift
|
||||
break
|
||||
;;
|
||||
*)
|
||||
echo "Internal error"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
echo "CONF: ${CONF}"
|
||||
echo "FS_TYPE: ${FS_TYPE}"
|
||||
echo "OPERATION: ${OPERATION}"
|
||||
echo "THREADS: ${THREADS}"
|
||||
echo "ITERATIONS: ${ITERATIONS}"
|
||||
echo "FILE_SIZE: ${FILE_SIZE}"
|
||||
|
||||
DORIS_HOME="$(
|
||||
cd "${curdir}/.."
|
||||
pwd
|
||||
@ -316,16 +264,5 @@ export LIBHDFS_OPTS="${final_java_opt}"
|
||||
# see https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
|
||||
export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out"
|
||||
|
||||
${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/fs_benchmark_tool" --conf "${CONF}" --fs_type="${FS_TYPE}" --operation="${OPERATION}" --threads="${THREADS}" --iterations="${ITERATIONS}" --file_size="${FILE_SIZE}" 2>&1 | tee "${LOG_DIR}/fs_benchmark_tool.log"
|
||||
|
||||
qps="0MB/s"
|
||||
latency="0ms"
|
||||
|
||||
eval "$(grep "median" "${LOG_DIR}/fs_benchmark_tool.log" | awk '{printf("qps=%sMB/s latency=%sms", "'"${FILE_SIZE}"'" / 1024 / 1024 / ($2 * "'"${THREADS}"'" / 1000), $2 * "'"${THREADS}"'")}')"
|
||||
|
||||
echo "------------------------------"
|
||||
echo " Benchmark Result "
|
||||
echo "------------------------------"
|
||||
echo "thread_num: ${THREADS}."
|
||||
echo "qps: ${qps}."
|
||||
echo "latency: ${latency}."
|
||||
echo "$@"
|
||||
${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/fs_benchmark_tool" "$@" 2>&1 | tee "${LOG_DIR}/fs_benchmark_tool.log"
|
||||
|
||||
4
build.sh
4
build.sh
@ -637,6 +637,10 @@ EOF
|
||||
cp -r -p "${DORIS_HOME}/be/output/lib/debug_info" "${DORIS_OUTPUT}/be/lib"/
|
||||
fi
|
||||
|
||||
if [[ "${BUILD_FS_BENCHMARK}" = "ON" ]]; then
|
||||
cp -r -p "${DORIS_HOME}/bin/run-fs-benchmark.sh" "${DORIS_OUTPUT}/be/bin/"/
|
||||
fi
|
||||
|
||||
extensions_modules=("")
|
||||
extensions_modules+=("java-udf")
|
||||
extensions_modules+=("jdbc-scanner")
|
||||
|
||||
Reference in New Issue
Block a user