From 622e6a63d8da152631337ab5c470e2dbe9fca3da Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 25 Dec 2023 10:25:33 +0000 Subject: [PATCH] Implement the ObBackupIoAdapter performance testing tool --- deps/oblib/src/lib/restore/ob_i_storage.cpp | 1 + .../lib/restore/test_object_storage.cpp | 44 + tools/ob_admin/CMakeLists.txt | 10 + .../io_bench/ob_admin_io_adapter_bench.cpp | 433 ++++++++++ .../io_bench/ob_admin_io_adapter_bench.h | 76 ++ tools/ob_admin/io_bench/task_executor.cpp | 785 ++++++++++++++++++ tools/ob_admin/io_bench/task_executor.h | 199 +++++ tools/ob_admin/main.cpp | 3 + 8 files changed, 1551 insertions(+) create mode 100644 tools/ob_admin/io_bench/ob_admin_io_adapter_bench.cpp create mode 100644 tools/ob_admin/io_bench/ob_admin_io_adapter_bench.h create mode 100644 tools/ob_admin/io_bench/task_executor.cpp create mode 100644 tools/ob_admin/io_bench/task_executor.h diff --git a/deps/oblib/src/lib/restore/ob_i_storage.cpp b/deps/oblib/src/lib/restore/ob_i_storage.cpp index 1631e3ac6..01fc36b06 100644 --- a/deps/oblib/src/lib/restore/ob_i_storage.cpp +++ b/deps/oblib/src/lib/restore/ob_i_storage.cpp @@ -169,6 +169,7 @@ int build_bucket_and_object_name(ObIAllocator &allocator, OB_LOG(WARN, "object name is empty", K(uri), K(ret), K(bucket_end)); } else if (OB_ISNULL(bucket_name_buff = static_cast(allocator.alloc(bucket_length + 1)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; OB_LOG(WARN, "failed to alloc bucket name buff", K(ret), K(uri), K(bucket_length)); } else if (OB_FAIL(databuff_printf(bucket_name_buff, bucket_length + 1, "%.*s", bucket_length, uri.ptr() + bucket_start))) { diff --git a/deps/oblib/unittest/lib/restore/test_object_storage.cpp b/deps/oblib/unittest/lib/restore/test_object_storage.cpp index d3312a32d..00562baeb 100644 --- a/deps/oblib/unittest/lib/restore/test_object_storage.cpp +++ b/deps/oblib/unittest/lib/restore/test_object_storage.cpp @@ -761,6 +761,50 @@ TEST_F(TestObjectStorage, test_append_rw) { ObStorageAppender appender; + const char write_content[] = "123"; + ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/a.b/%ld.back", + dir_uri, ObTimeUtility::current_time())); + ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, strlen(write_content), 0)); + ASSERT_EQ(OB_SUCCESS, appender.seal_for_adaptive()); + ASSERT_EQ(OB_SUCCESS, appender.close()); + + ObStorageAdaptiveReader reader; + char read_buf[5] = {0}; + int64_t read_size = 0; + ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 5, 0, read_size)); + ASSERT_EQ(strlen(write_content), read_size); + ASSERT_EQ('2', read_buf[1]); + ASSERT_EQ(OB_SUCCESS, reader.close()); + ASSERT_EQ(OB_SUCCESS, util.del_file(uri, true)); + } + + { + ObStorageAppender appender; + const char write_content[] = "123"; + ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/a.b/%ld", + dir_uri, ObTimeUtility::current_time())); + ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, strlen(write_content), 0)); + ASSERT_EQ(OB_SUCCESS, appender.seal_for_adaptive()); + ASSERT_EQ(OB_SUCCESS, appender.close()); + + ObStorageAdaptiveReader reader; + char read_buf[5] = {0}; + int64_t read_size = 0; + ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 5, 0, read_size)); + ASSERT_EQ(strlen(write_content), read_size); + ASSERT_EQ('2', read_buf[1]); + ASSERT_EQ(OB_SUCCESS, reader.close()); + ASSERT_EQ(OB_SUCCESS, util.del_file(uri, true)); + } + + { + ObStorageAppender appender; + ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_append_file_%ld.back", + dir_uri, ObTimeUtility::current_time())); ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base)); // first append const char first_write[] = "123"; diff --git a/tools/ob_admin/CMakeLists.txt b/tools/ob_admin/CMakeLists.txt index 4d43d94e2..a034c23a1 100644 --- a/tools/ob_admin/CMakeLists.txt +++ b/tools/ob_admin/CMakeLists.txt @@ -54,6 +54,11 @@ add_executable(ob_admin backup_tool/ob_admin_dump_backup_data_executor.h backup_tool/ob_admin_dump_backup_data_executor.cpp + io_bench/task_executor.h + io_bench/task_executor.cpp + io_bench/ob_admin_io_adapter_bench.h + io_bench/ob_admin_io_adapter_bench.cpp + #trx_tool/ob_admin_trx_executor.h #trx_tool/ob_admin_trx_executor.cpp @@ -116,6 +121,11 @@ add_executable(ob_admin backup_tool/ob_admin_dump_backup_data_executor.h backup_tool/ob_admin_dump_backup_data_executor.cpp + io_bench/task_executor.h + io_bench/task_executor.cpp + io_bench/ob_admin_io_adapter_bench.h + io_bench/ob_admin_io_adapter_bench.cpp + #trx_tool/ob_admin_trx_executor.h #trx_tool/ob_admin_trx_executor.cpp diff --git a/tools/ob_admin/io_bench/ob_admin_io_adapter_bench.cpp b/tools/ob_admin/io_bench/ob_admin_io_adapter_bench.cpp new file mode 100644 index 000000000..7a5a148f8 --- /dev/null +++ b/tools/ob_admin/io_bench/ob_admin_io_adapter_bench.cpp @@ -0,0 +1,433 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include "ob_admin_io_adapter_bench.h" +#include "share/backup/ob_backup_io_adapter.h" +#include "../dumpsst/ob_admin_dumpsst_print_helper.h" +#include "src/logservice/archiveservice/ob_archive_file_utils.h" +#include "src/share/backup/ob_backup_path.h" +#include "src/share/backup/ob_backup_clean_util.h" + +using namespace oceanbase::share; +using namespace oceanbase::common; + +namespace oceanbase +{ +namespace tools +{ + +ObAdminIOAdapterBenchmarkExecutor::ObAdminIOAdapterBenchmarkExecutor() + : clean_before_execution_(false), clean_after_execution_(false), config_() +{ + MEMSET(base_path_, 0, sizeof(base_path_)); + MEMSET(storage_info_str_, 0, sizeof(storage_info_str_)); +} + +int ObAdminIOAdapterBenchmarkExecutor::execute(int argc, char *argv[]) +{ + int ret = OB_SUCCESS; + lib::set_memory_limit(16 * 1024 * 1024 * 1024LL); + lib::set_tenant_memory_limit(500, 16 * 1024 * 1024 * 1024LL); + OB_LOGGER.set_log_level("INFO"); + if (OB_FAIL(parse_cmd_(argc, argv))) { + OB_LOG(WARN, "failed to parse cmd", K(ret), K(argc), K(argv)); + } else if (OB_FAIL(run_all_tests_())) { + OB_LOG(WARN, "failed to pass all tests", K(ret), K_(base_path), K_(storage_info_str)); + } + return ret; +} + +int ObAdminIOAdapterBenchmarkExecutor::parse_cmd_(int argc, char *argv[]) +{ + int ret = OB_SUCCESS; + int opt = 0; + int index = -1; + const char *opt_str = "h:d:s:t:r:l:o:n:f:p:b:c:j:"; + struct option longopts[] = {{"help", 0, NULL, 'h'}, + {"file-path-prefix", 1, NULL, 'd'}, + {"storage-info", 1, NULL, 's'}, + {"thread-num", 1, NULL, 't'}, + {"max-task-run-times", 1, NULL, 'r'}, + {"time-limit", 1, NULL, 'l'}, + {"obj-size", 1, NULL, 'o'}, + {"obj-num", 1, NULL, 'n'}, + {"fragment-size", 1, NULL, 'f'}, + {"type", 1, NULL, 'p'}, + {"is-adaptive", 1, NULL, 'j'}, + {"clean-before-execution", 0, NULL, 'b'}, + {"clean-after-execution", 0, NULL, 'c'}, + {NULL, 0, NULL, 0}}; + while (OB_SUCC(ret) && -1 != (opt = getopt_long(argc, argv, opt_str, longopts, &index))) { + switch (opt) { + case 'h': { + print_usage_(); + exit(1); + } + case 'd': { + time_t timestamp = time(NULL); + struct tm *timeinfo = localtime(×tamp); + char buffer[OB_MAX_TIME_STR_LENGTH]; + strftime(buffer, sizeof(buffer), "%Y-%m-%d-%H:%M:%S", timeinfo); + if (OB_FAIL(databuff_printf(base_path_, sizeof(base_path_), "%s", optarg))) { + OB_LOG(WARN, "failed to construct base path", K(ret), K((char *)optarg), K(buffer)); + } + break; + } + case 's': { + if (OB_FAIL(databuff_printf(storage_info_str_, sizeof(storage_info_str_), "%s", optarg))) { + OB_LOG(WARN, "failed to copy storage info str", K(ret), K((char *)optarg)); + } + break; + } + case 't': { + if (OB_FAIL(c_str_to_int(optarg, config_.thread_num_))) { + OB_LOG(WARN, "fail to parse thread num", K(ret), K((char *)optarg)); + } + break; + } + case 'r': { + if (OB_FAIL(c_str_to_int(optarg, config_.max_task_runs_))) { + OB_LOG(WARN, "fail to parse max task runs", K(ret), K((char *)optarg)); + } + break; + } + case 'l': { + if (OB_FAIL(c_str_to_int(optarg, config_.time_limit_s_))) { + OB_LOG(WARN, "fail to parse time limit", K(ret), K((char *)optarg)); + } + break; + } + case 'o': { + if (OB_FAIL(c_str_to_int(optarg, config_.obj_size_))) { + OB_LOG(WARN, "fail to parse object size", K(ret), K((char *)optarg)); + } + break; + } + case 'n': { + if (OB_FAIL(c_str_to_int(optarg, config_.obj_num_))) { + OB_LOG(WARN, "fail to parse object num", K(ret), K((char *)optarg)); + } + break; + } + case 'f': { + if (OB_FAIL(c_str_to_int(optarg, config_.fragment_size_))) { + OB_LOG(WARN, "fail to parse fragment size", K(ret), K((char *)optarg)); + } + break; + } + case 'p': { + if (OB_ISNULL(optarg)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "type is NULL", K(ret), K((char *)optarg)); + } else if (0 == STRCMP("write", optarg)) { + config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_NORMAL_WRITE; + } else if (0 == STRCMP("append", optarg)) { + config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_APPEND_WRITE; + } else if (0 == STRCMP("multi", optarg)) { + config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_MULTIPART_WRITE; + } else if (0 == STRCMP("read", optarg)) { + config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_READ; + } else if (0 == STRCMP("del", optarg)) { + config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_DEL; + } else { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "unknown test type", K((char *)optarg), K(ret)); + } + break; + } + case 'j': { + config_.is_adaptive_ = true; + break; + } + case 'b': { + clean_before_execution_ = true; + break; + } + case 'c': { + clean_after_execution_ = true; + break; + } + default: { + print_usage_(); + exit(1); + } + } + } + + char print_config_buf[OB_MAX_URI_LENGTH]; + config_.to_string(print_config_buf, sizeof(print_config_buf)); + OB_LOG(INFO, "Task Config", K_(config)); + PrintHelper::print_dump_line("Task Config", print_config_buf); + return ret; +} + +class CleanOp : public ObBaseDirEntryOperator +{ +public: + CleanOp(char *base_path, share::ObBackupStorageInfo *storage_info) + : base_path_(base_path), storage_info_(storage_info), cleaned_objects_(0) + { + uri_[0] = '\0'; + } + ~CleanOp() {} + int func(const dirent *entry) override; + + char uri_[OB_MAX_URI_LENGTH]; + char *base_path_; + share::ObBackupStorageInfo *storage_info_; + int64_t cleaned_objects_; +}; + +int CleanOp::func(const dirent *entry) +{ + int ret = OB_SUCCESS; + ObBackupIoAdapter adapter; + + if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%s/%s", base_path_, entry->d_name))) { + OB_LOG(WARN, "fail to set uri", K(ret), K_(uri), KPC_(storage_info)); + } else if (OB_FAIL(adapter.del_file(uri_, storage_info_))) { + OB_LOG(WARN, "fail to delete file", K(ret), K_(uri), KPC_(storage_info)); + } else { + cleaned_objects_++; + } + return ret; +} + +int ObAdminIOAdapterBenchmarkExecutor::clean_base_path_(share::ObBackupStorageInfo &info) +{ + int ret = OB_SUCCESS; + PrintHelper::print_dump_title("Cleaning"); + if (OB_UNLIKELY(!info.is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid storage info", K(ret), K(info)); + } else if (info.get_type() == ObStorageType::OB_STORAGE_FILE) { + char cmd[OB_MAX_URI_LENGTH] = { 0 }; + if (OB_FAIL(databuff_printf(cmd, OB_MAX_URI_LENGTH, + "rm -rf %s/*", base_path_ + strlen(OB_FILE_PREFIX)))) { + OB_LOG(WARN, "fail to fill clean cmd", K(ret), K_(base_path)); + } else if (0 != std::system(cmd)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "fail to delete dir", K(ret), K_(base_path), K(cmd)); + } + } else { + ObBackupIoAdapter adapter; + CleanOp op(base_path_, &info); + if (OB_FAIL(adapter.list_files(base_path_, &info, op))) { + OB_LOG(WARN, "fail to clean", K(ret), K_(base_path), K(info)); + } + PrintHelper::print_dump_line("Cleaned Objects", op.cleaned_objects_); + } + + PrintHelper::print_dump_line("Clean Status", OB_SUCC(ret) ? "SUCCESS" : "FAIL"); + return ret; +} + +int ObAdminIOAdapterBenchmarkExecutor::run_all_tests_() +{ + int ret = OB_SUCCESS; + share::ObBackupStorageInfo storage_info; + ObBackupIoAdapterBenchmarkRunner runner; + + if (FALSE_IT(init_random_content())) { + } else if (OB_FAIL(storage_info.set(base_path_, storage_info_str_))) { + OB_LOG(WARN, "failed to set storage info", K(ret), K_(base_path), K_(storage_info_str)); + } else if (OB_FAIL(runner.init(base_path_, &storage_info, config_))) { + OB_LOG(WARN, "fail to init ObBackupIoAdapterBenchmarkRunner", + K(ret), K_(base_path), K(storage_info), K_(config)); + } else { + if (clean_before_execution_) { + if (OB_FAIL(clean_base_path_(storage_info))) { + OB_LOG(WARN, "fail to clean base path", K(ret), K(storage_info), K_(base_path)); + } + } + + if (OB_SUCC(ret)) { + PrintHelper::print_dump_title("Testing"); + if (OB_FAIL(runner.do_benchmark())) { + OB_LOG(WARN, "fail to do benchmark", + K(ret), K_(base_path), K(storage_info), K_(config)); + } + } + + if (OB_SUCC(ret) && clean_after_execution_) { + if (OB_FAIL(clean_base_path_(storage_info))) { + OB_LOG(WARN, "fail to clean base path", K(ret), K(storage_info), K_(base_path)); + } + } + } + return ret; +} + +int ObAdminIOAdapterBenchmarkExecutor::print_usage_() +{ + int ret = OB_SUCCESS; + printf("\n"); + printf("Usage: bench_io_adapter command [command args] [options]\n"); + printf("commands:\n"); + printf(HELP_FMT, "-h, --help", "display this message."); + printf("options:\n"); + printf(HELP_FMT, "-d, --file-path-prefix", "absolute file path with file prefix"); + printf(HELP_FMT, "-s, --storage-info", "oss/cos should provide storage info"); + printf(HELP_FMT, "-t, --thread-num", "thread num"); + printf(HELP_FMT, "-r, --max-task-run-times", "max task run times for each thread"); + printf(HELP_FMT, "-l, --time-limit", "time limit in second"); + printf(HELP_FMT, "-o, --object-size", "object size"); + printf(HELP_FMT, "-n, --object-num", "object num"); + printf(HELP_FMT, "-f, --fragment-size", + "for read operations, 'fragment-size' denotes the expected size of data to be read, " + "while for append/multipart write tasks, it specifies the size of each individual pwrite operation."); + printf(HELP_FMT, "-p, --type", "task type"); + printf(HELP_FMT, "-j, --is-adaptive", "use adative interface"); + printf(HELP_FMT, "-b, --clean-before-execution", "clean before execution"); + printf(HELP_FMT, "-c, --clean-after-execution", "clean after execution"); + printf("samples:\n"); + printf(" test nfs device: \n"); + printf("\tob_admin bench_io_adapter -dfile:///home/admin/backup_info \n"); + printf(" test object device: \n"); + printf("\tob_admin bench_io_adapter -d'oss://home/admin/backup_info' " + "-s'host=xxx.com&access_id=111&access_key=222'\n"); + printf("\tob_admin bench_io_adapter -d'cos://home/admin/backup_info' " + "-s'host=xxx.com&access_id=111&access_key=222&appid=333'\n"); + printf("\tob_admin bench_io_adapter -d's3://home/admin/backup_info' " + "-s'host=xxx.com&access_id=111&access_key=222&s3_region=333'\n"); + return ret; +} + +/*--------------------------------ObBackupIoAdapterBenchmarkRunner--------------------------------*/ +ObBackupIoAdapterBenchmarkRunner::ObBackupIoAdapterBenchmarkRunner() + : lock_(), is_inited_(false), tg_id_(-1), ret_code_(OB_SUCCESS), + config_(), metrics_(), storage_info_(nullptr) +{ +} + +ObBackupIoAdapterBenchmarkRunner::~ObBackupIoAdapterBenchmarkRunner() +{ + destroy(); +} + +int ObBackupIoAdapterBenchmarkRunner::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + OB_LOG(WARN, "ObBackupIoAdapterBenchmarkRunner init twice", K(ret)); + } else if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info) + || OB_UNLIKELY(!storage_info->is_valid() || config.thread_num_ <= 0) + || OB_UNLIKELY(config.time_limit_s_ <= 0 && config.max_task_runs_ <= 0)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), "%s", base_uri))) { + OB_LOG(WARN, "fail to deep copy base uri", K(ret), K(base_uri)); + } else if (OB_FAIL(config_.assign(config))) { + OB_LOG(WARN, "fail to assign task config", K(ret), K(config)); + } else { + storage_info_ = storage_info; + is_inited_ = true; + } + return ret; +} + +void ObBackupIoAdapterBenchmarkRunner::destroy() +{ + if (tg_id_ >= 0) { + TG_STOP(tg_id_); + TG_WAIT(tg_id_); + tg_id_ = -1; + } + is_inited_ = false; +} + +int ObBackupIoAdapterBenchmarkRunner::do_benchmark() +{ + int ret = OB_SUCCESS; + const int64_t thread_num = config_.thread_num_; + const int64_t time_limit_s = config_.time_limit_s_; + struct rusage start_usage; + struct timeval start_real_time; + getrusage(RUSAGE_SELF, &start_usage); + gettimeofday(&start_real_time, nullptr); + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "ObBackupIoAdapterBenchmarkRunner not init", K(ret)); + } else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::COMMON_THREAD_POOL, tg_id_))) { + OB_LOG(WARN, "create thread group failed", K(ret)); + } else if (OB_FAIL(TG_SET_RUNNABLE(tg_id_, *this))) { + OB_LOG(WARN, "set tg_runnable failed", K(ret), K_(tg_id)); + } else if (OB_FAIL(TG_SET_THREAD_CNT(tg_id_, thread_num))) { + OB_LOG(WARN, "set thread count failed", K(ret), K_(tg_id), K(thread_num)); + } else if (OB_FAIL(TG_START(tg_id_))) { + OB_LOG(WARN, "start thread failed", K(ret), K_(tg_id), K(thread_num)); + } else { + if (time_limit_s > 0) { + sleep(time_limit_s); + TG_STOP(tg_id_); + } + TG_WAIT(tg_id_); + + if (OB_SUCC(ret_code_)) { + metrics_.summary(start_real_time, start_usage, config_.thread_num_); + } else { + OB_LOG(WARN, "some threads failed, check log", K_(ret_code), K(thread_num)); + } + } + return ret; +} + +void ObBackupIoAdapterBenchmarkRunner::run1() +{ + int ret = OB_SUCCESS; + ObBackupIoAdapter util; + ITaskExecutor *executor = nullptr; + const uint64_t thread_idx = get_thread_idx(); + char uri[common::OB_MAX_URI_LENGTH]; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "ObBackupIoAdapterBenchmarkRunner not init", K(ret)); + } else if (OB_FAIL(databuff_printf(uri, sizeof(uri), "%s/%ld", base_uri_, thread_idx))) { + OB_LOG(WARN, "fail to construct base task dir for current thread", + K(ret), K_(base_uri), K(thread_idx)); + } else if (OB_FAIL(util.mkdir(uri, storage_info_))) { + OB_LOG(WARN, "fail to make base task dir for current thread", + K(ret), K_(base_uri), K(thread_idx)); + } else if (OB_FAIL(init_task_executor(uri, storage_info_, config_, executor))) { + OB_LOG(WARN, "fail to create and init task executor", K(ret), K(uri), K(thread_idx)); + } else { + const bool is_limit_task_runs = (config_.max_task_runs_ > 0); + for (int64_t i = 0; OB_SUCC(ret) && !has_set_stop(); i++) { + if (is_limit_task_runs && i >= config_.max_task_runs_) { + break; + } else if (OB_FAIL(executor->execute())) { + OB_LOG(WARN, "fail to execute task", K(ret), K(i), K(uri), K(thread_idx)); + } + } + + if (OB_SUCC(ret)) { + SpinWLockGuard guard(lock_); + metrics_.add(executor->get_metrics()); + } else { + ret_code_ = ret; + } + } + + if (OB_NOT_NULL(executor)) { + executor->~ITaskExecutor(); + ob_free(executor); + executor = nullptr; + } +} + +} //tools +} //oceanbase \ No newline at end of file diff --git a/tools/ob_admin/io_bench/ob_admin_io_adapter_bench.h b/tools/ob_admin/io_bench/ob_admin_io_adapter_bench.h new file mode 100644 index 000000000..0b671d687 --- /dev/null +++ b/tools/ob_admin/io_bench/ob_admin_io_adapter_bench.h @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_ADMIN_IO_ADAPTER_BENCH_H_ +#define OB_ADMIN_IO_ADAPTER_BENCH_H_ +#include +#include "../ob_admin_executor.h" +#include "deps/oblib/src/lib/ob_define.h" // OB_MAX_URI_LENGTH +#include "share/backup/ob_backup_struct.h" // OB_MAX_BACKUP_STORAGE_INFO_LENGTH +#include "share/ob_thread_mgr.h" +#include "task_executor.h" + +namespace oceanbase +{ +namespace tools +{ + +class ObAdminIOAdapterBenchmarkExecutor : public ObAdminExecutor +{ +public: + ObAdminIOAdapterBenchmarkExecutor(); + virtual ~ObAdminIOAdapterBenchmarkExecutor() {} + virtual int execute(int argc, char *argv[]) override; + +private: + int parse_cmd_(int argc, char *argv[]); + int run_all_tests_(); + int print_usage_(); + int clean_base_path_(share::ObBackupStorageInfo &info); + +private: + char base_path_[common::OB_MAX_URI_LENGTH]; + char storage_info_str_[common::OB_MAX_BACKUP_STORAGE_INFO_LENGTH]; + bool clean_before_execution_; + bool clean_after_execution_; + TaskConfig config_; + +private: + static constexpr char *HELP_FMT = const_cast("\t%-30s%-12s\n"); + DISALLOW_COPY_AND_ASSIGN(ObAdminIOAdapterBenchmarkExecutor); +}; + +class ObBackupIoAdapterBenchmarkRunner : public lib::TGRunnable +{ +public: + ObBackupIoAdapterBenchmarkRunner(); + ~ObBackupIoAdapterBenchmarkRunner(); + int init(const char *base_uri, share::ObBackupStorageInfo *storage_info, const TaskConfig &config); + void destroy(); + int do_benchmark(); + virtual void run1() override; + +private: + common::SpinRWLock lock_; + bool is_inited_; + int tg_id_; + int ret_code_; + TaskConfig config_; + Metrics metrics_; + char base_uri_[common::OB_MAX_URI_LENGTH]; + share::ObBackupStorageInfo *storage_info_; +}; + +} //namespace tools +} //namespace oceanbase + +#endif // OB_ADMIN_IO_ADAPTER_BENCH_H_ \ No newline at end of file diff --git a/tools/ob_admin/io_bench/task_executor.cpp b/tools/ob_admin/io_bench/task_executor.cpp new file mode 100644 index 000000000..ad84471da --- /dev/null +++ b/tools/ob_admin/io_bench/task_executor.cpp @@ -0,0 +1,785 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "share/backup/ob_backup_io_adapter.h" +#include "task_executor.h" +#include "../dumpsst/ob_admin_dumpsst_print_helper.h" + +using namespace oceanbase::share; +using namespace oceanbase::common; + +namespace oceanbase +{ +namespace tools +{ + +/*--------------------------------TaskConfig--------------------------------*/ +TaskConfig::TaskConfig() + : thread_num_(-1), max_task_runs_(-1), time_limit_s_(-1), + obj_size_(-1), obj_num_(-1), fragment_size_(-1), is_adaptive_(false), + type_(BenchmarkTaskType::BENCHMARK_TASK_MAX_TYPE) +{ +} + +int TaskConfig::assign(const TaskConfig &other) +{ + int ret = OB_SUCCESS; + thread_num_ = other.thread_num_; + max_task_runs_ = other.max_task_runs_; + time_limit_s_ = other.time_limit_s_; + obj_size_ = other.obj_size_; + obj_num_ = other.obj_num_; + fragment_size_ = other.fragment_size_; + is_adaptive_ = other.is_adaptive_; + type_ = other.type_; + return ret; +} + +/*--------------------------------Metrics--------------------------------*/ +TimeMap::TimeMap() : total_entry_(0) +{ +} + +int TimeMap::log_entry(const int64_t start_time_us) +{ + int ret = OB_SUCCESS; + const int64_t cost_time_ms = (ObTimeUtility::current_time() - start_time_us) / 1000; + time_ms_map_[cost_time_ms]++; + total_entry_++; + return ret; +} + +int TimeMap::add(const TimeMap &other) +{ + int ret = OB_SUCCESS; + std::map::const_iterator it = other.time_ms_map_.begin(); + while (OB_SUCC(ret) && it != other.time_ms_map_.end()) { + time_ms_map_[it->first] += it->second; + ++it; + } + if (OB_SUCC(ret)) { + total_entry_ += other.total_entry_; + } + return ret; +} + +void TimeMap::summary(const char *map_name_str) +{ + const char *map_name = "Anonymous Time Map"; + if (OB_NOT_NULL(map_name_str)) { + map_name = map_name_str; + } + if (total_entry_ <= 0) { + PrintHelper::print_dump_line(map_name, "Empty Time Map"); + } else { + const int64_t th_50_count = total_entry_ * 0.5; + const int64_t th_90_count = total_entry_ * 0.9; + const int64_t th_99_count = total_entry_ * 0.99; + const int64_t th_999_count = total_entry_ * 0.999; + int64_t min_ms = 0; + int64_t th_50_ms = 0; + int64_t th_90_ms = 0; + int64_t th_99_ms = 0; + int64_t th_999_ms = 0; + int64_t cur_count = 0; + int64_t max_ms = 0; + + std::map::const_iterator it = time_ms_map_.begin(); + min_ms = it->first; + while (it != time_ms_map_.end()) { + cur_count += it->second; + if (th_50_ms == 0 && cur_count >= th_50_count) { + th_50_ms = it->first; + } + if (th_90_ms == 0 && cur_count >= th_90_count) { + th_90_ms = it->first; + } + if (th_99_ms == 0 && cur_count >= th_99_count) { + th_99_ms = it->first; + } + if (th_999_ms == 0 && cur_count >= th_999_count) { + th_999_ms = it->first; + } + if (max_ms == 0 && cur_count == total_entry_) { + max_ms = it->first; + } + + ++it; + } + + char buf[2048]; + int ret = OB_SUCCESS; + if (OB_FAIL(databuff_printf(buf, sizeof(buf), + "total_entry=%ld, min_ms=%ld, th_50_ms=%ld, th_90_ms=%ld, th_99_ms=%ld, th_999_ms=%ld, max_ms=%ld", + total_entry_, min_ms, th_50_ms, th_90_ms,th_99_ms, th_999_ms, max_ms))) { + OB_LOG(WARN, "fail to set log str", K(ret), K_(total_entry), + K(min_ms), K(th_50_ms), K(th_90_ms), K(th_99_ms), K(th_999_ms), K(max_ms)); + } else { + OB_LOG(INFO, "time map status", K(ret), K_(total_entry), + K(min_ms), K(th_50_ms), K(th_90_ms), K(th_99_ms), K(th_999_ms), K(max_ms)); + PrintHelper::print_dump_line(map_name, buf); + } + } +} + +int TimeMap::assign(const TimeMap &other) +{ + int ret = OB_SUCCESS; + total_entry_ = other.total_entry_; + time_ms_map_ = other.time_ms_map_; + return ret; +} + +Metrics::Metrics() + : status_(OB_SUCCESS), throughput_bytes_(0), operation_num_(0), + total_op_time_ms_map_(), open_time_ms_map_(), close_time_ms_map_() +{ +} + +int Metrics::assign(const Metrics &other) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(total_op_time_ms_map_.assign(other.total_op_time_ms_map_))) { + OB_LOG(WARN, "fail to assign total_op_time_ms_map", K(ret)); + } else if (OB_FAIL(open_time_ms_map_.assign(other.open_time_ms_map_))) { + OB_LOG(WARN, "fail to assign open_time_ms_map", K(ret)); + } else if (OB_FAIL(close_time_ms_map_.assign(other.close_time_ms_map_))) { + OB_LOG(WARN, "fail to assign close_time_ms_map", K(ret)); + } else { + status_ = other.status_; + throughput_bytes_ = other.throughput_bytes_; + operation_num_ = other.operation_num_; + } + return ret; +} + +int Metrics::add(const Metrics &other) +{ + int ret = OB_SUCCESS; + if (OB_SUCC(status_)) { + if (OB_SUCC(other.status_)) { + if (OB_FAIL(total_op_time_ms_map_.add(other.total_op_time_ms_map_))) { + OB_LOG(WARN, "fail to add total_op_time_ms_map", K(ret)); + } else if (OB_FAIL(open_time_ms_map_.add(other.open_time_ms_map_))) { + OB_LOG(WARN, "fail to add open_time_ms_map", K(ret)); + } else if (OB_FAIL(close_time_ms_map_.add(other.close_time_ms_map_))) { + OB_LOG(WARN, "fail to add close_time_ms_map", K(ret)); + } else { + throughput_bytes_ += other.throughput_bytes_; + operation_num_ += other.operation_num_; + } + } else { + status_ = other.status_; + } + } + return ret; +} + +static double cal_time_diff(const timeval& start, const timeval& end) +{ + return (end.tv_sec - start.tv_sec) + (end.tv_usec - start.tv_usec) / 1e6; +} + +void Metrics::summary( + const struct timeval &start_real_time, + const struct rusage &start_usage, + const int64_t thread_num) +{ + PrintHelper::print_dump_line("Status", status_ == OB_SUCCESS ? "SUCCESS" : "FAIL"); + if (OB_LIKELY(status_ == OB_SUCCESS)) { + if (OB_UNLIKELY(operation_num_ <= 0)) { + PrintHelper::print_dump_line("Operation num is unexpected", operation_num_); + } else { + struct rusage end_usage; + struct timeval end_real_time; + getrusage(RUSAGE_SELF, &end_usage); + gettimeofday(&end_real_time, nullptr); + + const double cost_time_s = cal_time_diff(start_real_time, end_real_time); + const double user_cpu_time_s = cal_time_diff(start_usage.ru_utime, end_usage.ru_utime); + const double sys_cpu_time_s = cal_time_diff(start_usage.ru_stime, end_usage.ru_stime); + const double cpu_usage = (user_cpu_time_s + sys_cpu_time_s) / (cost_time_s) * 100.0; + const double QPS = ((double)operation_num_) / cost_time_s; + const double BW = ( ((double)throughput_bytes_) / 1024.0 / 1024.0 ) / cost_time_s; + + PrintHelper::print_dump_line("Total operation num", operation_num_); + PrintHelper::print_dump_line("Total execution time", (std::to_string(cost_time_s) + " s").c_str()); + PrintHelper::print_dump_line("Total user time", (std::to_string(user_cpu_time_s) + " s").c_str()); + PrintHelper::print_dump_line("Total system time", (std::to_string(sys_cpu_time_s) + " s").c_str()); + if (BW > 1e-6) { + const double cpu_usage_for_100MB_bw = (100.0 / BW) * cpu_usage; + PrintHelper::print_dump_line("CPU usage for 100MB/s BW", + (std::to_string(cpu_usage_for_100MB_bw) + "% per 100MB/s").c_str()); + } else { + PrintHelper::print_dump_line("Total CPU usage", (std::to_string(cpu_usage) + "%").c_str()); + } + PrintHelper::print_dump_line("Total throughput bytes", throughput_bytes_); + PrintHelper::print_dump_line("Total QPS", std::to_string(QPS).c_str()); + PrintHelper::print_dump_line("Per Thread QPS", std::to_string(QPS / thread_num).c_str()); + PrintHelper::print_dump_line("Total BW", (std::to_string(BW) + " MB/s").c_str()); + PrintHelper::print_dump_line("Per Thread BW", (std::to_string(BW / thread_num) + " MB/s").c_str()); + total_op_time_ms_map_.summary("Total Op Time Map"); + open_time_ms_map_.summary("Open Time Map"); + close_time_ms_map_.summary("Close Op Time Map"); + } + } +} + +/*--------------------------------ITaskExecutor--------------------------------*/ +ITaskExecutor::ITaskExecutor() + : is_inited_(false), base_uri_len_(-1), storage_info_(nullptr), metrics_() +{ + base_uri_[0] = '\0'; +} + +void ITaskExecutor::reset() +{ + is_inited_ = false; + base_uri_len_ = -1; + storage_info_ = nullptr; + base_uri_[0] = '\0'; +} + +int ITaskExecutor::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + OB_LOG(WARN, "Task Executor init twice", K(ret)); + } else if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info) + || OB_UNLIKELY(!storage_info->is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info)); + } else if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), "%s", base_uri))) { + OB_LOG(WARN, "fail to deep copy base uri", K(ret), K(base_uri)); + } else { + base_uri_len_ = strlen(base_uri_); + storage_info_ = storage_info; + is_inited_ = true; + } + return ret; +} + +int ITaskExecutor::prepare_(const int64_t object_id) +{ + int ret = OB_SUCCESS; + int64_t pos = base_uri_len_; + if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), pos, "/%ld", object_id))) { + OB_LOG(WARN, "fail to construct object name", K(ret), K_(base_uri), K(object_id)); + } + return ret; +} + +void ITaskExecutor::finish_(const int64_t ob_errcode) +{ + metrics_.status_ = ob_errcode; + base_uri_[base_uri_len_] = '\0'; +} + +template +typename std::enable_if::value, int>::type +alloc_executor(ITaskExecutor *&executor, ObMemAttr &attr) +{ + int ret = OB_SUCCESS; + void *buf = nullptr; + if (OB_ISNULL(buf = ob_malloc(sizeof(Executor), attr))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN,"fail to alloc buf for task executor", K(ret)); + } else { + executor = new(buf) Executor(); + } + return ret; +} + +int init_task_executor(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config, ITaskExecutor *&executor) +{ + int ret = OB_SUCCESS; + executor = nullptr; + void *buf = nullptr; + ObMemAttr attr; + attr.label_ = "TMPSTR"; + if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info) + || OB_UNLIKELY(!storage_info->is_valid() || config.type_ == BenchmarkTaskType::BENCHMARK_TASK_MAX_TYPE)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_NORMAL_WRITE) { + if (OB_FAIL(alloc_executor(executor, attr))) { + OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config)); + } + } else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_READ) { + if (OB_FAIL(alloc_executor(executor, attr))) { + OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config)); + } + } else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_APPEND_WRITE) { + if (OB_FAIL(alloc_executor(executor, attr))) { + OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config)); + } + } else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_MULTIPART_WRITE) { + if (OB_FAIL(alloc_executor(executor, attr))) { + OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config)); + } + } else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_DEL) { + if (OB_FAIL(alloc_executor(executor, attr))) { + OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config)); + } + } else { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "unknown benchmark task type", K(ret), K(config)); + } + + if (FAILEDx(executor->init(base_uri, storage_info, config))) { + OB_LOG(WARN, "fail to init executor", K(ret), K(base_uri), KPC(storage_info), K(config)); + } + + if (OB_FAIL(ret)) { + if (OB_NOT_NULL(executor)) { + executor->~ITaskExecutor(); + ob_free(executor); + executor = nullptr; + } + } + return ret; +} + +/*--------------------------------Write Task Executor--------------------------------*/ +static const int64_t MAX_RANDOM_CONTENT_LEN = 128 * 1024 * 1024L; +static char RANDOM_CONTENT[MAX_RANDOM_CONTENT_LEN + 1]; +void init_random_content() +{ + static bool is_inited = false; + if (!is_inited) { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + for (int64_t i = 0; i < MAX_RANDOM_CONTENT_LEN; i++) { + RANDOM_CONTENT[i] = alphanum[ObRandom::rand(0, sizeof(alphanum) - 2)]; + } + RANDOM_CONTENT[MAX_RANDOM_CONTENT_LEN] = '\0'; + is_inited = true; + } +} + +WriteTaskExecutor::WriteTaskExecutor() + : ITaskExecutor(), obj_size_(-1), write_buf_(nullptr), allocator_() +{ +} + +void WriteTaskExecutor::reset() +{ + obj_size_ = -1; + write_buf_ = nullptr; + allocator_.clear(); + ITaskExecutor::reset(); +} + +int WriteTaskExecutor::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) { + OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else if (OB_UNLIKELY(config.obj_size_ <= 0 || config.obj_size_ >= MAX_RANDOM_CONTENT_LEN)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(config)); + } else if (FALSE_IT(obj_size_ = config.obj_size_)) { + } else if (OB_ISNULL(write_buf_ = (char *)allocator_.alloc(obj_size_ + 1))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(obj_size)); + } else { + is_inited_ = true; + } + + if (OB_FAIL(ret)) { + reset(); + } + return ret; +} + +int WriteTaskExecutor::execute() +{ + int ret = OB_SUCCESS; + const int64_t object_id = metrics_.operation_num_; + const int64_t start_time_us = ObTimeUtility::current_time(); + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "WriteTaskExecutor not init", K(ret), K_(base_uri)); + } else if (OB_FAIL(prepare_(object_id))) { + OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id)); + } else { + ObBackupIoAdapter adapter; + MEMCPY(write_buf_, + RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - obj_size_), + obj_size_); + write_buf_[obj_size_] = '\0'; + + if (OB_FAIL(adapter.write_single_file(base_uri_, storage_info_, write_buf_, obj_size_))) { + OB_LOG(WARN, "fail to write file", + K(ret), K_(base_uri), KPC_(storage_info), K_(obj_size), K(object_id)); + } else { + metrics_.operation_num_++; + metrics_.throughput_bytes_ += obj_size_; + metrics_.total_op_time_ms_map_.log_entry(start_time_us); + } + } + + finish_(ret); + return ret; +} + +AppendWriteTaskExecutor::AppendWriteTaskExecutor() + : ITaskExecutor(), obj_size_(-1), fragment_size_(-1), write_buf_(nullptr), allocator_() +{ +} + +void AppendWriteTaskExecutor::reset() +{ + obj_size_ = -1; + fragment_size_ = -1; + write_buf_ = nullptr; + allocator_.clear(); + ITaskExecutor::reset(); +} + +int AppendWriteTaskExecutor::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) { + OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_ + || config.fragment_size_ >= MAX_RANDOM_CONTENT_LEN)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(config)); + } else if (FALSE_IT(obj_size_ = config.obj_size_)) { + } else if (FALSE_IT(fragment_size_ = config.fragment_size_)) { + } else if (OB_ISNULL(write_buf_ = (char *)allocator_.alloc(fragment_size_ + 1))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(fragment_size)); + } else { + is_inited_ = true; + } + + if (OB_FAIL(ret)) { + reset(); + } + return ret; +} + +int AppendWriteTaskExecutor::execute() +{ + int ret = OB_SUCCESS; + const int64_t object_id = metrics_.operation_num_; + const int64_t start_time_us = ObTimeUtility::current_time(); + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "AppendWriteTaskExecutor not init", K(ret), K_(base_uri)); + } else if (OB_FAIL(prepare_(object_id))) { + OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id)); + } else { + ObBackupIoAdapter adapter; + ObIOFd fd; + ObIODevice *device_handle = nullptr; + ObStorageAccessType access_type = OB_STORAGE_ACCESS_RANDOMWRITER; + + const int64_t open_start_time_us = ObTimeUtility::current_time(); + if (OB_FAIL(adapter.open_with_access_type( + device_handle, fd, storage_info_, base_uri_, access_type))) { + OB_LOG(WARN, "failed to open device with access type", + K(ret), K_(base_uri), KPC_(storage_info), K(access_type)); + } else if (OB_ISNULL(device_handle)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "device handle is NULL", K(ret), KP(device_handle), K_(base_uri)); + } else { + metrics_.open_time_ms_map_.log_entry(open_start_time_us); + } + + int64_t cur_offset = 0; + int64_t actual_write_size = -1; + int64_t cur_append_size = -1; + while (OB_SUCC(ret) && cur_offset < obj_size_) { + cur_append_size = MIN(obj_size_ - cur_offset, fragment_size_); + MEMCPY(write_buf_, + RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - cur_append_size), + cur_append_size); + write_buf_[cur_append_size] = '\0'; + + if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_append_size, + write_buf_, actual_write_size))) { + OB_LOG(WARN, "fail to append object", + K(ret), K_(base_uri), K(cur_offset), K(cur_append_size)); + } else { + cur_offset += cur_append_size; + } + } + + const int64_t close_start_time_us = ObTimeUtility::current_time(); + if (FAILEDx(device_handle->seal_file(fd))) { + OB_LOG(WARN, "fail to seal file", K(ret), K_(base_uri)); + } else if (OB_FAIL(adapter.close_device_and_fd(device_handle, fd))) { + OB_LOG(WARN, "fail to close device handle", K(ret), K_(base_uri)); + } else { + metrics_.close_time_ms_map_.log_entry(close_start_time_us); + } + + if (OB_SUCC(ret)) { + metrics_.operation_num_++; + metrics_.throughput_bytes_ += obj_size_; + metrics_.total_op_time_ms_map_.log_entry(start_time_us); + } + } + + finish_(ret); + return ret; +} + +MultipartWriteTaskExecutor::MultipartWriteTaskExecutor() + : ITaskExecutor(), obj_size_(-1), part_size_(-1), write_buf_(nullptr), allocator_() +{ +} + +void MultipartWriteTaskExecutor::reset() +{ + obj_size_ = -1; + part_size_ = -1; + write_buf_ = nullptr; + allocator_.clear(); + ITaskExecutor::reset(); +} + +int MultipartWriteTaskExecutor::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) { + OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_ + || config.fragment_size_ >= MAX_RANDOM_CONTENT_LEN)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(config)); + } else if (FALSE_IT(obj_size_ = config.obj_size_)) { + } else if (FALSE_IT(part_size_ = config.fragment_size_)) { + } else if (OB_ISNULL(write_buf_ = (char *)allocator_.alloc(part_size_ + 1))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(part_size)); + } else { + is_inited_ = true; + } + + if (OB_FAIL(ret)) { + reset(); + } + return ret; +} + +int MultipartWriteTaskExecutor::execute() +{ + int ret = OB_SUCCESS; + const int64_t object_id = metrics_.operation_num_; + const int64_t start_time_us = ObTimeUtility::current_time(); + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "MultipartWriteTaskExecutor not init", K(ret), K_(base_uri)); + } else if (OB_FAIL(prepare_(object_id))) { + OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id)); + } else { + ObBackupIoAdapter adapter; + ObIOFd fd; + ObIODevice *device_handle = nullptr; + ObStorageAccessType access_type = OB_STORAGE_ACCESS_MULTIPART_WRITER; + + const int64_t open_start_time_us = ObTimeUtility::current_time(); + if (OB_FAIL(adapter.open_with_access_type( + device_handle, fd, storage_info_, base_uri_, access_type))) { + OB_LOG(WARN, "failed to open device with access type", + K(ret), K_(base_uri), KPC_(storage_info), K(access_type)); + } else if (OB_ISNULL(device_handle)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "device handle is NULL", K(ret), KP(device_handle), K_(base_uri)); + } else { + metrics_.open_time_ms_map_.log_entry(open_start_time_us); + } + + int64_t cur_offset = 0; + int64_t actual_write_size = -1; + int64_t cur_part_size = -1; + while (OB_SUCC(ret) && cur_offset < obj_size_) { + cur_part_size = MIN(obj_size_ - cur_offset, part_size_); + MEMCPY(write_buf_, + RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - cur_part_size), + cur_part_size); + write_buf_[cur_part_size] = '\0'; + + if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_part_size, + write_buf_, actual_write_size))) { + OB_LOG(WARN, "fail to upload part", + K(ret), K_(base_uri), K(cur_offset), K(cur_part_size)); + } else { + cur_offset += cur_part_size; + } + } + + const int64_t close_start_time_us = ObTimeUtility::current_time(); + if (FAILEDx(adapter.close_device_and_fd(device_handle, fd))) { + OB_LOG(WARN, "fail to close device handle", K(ret), K_(base_uri)); + } else { + metrics_.close_time_ms_map_.log_entry(close_start_time_us); + } + + if (OB_SUCC(ret)) { + metrics_.operation_num_++; + metrics_.throughput_bytes_ += obj_size_; + metrics_.total_op_time_ms_map_.log_entry(start_time_us); + } + } + + finish_(ret); + return ret; +} + +/*--------------------------------Read Task Executor--------------------------------*/ +ReadTaskExecutor::ReadTaskExecutor() + : ITaskExecutor(), obj_size_(-1), obj_num_(-1), is_adaptive_(false), + expected_read_size_(-1), read_buf_(nullptr), allocator_() +{ +} + +void ReadTaskExecutor::reset() +{ + obj_size_ = -1; + obj_num_ = -1; + is_adaptive_ = false; + expected_read_size_ = -1; + read_buf_ = nullptr; + allocator_.clear(); + ITaskExecutor::reset(); +} + +int ReadTaskExecutor::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) { + OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_ + || config.obj_num_ <= 0)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", K(ret), K(config)); + } else if (FALSE_IT(obj_size_ = config.obj_size_)) { + } else if (FALSE_IT(expected_read_size_ = config.fragment_size_)) { + } else if (OB_ISNULL(read_buf_ = (char *)allocator_.alloc(expected_read_size_ + 1))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for read buf", K(ret), K_(obj_size), K_(expected_read_size)); + } else { + obj_num_ = config.obj_num_; + is_adaptive_ = config.is_adaptive_; + is_inited_ = true; + } + + if (OB_FAIL(ret)) { + reset(); + } + return ret; +} + +int ReadTaskExecutor::execute() +{ + int ret = OB_SUCCESS; + const int64_t object_id = ObRandom::rand(0, obj_num_ - 1); + const int64_t start_time_us = ObTimeUtility::current_time(); + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "ReadTaskExecutor not init", K(ret), K_(base_uri)); + } else if (OB_FAIL(prepare_(object_id))) { + OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id)); + } else { + ObBackupIoAdapter adapter; + const int64_t offset = (ObRandom::rand(0, obj_size_ - expected_read_size_) / ALIGNMENT) * ALIGNMENT; + int64_t read_size = -1; + + if (is_adaptive_ && OB_FAIL(adapter.adaptively_read_part_file(base_uri_, + storage_info_, read_buf_, expected_read_size_, offset, read_size))) { + OB_LOG(WARN, "fail to read adaptive file", K(ret), K_(base_uri), + KPC_(storage_info), K_(expected_read_size), K_(obj_size), K(offset), K(object_id)); + } else if (!is_adaptive_ && OB_FAIL(adapter.read_part_file(base_uri_, + storage_info_, read_buf_, expected_read_size_, offset, read_size))) { + OB_LOG(WARN, "fail to read file", K(ret), K_(base_uri), + KPC_(storage_info), K_(expected_read_size), K_(obj_size), K(offset), K(object_id)); + } else { + metrics_.operation_num_++; + metrics_.throughput_bytes_ += read_size; + metrics_.total_op_time_ms_map_.log_entry(start_time_us); + } + } + + finish_(ret); + return ret; +} + +/*--------------------------------Del Task Executor--------------------------------*/ +DelTaskExecutor::DelTaskExecutor() + : ITaskExecutor(), is_adaptive_(false) +{ +} + +int DelTaskExecutor::init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) { + OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config)); + } else { + is_adaptive_ = config.is_adaptive_; + is_inited_ = true; + } + + if (OB_FAIL(ret)) { + reset(); + } + return ret; +} + +int DelTaskExecutor::execute() +{ + int ret = OB_SUCCESS; + const int64_t object_id = metrics_.operation_num_; + const int64_t start_time_us = ObTimeUtility::current_time(); + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "DelTaskExecutor not init", K(ret), K_(base_uri)); + } else if (OB_FAIL(prepare_(object_id))) { + OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id)); + } else { + ObBackupIoAdapter adapter; + + if (is_adaptive_ && OB_FAIL(adapter.adaptively_del_file(base_uri_, storage_info_))) { + OB_LOG(WARN, "fail to delete adaptive file", + K(ret), K_(base_uri), KPC_(storage_info), K(object_id)); + } else if (!is_adaptive_ && OB_FAIL(adapter.del_file(base_uri_, storage_info_))) { + OB_LOG(WARN, "fail to delete file", K(ret), K_(base_uri), KPC_(storage_info), K(object_id)); + } else { + metrics_.operation_num_++; + metrics_.total_op_time_ms_map_.log_entry(start_time_us); + } + } + + finish_(ret); + return ret; +} + +} //tools +} //oceanbase \ No newline at end of file diff --git a/tools/ob_admin/io_bench/task_executor.h b/tools/ob_admin/io_bench/task_executor.h new file mode 100644 index 000000000..0304270bf --- /dev/null +++ b/tools/ob_admin/io_bench/task_executor.h @@ -0,0 +1,199 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_ADMIN_BENCHMARK_TASK_EXECUTOR_H_ +#define OB_ADMIN_BENCHMARK_TASK_EXECUTOR_H_ + +#include "share/backup/ob_backup_struct.h" +#include "deps/oblib/src/lib/ob_define.h" +#include +#include +#include + +namespace oceanbase +{ +namespace tools +{ + +enum BenchmarkTaskType +{ + BENCHMARK_TASK_NORMAL_WRITE = 0, + BENCHMARK_TASK_APPEND_WRITE = 1, + BENCHMARK_TASK_MULTIPART_WRITE = 2, + BENCHMARK_TASK_READ = 3, + BENCHMARK_TASK_DEL = 4, + BENCHMARK_TASK_MAX_TYPE +}; + +struct TaskConfig +{ + TaskConfig(); + int assign(const TaskConfig &other); + TO_STRING_KV(K_(thread_num), K_(max_task_runs), K_(time_limit_s), + K_(obj_size), K_(obj_num), K_(fragment_size), K_(is_adaptive), K_(type)); + + int64_t thread_num_; + int64_t max_task_runs_; // 每个线程执行次数 + int64_t time_limit_s_; + int64_t obj_size_; + int64_t obj_num_; + int64_t fragment_size_; + bool is_adaptive_; + BenchmarkTaskType type_; +}; + +struct TimeMap +{ + TimeMap(); + int log_entry(const int64_t start_time_us); + int add(const TimeMap &other); + int assign(const TimeMap &other); + void summary(const char *map_name_str); + + int64_t total_entry_; + std::map time_ms_map_; +}; + +struct Metrics +{ + Metrics(); + int assign(const Metrics &other); + int add(const Metrics &other); + void summary( + const struct timeval &start_real_time, + const struct rusage &start_usage, + const int64_t thread_num); + + int64_t status_; + int64_t throughput_bytes_; + int64_t operation_num_; + TimeMap total_op_time_ms_map_; + TimeMap open_time_ms_map_; + TimeMap close_time_ms_map_; +}; + +class ITaskExecutor +{ +public: + ITaskExecutor(); + virtual ~ITaskExecutor() {} + void reset(); + virtual int init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config); + virtual int execute() { return OB_SUCCESS; }; + const Metrics &get_metrics() { return metrics_; } + +protected: + int prepare_(const int64_t object_id); + void finish_(const int64_t ob_errcode); + +protected: + bool is_inited_; + int64_t base_uri_len_; + char base_uri_[common::OB_MAX_URI_LENGTH]; + share::ObBackupStorageInfo *storage_info_; + Metrics metrics_; +}; + +void init_random_content(); +int init_task_executor(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config, ITaskExecutor *&executor); + +class WriteTaskExecutor : public ITaskExecutor +{ +public: + WriteTaskExecutor(); + virtual ~WriteTaskExecutor() {} + void reset(); + virtual int init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override; + virtual int execute() override; + +private: + int64_t obj_size_; + char *write_buf_; + ObArenaAllocator allocator_; +}; + +class AppendWriteTaskExecutor : public ITaskExecutor +{ +public: + AppendWriteTaskExecutor(); + virtual ~AppendWriteTaskExecutor() {} + void reset(); + virtual int init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override; + virtual int execute() override; + +private: + int64_t obj_size_; + int64_t fragment_size_; + char *write_buf_; + ObArenaAllocator allocator_; +}; + +class MultipartWriteTaskExecutor : public ITaskExecutor +{ +public: + MultipartWriteTaskExecutor(); + virtual ~MultipartWriteTaskExecutor() {} + void reset(); + virtual int init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override; + virtual int execute() override; + +private: + int64_t obj_size_; + int64_t part_size_; + char *write_buf_; + ObArenaAllocator allocator_; +}; + +class ReadTaskExecutor : public ITaskExecutor +{ +public: + ReadTaskExecutor(); + virtual ~ReadTaskExecutor() {} + void reset(); + virtual int init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override; + virtual int execute() override; + +private: + int64_t obj_size_; + int64_t obj_num_; + bool is_adaptive_; + int64_t expected_read_size_; + char *read_buf_; + ObArenaAllocator allocator_; + +private: + static const int64_t ALIGNMENT = 16 * 1024; +}; + +class DelTaskExecutor : public ITaskExecutor +{ +public: + DelTaskExecutor(); + virtual ~DelTaskExecutor() {} + virtual int init(const char *base_uri, + share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override; + virtual int execute() override; + +private: + bool is_adaptive_; +}; + +} //namespace tools +} //namespace oceanbase + +#endif // OB_ADMIN_BENCHMARK_TASK_EXECUTOR_H_ \ No newline at end of file diff --git a/tools/ob_admin/main.cpp b/tools/ob_admin/main.cpp index c6a2dba73..78cfb8b89 100644 --- a/tools/ob_admin/main.cpp +++ b/tools/ob_admin/main.cpp @@ -29,6 +29,7 @@ #include "log_tool/ob_admin_log_tool_executor.h" #include "slog_tool/ob_admin_slog_executor.h" #include "dump_ckpt/ob_admin_dump_ckpt_executor.h" +#include "io_bench/ob_admin_io_adapter_bench.h" #include "lib/utility/ob_print_utils.h" using namespace oceanbase::common; @@ -143,6 +144,8 @@ int main(int argc, char *argv[]) executor = new ObAdminSlogExecutor(); } else if (0 == strcmp("dump_ckpt", argv[1])) { executor = new ObAdminDumpCkptExecutor(); + } else if (0 == strcmp("io_adapter_benchmark", argv[1])) { + executor = new ObAdminIOAdapterBenchmarkExecutor(); } else if (0 == strncmp("-h", argv[1], 2) || 0 == strncmp("-S", argv[1], 2)) { executor = new ObAdminServerExecutor(); } else {