Implement the ObBackupIoAdapter performance testing tool

This commit is contained in:
obdev
2024-02-08 09:53:26 +00:00
committed by ob-robot
parent 0385f460a4
commit bb1fc9e320
8 changed files with 1551 additions and 0 deletions

View File

@ -169,6 +169,7 @@ int build_bucket_and_object_name(ObIAllocator &allocator,
OB_LOG(WARN, "object name is empty", K(uri), K(ret), K(bucket_end));
} else if (OB_ISNULL(bucket_name_buff =
static_cast<char *>(allocator.alloc(bucket_length + 1)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to alloc bucket name buff", K(ret), K(uri), K(bucket_length));
} else if (OB_FAIL(databuff_printf(bucket_name_buff, bucket_length + 1,
"%.*s", bucket_length, uri.ptr() + bucket_start))) {

View File

@ -761,6 +761,50 @@ TEST_F(TestObjectStorage, test_append_rw)
{
ObStorageAppender appender;
const char write_content[] = "123";
ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/a.b/%ld.back",
dir_uri, ObTimeUtility::current_time()));
ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base));
ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, strlen(write_content), 0));
ASSERT_EQ(OB_SUCCESS, appender.seal_for_adaptive());
ASSERT_EQ(OB_SUCCESS, appender.close());
ObStorageAdaptiveReader reader;
char read_buf[5] = {0};
int64_t read_size = 0;
ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base));
ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 5, 0, read_size));
ASSERT_EQ(strlen(write_content), read_size);
ASSERT_EQ('2', read_buf[1]);
ASSERT_EQ(OB_SUCCESS, reader.close());
ASSERT_EQ(OB_SUCCESS, util.del_file(uri, true));
}
{
ObStorageAppender appender;
const char write_content[] = "123";
ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/a.b/%ld",
dir_uri, ObTimeUtility::current_time()));
ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base));
ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, strlen(write_content), 0));
ASSERT_EQ(OB_SUCCESS, appender.seal_for_adaptive());
ASSERT_EQ(OB_SUCCESS, appender.close());
ObStorageAdaptiveReader reader;
char read_buf[5] = {0};
int64_t read_size = 0;
ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base));
ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 5, 0, read_size));
ASSERT_EQ(strlen(write_content), read_size);
ASSERT_EQ('2', read_buf[1]);
ASSERT_EQ(OB_SUCCESS, reader.close());
ASSERT_EQ(OB_SUCCESS, util.del_file(uri, true));
}
{
ObStorageAppender appender;
ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_append_file_%ld.back",
dir_uri, ObTimeUtility::current_time()));
ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base));
// first append
const char first_write[] = "123";

View File

@ -54,6 +54,11 @@ add_executable(ob_admin
backup_tool/ob_admin_dump_backup_data_executor.h
backup_tool/ob_admin_dump_backup_data_executor.cpp
io_bench/task_executor.h
io_bench/task_executor.cpp
io_bench/ob_admin_io_adapter_bench.h
io_bench/ob_admin_io_adapter_bench.cpp
#trx_tool/ob_admin_trx_executor.h
#trx_tool/ob_admin_trx_executor.cpp
@ -116,6 +121,11 @@ add_executable(ob_admin
backup_tool/ob_admin_dump_backup_data_executor.h
backup_tool/ob_admin_dump_backup_data_executor.cpp
io_bench/task_executor.h
io_bench/task_executor.cpp
io_bench/ob_admin_io_adapter_bench.h
io_bench/ob_admin_io_adapter_bench.cpp
#trx_tool/ob_admin_trx_executor.h
#trx_tool/ob_admin_trx_executor.cpp

View File

@ -0,0 +1,433 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <time.h>
#include "ob_admin_io_adapter_bench.h"
#include "share/backup/ob_backup_io_adapter.h"
#include "../dumpsst/ob_admin_dumpsst_print_helper.h"
#include "src/logservice/archiveservice/ob_archive_file_utils.h"
#include "src/share/backup/ob_backup_path.h"
#include "src/share/backup/ob_backup_clean_util.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
namespace oceanbase
{
namespace tools
{
ObAdminIOAdapterBenchmarkExecutor::ObAdminIOAdapterBenchmarkExecutor()
: clean_before_execution_(false), clean_after_execution_(false), config_()
{
MEMSET(base_path_, 0, sizeof(base_path_));
MEMSET(storage_info_str_, 0, sizeof(storage_info_str_));
}
int ObAdminIOAdapterBenchmarkExecutor::execute(int argc, char *argv[])
{
int ret = OB_SUCCESS;
lib::set_memory_limit(16 * 1024 * 1024 * 1024LL);
lib::set_tenant_memory_limit(500, 16 * 1024 * 1024 * 1024LL);
OB_LOGGER.set_log_level("INFO");
if (OB_FAIL(parse_cmd_(argc, argv))) {
OB_LOG(WARN, "failed to parse cmd", K(ret), K(argc), K(argv));
} else if (OB_FAIL(run_all_tests_())) {
OB_LOG(WARN, "failed to pass all tests", K(ret), K_(base_path), K_(storage_info_str));
}
return ret;
}
int ObAdminIOAdapterBenchmarkExecutor::parse_cmd_(int argc, char *argv[])
{
int ret = OB_SUCCESS;
int opt = 0;
int index = -1;
const char *opt_str = "h:d:s:t:r:l:o:n:f:p:b:c:j:";
struct option longopts[] = {{"help", 0, NULL, 'h'},
{"file-path-prefix", 1, NULL, 'd'},
{"storage-info", 1, NULL, 's'},
{"thread-num", 1, NULL, 't'},
{"max-task-run-times", 1, NULL, 'r'},
{"time-limit", 1, NULL, 'l'},
{"obj-size", 1, NULL, 'o'},
{"obj-num", 1, NULL, 'n'},
{"fragment-size", 1, NULL, 'f'},
{"type", 1, NULL, 'p'},
{"is-adaptive", 1, NULL, 'j'},
{"clean-before-execution", 0, NULL, 'b'},
{"clean-after-execution", 0, NULL, 'c'},
{NULL, 0, NULL, 0}};
while (OB_SUCC(ret) && -1 != (opt = getopt_long(argc, argv, opt_str, longopts, &index))) {
switch (opt) {
case 'h': {
print_usage_();
exit(1);
}
case 'd': {
time_t timestamp = time(NULL);
struct tm *timeinfo = localtime(&timestamp);
char buffer[OB_MAX_TIME_STR_LENGTH];
strftime(buffer, sizeof(buffer), "%Y-%m-%d-%H:%M:%S", timeinfo);
if (OB_FAIL(databuff_printf(base_path_, sizeof(base_path_), "%s", optarg))) {
OB_LOG(WARN, "failed to construct base path", K(ret), K((char *)optarg), K(buffer));
}
break;
}
case 's': {
if (OB_FAIL(databuff_printf(storage_info_str_, sizeof(storage_info_str_), "%s", optarg))) {
OB_LOG(WARN, "failed to copy storage info str", K(ret), K((char *)optarg));
}
break;
}
case 't': {
if (OB_FAIL(c_str_to_int(optarg, config_.thread_num_))) {
OB_LOG(WARN, "fail to parse thread num", K(ret), K((char *)optarg));
}
break;
}
case 'r': {
if (OB_FAIL(c_str_to_int(optarg, config_.max_task_runs_))) {
OB_LOG(WARN, "fail to parse max task runs", K(ret), K((char *)optarg));
}
break;
}
case 'l': {
if (OB_FAIL(c_str_to_int(optarg, config_.time_limit_s_))) {
OB_LOG(WARN, "fail to parse time limit", K(ret), K((char *)optarg));
}
break;
}
case 'o': {
if (OB_FAIL(c_str_to_int(optarg, config_.obj_size_))) {
OB_LOG(WARN, "fail to parse object size", K(ret), K((char *)optarg));
}
break;
}
case 'n': {
if (OB_FAIL(c_str_to_int(optarg, config_.obj_num_))) {
OB_LOG(WARN, "fail to parse object num", K(ret), K((char *)optarg));
}
break;
}
case 'f': {
if (OB_FAIL(c_str_to_int(optarg, config_.fragment_size_))) {
OB_LOG(WARN, "fail to parse fragment size", K(ret), K((char *)optarg));
}
break;
}
case 'p': {
if (OB_ISNULL(optarg)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "type is NULL", K(ret), K((char *)optarg));
} else if (0 == STRCMP("write", optarg)) {
config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_NORMAL_WRITE;
} else if (0 == STRCMP("append", optarg)) {
config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_APPEND_WRITE;
} else if (0 == STRCMP("multi", optarg)) {
config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_MULTIPART_WRITE;
} else if (0 == STRCMP("read", optarg)) {
config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_READ;
} else if (0 == STRCMP("del", optarg)) {
config_.type_ = BenchmarkTaskType::BENCHMARK_TASK_DEL;
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "unknown test type", K((char *)optarg), K(ret));
}
break;
}
case 'j': {
config_.is_adaptive_ = true;
break;
}
case 'b': {
clean_before_execution_ = true;
break;
}
case 'c': {
clean_after_execution_ = true;
break;
}
default: {
print_usage_();
exit(1);
}
}
}
char print_config_buf[OB_MAX_URI_LENGTH];
config_.to_string(print_config_buf, sizeof(print_config_buf));
OB_LOG(INFO, "Task Config", K_(config));
PrintHelper::print_dump_line("Task Config", print_config_buf);
return ret;
}
class CleanOp : public ObBaseDirEntryOperator
{
public:
CleanOp(char *base_path, share::ObBackupStorageInfo *storage_info)
: base_path_(base_path), storage_info_(storage_info), cleaned_objects_(0)
{
uri_[0] = '\0';
}
~CleanOp() {}
int func(const dirent *entry) override;
char uri_[OB_MAX_URI_LENGTH];
char *base_path_;
share::ObBackupStorageInfo *storage_info_;
int64_t cleaned_objects_;
};
int CleanOp::func(const dirent *entry)
{
int ret = OB_SUCCESS;
ObBackupIoAdapter adapter;
if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%s/%s", base_path_, entry->d_name))) {
OB_LOG(WARN, "fail to set uri", K(ret), K_(uri), KPC_(storage_info));
} else if (OB_FAIL(adapter.del_file(uri_, storage_info_))) {
OB_LOG(WARN, "fail to delete file", K(ret), K_(uri), KPC_(storage_info));
} else {
cleaned_objects_++;
}
return ret;
}
int ObAdminIOAdapterBenchmarkExecutor::clean_base_path_(share::ObBackupStorageInfo &info)
{
int ret = OB_SUCCESS;
PrintHelper::print_dump_title("Cleaning");
if (OB_UNLIKELY(!info.is_valid())) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid storage info", K(ret), K(info));
} else if (info.get_type() == ObStorageType::OB_STORAGE_FILE) {
char cmd[OB_MAX_URI_LENGTH] = { 0 };
if (OB_FAIL(databuff_printf(cmd, OB_MAX_URI_LENGTH,
"rm -rf %s/*", base_path_ + strlen(OB_FILE_PREFIX)))) {
OB_LOG(WARN, "fail to fill clean cmd", K(ret), K_(base_path));
} else if (0 != std::system(cmd)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "fail to delete dir", K(ret), K_(base_path), K(cmd));
}
} else {
ObBackupIoAdapter adapter;
CleanOp op(base_path_, &info);
if (OB_FAIL(adapter.list_files(base_path_, &info, op))) {
OB_LOG(WARN, "fail to clean", K(ret), K_(base_path), K(info));
}
PrintHelper::print_dump_line("Cleaned Objects", op.cleaned_objects_);
}
PrintHelper::print_dump_line("Clean Status", OB_SUCC(ret) ? "SUCCESS" : "FAIL");
return ret;
}
int ObAdminIOAdapterBenchmarkExecutor::run_all_tests_()
{
int ret = OB_SUCCESS;
share::ObBackupStorageInfo storage_info;
ObBackupIoAdapterBenchmarkRunner runner;
if (FALSE_IT(init_random_content())) {
} else if (OB_FAIL(storage_info.set(base_path_, storage_info_str_))) {
OB_LOG(WARN, "failed to set storage info", K(ret), K_(base_path), K_(storage_info_str));
} else if (OB_FAIL(runner.init(base_path_, &storage_info, config_))) {
OB_LOG(WARN, "fail to init ObBackupIoAdapterBenchmarkRunner",
K(ret), K_(base_path), K(storage_info), K_(config));
} else {
if (clean_before_execution_) {
if (OB_FAIL(clean_base_path_(storage_info))) {
OB_LOG(WARN, "fail to clean base path", K(ret), K(storage_info), K_(base_path));
}
}
if (OB_SUCC(ret)) {
PrintHelper::print_dump_title("Testing");
if (OB_FAIL(runner.do_benchmark())) {
OB_LOG(WARN, "fail to do benchmark",
K(ret), K_(base_path), K(storage_info), K_(config));
}
}
if (OB_SUCC(ret) && clean_after_execution_) {
if (OB_FAIL(clean_base_path_(storage_info))) {
OB_LOG(WARN, "fail to clean base path", K(ret), K(storage_info), K_(base_path));
}
}
}
return ret;
}
int ObAdminIOAdapterBenchmarkExecutor::print_usage_()
{
int ret = OB_SUCCESS;
printf("\n");
printf("Usage: bench_io_adapter command [command args] [options]\n");
printf("commands:\n");
printf(HELP_FMT, "-h, --help", "display this message.");
printf("options:\n");
printf(HELP_FMT, "-d, --file-path-prefix", "absolute file path with file prefix");
printf(HELP_FMT, "-s, --storage-info", "oss/cos should provide storage info");
printf(HELP_FMT, "-t, --thread-num", "thread num");
printf(HELP_FMT, "-r, --max-task-run-times", "max task run times for each thread");
printf(HELP_FMT, "-l, --time-limit", "time limit in second");
printf(HELP_FMT, "-o, --object-size", "object size");
printf(HELP_FMT, "-n, --object-num", "object num");
printf(HELP_FMT, "-f, --fragment-size",
"for read operations, 'fragment-size' denotes the expected size of data to be read, "
"while for append/multipart write tasks, it specifies the size of each individual pwrite operation.");
printf(HELP_FMT, "-p, --type", "task type");
printf(HELP_FMT, "-j, --is-adaptive", "use adative interface");
printf(HELP_FMT, "-b, --clean-before-execution", "clean before execution");
printf(HELP_FMT, "-c, --clean-after-execution", "clean after execution");
printf("samples:\n");
printf(" test nfs device: \n");
printf("\tob_admin bench_io_adapter -dfile:///home/admin/backup_info \n");
printf(" test object device: \n");
printf("\tob_admin bench_io_adapter -d'oss://home/admin/backup_info' "
"-s'host=xxx.com&access_id=111&access_key=222'\n");
printf("\tob_admin bench_io_adapter -d'cos://home/admin/backup_info' "
"-s'host=xxx.com&access_id=111&access_key=222&appid=333'\n");
printf("\tob_admin bench_io_adapter -d's3://home/admin/backup_info' "
"-s'host=xxx.com&access_id=111&access_key=222&s3_region=333'\n");
return ret;
}
/*--------------------------------ObBackupIoAdapterBenchmarkRunner--------------------------------*/
ObBackupIoAdapterBenchmarkRunner::ObBackupIoAdapterBenchmarkRunner()
: lock_(), is_inited_(false), tg_id_(-1), ret_code_(OB_SUCCESS),
config_(), metrics_(), storage_info_(nullptr)
{
}
ObBackupIoAdapterBenchmarkRunner::~ObBackupIoAdapterBenchmarkRunner()
{
destroy();
}
int ObBackupIoAdapterBenchmarkRunner::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
OB_LOG(WARN, "ObBackupIoAdapterBenchmarkRunner init twice", K(ret));
} else if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info)
|| OB_UNLIKELY(!storage_info->is_valid() || config.thread_num_ <= 0)
|| OB_UNLIKELY(config.time_limit_s_ <= 0 && config.max_task_runs_ <= 0)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), "%s", base_uri))) {
OB_LOG(WARN, "fail to deep copy base uri", K(ret), K(base_uri));
} else if (OB_FAIL(config_.assign(config))) {
OB_LOG(WARN, "fail to assign task config", K(ret), K(config));
} else {
storage_info_ = storage_info;
is_inited_ = true;
}
return ret;
}
void ObBackupIoAdapterBenchmarkRunner::destroy()
{
if (tg_id_ >= 0) {
TG_STOP(tg_id_);
TG_WAIT(tg_id_);
tg_id_ = -1;
}
is_inited_ = false;
}
int ObBackupIoAdapterBenchmarkRunner::do_benchmark()
{
int ret = OB_SUCCESS;
const int64_t thread_num = config_.thread_num_;
const int64_t time_limit_s = config_.time_limit_s_;
struct rusage start_usage;
struct timeval start_real_time;
getrusage(RUSAGE_SELF, &start_usage);
gettimeofday(&start_real_time, nullptr);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "ObBackupIoAdapterBenchmarkRunner not init", K(ret));
} else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::COMMON_THREAD_POOL, tg_id_))) {
OB_LOG(WARN, "create thread group failed", K(ret));
} else if (OB_FAIL(TG_SET_RUNNABLE(tg_id_, *this))) {
OB_LOG(WARN, "set tg_runnable failed", K(ret), K_(tg_id));
} else if (OB_FAIL(TG_SET_THREAD_CNT(tg_id_, thread_num))) {
OB_LOG(WARN, "set thread count failed", K(ret), K_(tg_id), K(thread_num));
} else if (OB_FAIL(TG_START(tg_id_))) {
OB_LOG(WARN, "start thread failed", K(ret), K_(tg_id), K(thread_num));
} else {
if (time_limit_s > 0) {
sleep(time_limit_s);
TG_STOP(tg_id_);
}
TG_WAIT(tg_id_);
if (OB_SUCC(ret_code_)) {
metrics_.summary(start_real_time, start_usage, config_.thread_num_);
} else {
OB_LOG(WARN, "some threads failed, check log", K_(ret_code), K(thread_num));
}
}
return ret;
}
void ObBackupIoAdapterBenchmarkRunner::run1()
{
int ret = OB_SUCCESS;
ObBackupIoAdapter util;
ITaskExecutor *executor = nullptr;
const uint64_t thread_idx = get_thread_idx();
char uri[common::OB_MAX_URI_LENGTH];
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "ObBackupIoAdapterBenchmarkRunner not init", K(ret));
} else if (OB_FAIL(databuff_printf(uri, sizeof(uri), "%s/%ld", base_uri_, thread_idx))) {
OB_LOG(WARN, "fail to construct base task dir for current thread",
K(ret), K_(base_uri), K(thread_idx));
} else if (OB_FAIL(util.mkdir(uri, storage_info_))) {
OB_LOG(WARN, "fail to make base task dir for current thread",
K(ret), K_(base_uri), K(thread_idx));
} else if (OB_FAIL(init_task_executor(uri, storage_info_, config_, executor))) {
OB_LOG(WARN, "fail to create and init task executor", K(ret), K(uri), K(thread_idx));
} else {
const bool is_limit_task_runs = (config_.max_task_runs_ > 0);
for (int64_t i = 0; OB_SUCC(ret) && !has_set_stop(); i++) {
if (is_limit_task_runs && i >= config_.max_task_runs_) {
break;
} else if (OB_FAIL(executor->execute())) {
OB_LOG(WARN, "fail to execute task", K(ret), K(i), K(uri), K(thread_idx));
}
}
if (OB_SUCC(ret)) {
SpinWLockGuard guard(lock_);
metrics_.add(executor->get_metrics());
} else {
ret_code_ = ret;
}
}
if (OB_NOT_NULL(executor)) {
executor->~ITaskExecutor();
ob_free(executor);
executor = nullptr;
}
}
} //tools
} //oceanbase

View File

@ -0,0 +1,76 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_IO_ADAPTER_BENCH_H_
#define OB_ADMIN_IO_ADAPTER_BENCH_H_
#include <gtest/gtest.h>
#include "../ob_admin_executor.h"
#include "deps/oblib/src/lib/ob_define.h" // OB_MAX_URI_LENGTH
#include "share/backup/ob_backup_struct.h" // OB_MAX_BACKUP_STORAGE_INFO_LENGTH
#include "share/ob_thread_mgr.h"
#include "task_executor.h"
namespace oceanbase
{
namespace tools
{
class ObAdminIOAdapterBenchmarkExecutor : public ObAdminExecutor
{
public:
ObAdminIOAdapterBenchmarkExecutor();
virtual ~ObAdminIOAdapterBenchmarkExecutor() {}
virtual int execute(int argc, char *argv[]) override;
private:
int parse_cmd_(int argc, char *argv[]);
int run_all_tests_();
int print_usage_();
int clean_base_path_(share::ObBackupStorageInfo &info);
private:
char base_path_[common::OB_MAX_URI_LENGTH];
char storage_info_str_[common::OB_MAX_BACKUP_STORAGE_INFO_LENGTH];
bool clean_before_execution_;
bool clean_after_execution_;
TaskConfig config_;
private:
static constexpr char *HELP_FMT = const_cast<char*>("\t%-30s%-12s\n");
DISALLOW_COPY_AND_ASSIGN(ObAdminIOAdapterBenchmarkExecutor);
};
class ObBackupIoAdapterBenchmarkRunner : public lib::TGRunnable
{
public:
ObBackupIoAdapterBenchmarkRunner();
~ObBackupIoAdapterBenchmarkRunner();
int init(const char *base_uri, share::ObBackupStorageInfo *storage_info, const TaskConfig &config);
void destroy();
int do_benchmark();
virtual void run1() override;
private:
common::SpinRWLock lock_;
bool is_inited_;
int tg_id_;
int ret_code_;
TaskConfig config_;
Metrics metrics_;
char base_uri_[common::OB_MAX_URI_LENGTH];
share::ObBackupStorageInfo *storage_info_;
};
} //namespace tools
} //namespace oceanbase
#endif // OB_ADMIN_IO_ADAPTER_BENCH_H_

View File

@ -0,0 +1,785 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "share/backup/ob_backup_io_adapter.h"
#include "task_executor.h"
#include "../dumpsst/ob_admin_dumpsst_print_helper.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
namespace oceanbase
{
namespace tools
{
/*--------------------------------TaskConfig--------------------------------*/
TaskConfig::TaskConfig()
: thread_num_(-1), max_task_runs_(-1), time_limit_s_(-1),
obj_size_(-1), obj_num_(-1), fragment_size_(-1), is_adaptive_(false),
type_(BenchmarkTaskType::BENCHMARK_TASK_MAX_TYPE)
{
}
int TaskConfig::assign(const TaskConfig &other)
{
int ret = OB_SUCCESS;
thread_num_ = other.thread_num_;
max_task_runs_ = other.max_task_runs_;
time_limit_s_ = other.time_limit_s_;
obj_size_ = other.obj_size_;
obj_num_ = other.obj_num_;
fragment_size_ = other.fragment_size_;
is_adaptive_ = other.is_adaptive_;
type_ = other.type_;
return ret;
}
/*--------------------------------Metrics--------------------------------*/
TimeMap::TimeMap() : total_entry_(0)
{
}
int TimeMap::log_entry(const int64_t start_time_us)
{
int ret = OB_SUCCESS;
const int64_t cost_time_ms = (ObTimeUtility::current_time() - start_time_us) / 1000;
time_ms_map_[cost_time_ms]++;
total_entry_++;
return ret;
}
int TimeMap::add(const TimeMap &other)
{
int ret = OB_SUCCESS;
std::map<int64_t, int64_t>::const_iterator it = other.time_ms_map_.begin();
while (OB_SUCC(ret) && it != other.time_ms_map_.end()) {
time_ms_map_[it->first] += it->second;
++it;
}
if (OB_SUCC(ret)) {
total_entry_ += other.total_entry_;
}
return ret;
}
void TimeMap::summary(const char *map_name_str)
{
const char *map_name = "Anonymous Time Map";
if (OB_NOT_NULL(map_name_str)) {
map_name = map_name_str;
}
if (total_entry_ <= 0) {
PrintHelper::print_dump_line(map_name, "Empty Time Map");
} else {
const int64_t th_50_count = total_entry_ * 0.5;
const int64_t th_90_count = total_entry_ * 0.9;
const int64_t th_99_count = total_entry_ * 0.99;
const int64_t th_999_count = total_entry_ * 0.999;
int64_t min_ms = 0;
int64_t th_50_ms = 0;
int64_t th_90_ms = 0;
int64_t th_99_ms = 0;
int64_t th_999_ms = 0;
int64_t cur_count = 0;
int64_t max_ms = 0;
std::map<int64_t, int64_t>::const_iterator it = time_ms_map_.begin();
min_ms = it->first;
while (it != time_ms_map_.end()) {
cur_count += it->second;
if (th_50_ms == 0 && cur_count >= th_50_count) {
th_50_ms = it->first;
}
if (th_90_ms == 0 && cur_count >= th_90_count) {
th_90_ms = it->first;
}
if (th_99_ms == 0 && cur_count >= th_99_count) {
th_99_ms = it->first;
}
if (th_999_ms == 0 && cur_count >= th_999_count) {
th_999_ms = it->first;
}
if (max_ms == 0 && cur_count == total_entry_) {
max_ms = it->first;
}
++it;
}
char buf[2048];
int ret = OB_SUCCESS;
if (OB_FAIL(databuff_printf(buf, sizeof(buf),
"total_entry=%ld, min_ms=%ld, th_50_ms=%ld, th_90_ms=%ld, th_99_ms=%ld, th_999_ms=%ld, max_ms=%ld",
total_entry_, min_ms, th_50_ms, th_90_ms,th_99_ms, th_999_ms, max_ms))) {
OB_LOG(WARN, "fail to set log str", K(ret), K_(total_entry),
K(min_ms), K(th_50_ms), K(th_90_ms), K(th_99_ms), K(th_999_ms), K(max_ms));
} else {
OB_LOG(INFO, "time map status", K(ret), K_(total_entry),
K(min_ms), K(th_50_ms), K(th_90_ms), K(th_99_ms), K(th_999_ms), K(max_ms));
PrintHelper::print_dump_line(map_name, buf);
}
}
}
int TimeMap::assign(const TimeMap &other)
{
int ret = OB_SUCCESS;
total_entry_ = other.total_entry_;
time_ms_map_ = other.time_ms_map_;
return ret;
}
Metrics::Metrics()
: status_(OB_SUCCESS), throughput_bytes_(0), operation_num_(0),
total_op_time_ms_map_(), open_time_ms_map_(), close_time_ms_map_()
{
}
int Metrics::assign(const Metrics &other)
{
int ret = OB_SUCCESS;
if (OB_FAIL(total_op_time_ms_map_.assign(other.total_op_time_ms_map_))) {
OB_LOG(WARN, "fail to assign total_op_time_ms_map", K(ret));
} else if (OB_FAIL(open_time_ms_map_.assign(other.open_time_ms_map_))) {
OB_LOG(WARN, "fail to assign open_time_ms_map", K(ret));
} else if (OB_FAIL(close_time_ms_map_.assign(other.close_time_ms_map_))) {
OB_LOG(WARN, "fail to assign close_time_ms_map", K(ret));
} else {
status_ = other.status_;
throughput_bytes_ = other.throughput_bytes_;
operation_num_ = other.operation_num_;
}
return ret;
}
int Metrics::add(const Metrics &other)
{
int ret = OB_SUCCESS;
if (OB_SUCC(status_)) {
if (OB_SUCC(other.status_)) {
if (OB_FAIL(total_op_time_ms_map_.add(other.total_op_time_ms_map_))) {
OB_LOG(WARN, "fail to add total_op_time_ms_map", K(ret));
} else if (OB_FAIL(open_time_ms_map_.add(other.open_time_ms_map_))) {
OB_LOG(WARN, "fail to add open_time_ms_map", K(ret));
} else if (OB_FAIL(close_time_ms_map_.add(other.close_time_ms_map_))) {
OB_LOG(WARN, "fail to add close_time_ms_map", K(ret));
} else {
throughput_bytes_ += other.throughput_bytes_;
operation_num_ += other.operation_num_;
}
} else {
status_ = other.status_;
}
}
return ret;
}
static double cal_time_diff(const timeval& start, const timeval& end)
{
return (end.tv_sec - start.tv_sec) + (end.tv_usec - start.tv_usec) / 1e6;
}
void Metrics::summary(
const struct timeval &start_real_time,
const struct rusage &start_usage,
const int64_t thread_num)
{
PrintHelper::print_dump_line("Status", status_ == OB_SUCCESS ? "SUCCESS" : "FAIL");
if (OB_LIKELY(status_ == OB_SUCCESS)) {
if (OB_UNLIKELY(operation_num_ <= 0)) {
PrintHelper::print_dump_line("Operation num is unexpected", operation_num_);
} else {
struct rusage end_usage;
struct timeval end_real_time;
getrusage(RUSAGE_SELF, &end_usage);
gettimeofday(&end_real_time, nullptr);
const double cost_time_s = cal_time_diff(start_real_time, end_real_time);
const double user_cpu_time_s = cal_time_diff(start_usage.ru_utime, end_usage.ru_utime);
const double sys_cpu_time_s = cal_time_diff(start_usage.ru_stime, end_usage.ru_stime);
const double cpu_usage = (user_cpu_time_s + sys_cpu_time_s) / (cost_time_s) * 100.0;
const double QPS = ((double)operation_num_) / cost_time_s;
const double BW = ( ((double)throughput_bytes_) / 1024.0 / 1024.0 ) / cost_time_s;
PrintHelper::print_dump_line("Total operation num", operation_num_);
PrintHelper::print_dump_line("Total execution time", (std::to_string(cost_time_s) + " s").c_str());
PrintHelper::print_dump_line("Total user time", (std::to_string(user_cpu_time_s) + " s").c_str());
PrintHelper::print_dump_line("Total system time", (std::to_string(sys_cpu_time_s) + " s").c_str());
if (BW > 1e-6) {
const double cpu_usage_for_100MB_bw = (100.0 / BW) * cpu_usage;
PrintHelper::print_dump_line("CPU usage for 100MB/s BW",
(std::to_string(cpu_usage_for_100MB_bw) + "% per 100MB/s").c_str());
} else {
PrintHelper::print_dump_line("Total CPU usage", (std::to_string(cpu_usage) + "%").c_str());
}
PrintHelper::print_dump_line("Total throughput bytes", throughput_bytes_);
PrintHelper::print_dump_line("Total QPS", std::to_string(QPS).c_str());
PrintHelper::print_dump_line("Per Thread QPS", std::to_string(QPS / thread_num).c_str());
PrintHelper::print_dump_line("Total BW", (std::to_string(BW) + " MB/s").c_str());
PrintHelper::print_dump_line("Per Thread BW", (std::to_string(BW / thread_num) + " MB/s").c_str());
total_op_time_ms_map_.summary("Total Op Time Map");
open_time_ms_map_.summary("Open Time Map");
close_time_ms_map_.summary("Close Op Time Map");
}
}
}
/*--------------------------------ITaskExecutor--------------------------------*/
ITaskExecutor::ITaskExecutor()
: is_inited_(false), base_uri_len_(-1), storage_info_(nullptr), metrics_()
{
base_uri_[0] = '\0';
}
void ITaskExecutor::reset()
{
is_inited_ = false;
base_uri_len_ = -1;
storage_info_ = nullptr;
base_uri_[0] = '\0';
}
int ITaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
OB_LOG(WARN, "Task Executor init twice", K(ret));
} else if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info)
|| OB_UNLIKELY(!storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info));
} else if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), "%s", base_uri))) {
OB_LOG(WARN, "fail to deep copy base uri", K(ret), K(base_uri));
} else {
base_uri_len_ = strlen(base_uri_);
storage_info_ = storage_info;
is_inited_ = true;
}
return ret;
}
int ITaskExecutor::prepare_(const int64_t object_id)
{
int ret = OB_SUCCESS;
int64_t pos = base_uri_len_;
if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), pos, "/%ld", object_id))) {
OB_LOG(WARN, "fail to construct object name", K(ret), K_(base_uri), K(object_id));
}
return ret;
}
void ITaskExecutor::finish_(const int64_t ob_errcode)
{
metrics_.status_ = ob_errcode;
base_uri_[base_uri_len_] = '\0';
}
template<typename Executor>
typename std::enable_if<std::is_base_of<ITaskExecutor, Executor>::value, int>::type
alloc_executor(ITaskExecutor *&executor, ObMemAttr &attr)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
if (OB_ISNULL(buf = ob_malloc(sizeof(Executor), attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN,"fail to alloc buf for task executor", K(ret));
} else {
executor = new(buf) Executor();
}
return ret;
}
int init_task_executor(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config, ITaskExecutor *&executor)
{
int ret = OB_SUCCESS;
executor = nullptr;
void *buf = nullptr;
ObMemAttr attr;
attr.label_ = "TMPSTR";
if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info)
|| OB_UNLIKELY(!storage_info->is_valid() || config.type_ == BenchmarkTaskType::BENCHMARK_TASK_MAX_TYPE)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_NORMAL_WRITE) {
if (OB_FAIL(alloc_executor<WriteTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_READ) {
if (OB_FAIL(alloc_executor<ReadTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_APPEND_WRITE) {
if (OB_FAIL(alloc_executor<AppendWriteTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_MULTIPART_WRITE) {
if (OB_FAIL(alloc_executor<MultipartWriteTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_DEL) {
if (OB_FAIL(alloc_executor<DelTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unknown benchmark task type", K(ret), K(config));
}
if (FAILEDx(executor->init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init executor", K(ret), K(base_uri), KPC(storage_info), K(config));
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(executor)) {
executor->~ITaskExecutor();
ob_free(executor);
executor = nullptr;
}
}
return ret;
}
/*--------------------------------Write Task Executor--------------------------------*/
static const int64_t MAX_RANDOM_CONTENT_LEN = 128 * 1024 * 1024L;
static char RANDOM_CONTENT[MAX_RANDOM_CONTENT_LEN + 1];
void init_random_content()
{
static bool is_inited = false;
if (!is_inited) {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
for (int64_t i = 0; i < MAX_RANDOM_CONTENT_LEN; i++) {
RANDOM_CONTENT[i] = alphanum[ObRandom::rand(0, sizeof(alphanum) - 2)];
}
RANDOM_CONTENT[MAX_RANDOM_CONTENT_LEN] = '\0';
is_inited = true;
}
}
WriteTaskExecutor::WriteTaskExecutor()
: ITaskExecutor(), obj_size_(-1), write_buf_(nullptr), allocator_()
{
}
void WriteTaskExecutor::reset()
{
obj_size_ = -1;
write_buf_ = nullptr;
allocator_.clear();
ITaskExecutor::reset();
}
int WriteTaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (OB_UNLIKELY(config.obj_size_ <= 0 || config.obj_size_ >= MAX_RANDOM_CONTENT_LEN)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(config));
} else if (FALSE_IT(obj_size_ = config.obj_size_)) {
} else if (OB_ISNULL(write_buf_ = (char *)allocator_.alloc(obj_size_ + 1))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(obj_size));
} else {
is_inited_ = true;
}
if (OB_FAIL(ret)) {
reset();
}
return ret;
}
int WriteTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "WriteTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
MEMCPY(write_buf_,
RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - obj_size_),
obj_size_);
write_buf_[obj_size_] = '\0';
if (OB_FAIL(adapter.write_single_file(base_uri_, storage_info_, write_buf_, obj_size_))) {
OB_LOG(WARN, "fail to write file",
K(ret), K_(base_uri), KPC_(storage_info), K_(obj_size), K(object_id));
} else {
metrics_.operation_num_++;
metrics_.throughput_bytes_ += obj_size_;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
AppendWriteTaskExecutor::AppendWriteTaskExecutor()
: ITaskExecutor(), obj_size_(-1), fragment_size_(-1), write_buf_(nullptr), allocator_()
{
}
void AppendWriteTaskExecutor::reset()
{
obj_size_ = -1;
fragment_size_ = -1;
write_buf_ = nullptr;
allocator_.clear();
ITaskExecutor::reset();
}
int AppendWriteTaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_
|| config.fragment_size_ >= MAX_RANDOM_CONTENT_LEN)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(config));
} else if (FALSE_IT(obj_size_ = config.obj_size_)) {
} else if (FALSE_IT(fragment_size_ = config.fragment_size_)) {
} else if (OB_ISNULL(write_buf_ = (char *)allocator_.alloc(fragment_size_ + 1))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(fragment_size));
} else {
is_inited_ = true;
}
if (OB_FAIL(ret)) {
reset();
}
return ret;
}
int AppendWriteTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "AppendWriteTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
ObIOFd fd;
ObIODevice *device_handle = nullptr;
ObStorageAccessType access_type = OB_STORAGE_ACCESS_RANDOMWRITER;
const int64_t open_start_time_us = ObTimeUtility::current_time();
if (OB_FAIL(adapter.open_with_access_type(
device_handle, fd, storage_info_, base_uri_, access_type))) {
OB_LOG(WARN, "failed to open device with access type",
K(ret), K_(base_uri), KPC_(storage_info), K(access_type));
} else if (OB_ISNULL(device_handle)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "device handle is NULL", K(ret), KP(device_handle), K_(base_uri));
} else {
metrics_.open_time_ms_map_.log_entry(open_start_time_us);
}
int64_t cur_offset = 0;
int64_t actual_write_size = -1;
int64_t cur_append_size = -1;
while (OB_SUCC(ret) && cur_offset < obj_size_) {
cur_append_size = MIN(obj_size_ - cur_offset, fragment_size_);
MEMCPY(write_buf_,
RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - cur_append_size),
cur_append_size);
write_buf_[cur_append_size] = '\0';
if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_append_size,
write_buf_, actual_write_size))) {
OB_LOG(WARN, "fail to append object",
K(ret), K_(base_uri), K(cur_offset), K(cur_append_size));
} else {
cur_offset += cur_append_size;
}
}
const int64_t close_start_time_us = ObTimeUtility::current_time();
if (FAILEDx(device_handle->seal_file(fd))) {
OB_LOG(WARN, "fail to seal file", K(ret), K_(base_uri));
} else if (OB_FAIL(adapter.close_device_and_fd(device_handle, fd))) {
OB_LOG(WARN, "fail to close device handle", K(ret), K_(base_uri));
} else {
metrics_.close_time_ms_map_.log_entry(close_start_time_us);
}
if (OB_SUCC(ret)) {
metrics_.operation_num_++;
metrics_.throughput_bytes_ += obj_size_;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
MultipartWriteTaskExecutor::MultipartWriteTaskExecutor()
: ITaskExecutor(), obj_size_(-1), part_size_(-1), write_buf_(nullptr), allocator_()
{
}
void MultipartWriteTaskExecutor::reset()
{
obj_size_ = -1;
part_size_ = -1;
write_buf_ = nullptr;
allocator_.clear();
ITaskExecutor::reset();
}
int MultipartWriteTaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_
|| config.fragment_size_ >= MAX_RANDOM_CONTENT_LEN)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(config));
} else if (FALSE_IT(obj_size_ = config.obj_size_)) {
} else if (FALSE_IT(part_size_ = config.fragment_size_)) {
} else if (OB_ISNULL(write_buf_ = (char *)allocator_.alloc(part_size_ + 1))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(part_size));
} else {
is_inited_ = true;
}
if (OB_FAIL(ret)) {
reset();
}
return ret;
}
int MultipartWriteTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "MultipartWriteTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
ObIOFd fd;
ObIODevice *device_handle = nullptr;
ObStorageAccessType access_type = OB_STORAGE_ACCESS_MULTIPART_WRITER;
const int64_t open_start_time_us = ObTimeUtility::current_time();
if (OB_FAIL(adapter.open_with_access_type(
device_handle, fd, storage_info_, base_uri_, access_type))) {
OB_LOG(WARN, "failed to open device with access type",
K(ret), K_(base_uri), KPC_(storage_info), K(access_type));
} else if (OB_ISNULL(device_handle)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "device handle is NULL", K(ret), KP(device_handle), K_(base_uri));
} else {
metrics_.open_time_ms_map_.log_entry(open_start_time_us);
}
int64_t cur_offset = 0;
int64_t actual_write_size = -1;
int64_t cur_part_size = -1;
while (OB_SUCC(ret) && cur_offset < obj_size_) {
cur_part_size = MIN(obj_size_ - cur_offset, part_size_);
MEMCPY(write_buf_,
RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - cur_part_size),
cur_part_size);
write_buf_[cur_part_size] = '\0';
if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_part_size,
write_buf_, actual_write_size))) {
OB_LOG(WARN, "fail to upload part",
K(ret), K_(base_uri), K(cur_offset), K(cur_part_size));
} else {
cur_offset += cur_part_size;
}
}
const int64_t close_start_time_us = ObTimeUtility::current_time();
if (FAILEDx(adapter.close_device_and_fd(device_handle, fd))) {
OB_LOG(WARN, "fail to close device handle", K(ret), K_(base_uri));
} else {
metrics_.close_time_ms_map_.log_entry(close_start_time_us);
}
if (OB_SUCC(ret)) {
metrics_.operation_num_++;
metrics_.throughput_bytes_ += obj_size_;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
/*--------------------------------Read Task Executor--------------------------------*/
ReadTaskExecutor::ReadTaskExecutor()
: ITaskExecutor(), obj_size_(-1), obj_num_(-1), is_adaptive_(false),
expected_read_size_(-1), read_buf_(nullptr), allocator_()
{
}
void ReadTaskExecutor::reset()
{
obj_size_ = -1;
obj_num_ = -1;
is_adaptive_ = false;
expected_read_size_ = -1;
read_buf_ = nullptr;
allocator_.clear();
ITaskExecutor::reset();
}
int ReadTaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_
|| config.obj_num_ <= 0)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(config));
} else if (FALSE_IT(obj_size_ = config.obj_size_)) {
} else if (FALSE_IT(expected_read_size_ = config.fragment_size_)) {
} else if (OB_ISNULL(read_buf_ = (char *)allocator_.alloc(expected_read_size_ + 1))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc memory for read buf", K(ret), K_(obj_size), K_(expected_read_size));
} else {
obj_num_ = config.obj_num_;
is_adaptive_ = config.is_adaptive_;
is_inited_ = true;
}
if (OB_FAIL(ret)) {
reset();
}
return ret;
}
int ReadTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = ObRandom::rand(0, obj_num_ - 1);
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "ReadTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
const int64_t offset = (ObRandom::rand(0, obj_size_ - expected_read_size_) / ALIGNMENT) * ALIGNMENT;
int64_t read_size = -1;
if (is_adaptive_ && OB_FAIL(adapter.adaptively_read_part_file(base_uri_,
storage_info_, read_buf_, expected_read_size_, offset, read_size))) {
OB_LOG(WARN, "fail to read adaptive file", K(ret), K_(base_uri),
KPC_(storage_info), K_(expected_read_size), K_(obj_size), K(offset), K(object_id));
} else if (!is_adaptive_ && OB_FAIL(adapter.read_part_file(base_uri_,
storage_info_, read_buf_, expected_read_size_, offset, read_size))) {
OB_LOG(WARN, "fail to read file", K(ret), K_(base_uri),
KPC_(storage_info), K_(expected_read_size), K_(obj_size), K(offset), K(object_id));
} else {
metrics_.operation_num_++;
metrics_.throughput_bytes_ += read_size;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
/*--------------------------------Del Task Executor--------------------------------*/
DelTaskExecutor::DelTaskExecutor()
: ITaskExecutor(), is_adaptive_(false)
{
}
int DelTaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
} else {
is_adaptive_ = config.is_adaptive_;
is_inited_ = true;
}
if (OB_FAIL(ret)) {
reset();
}
return ret;
}
int DelTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "DelTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
if (is_adaptive_ && OB_FAIL(adapter.adaptively_del_file(base_uri_, storage_info_))) {
OB_LOG(WARN, "fail to delete adaptive file",
K(ret), K_(base_uri), KPC_(storage_info), K(object_id));
} else if (!is_adaptive_ && OB_FAIL(adapter.del_file(base_uri_, storage_info_))) {
OB_LOG(WARN, "fail to delete file", K(ret), K_(base_uri), KPC_(storage_info), K(object_id));
} else {
metrics_.operation_num_++;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
} //tools
} //oceanbase

View File

@ -0,0 +1,199 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_BENCHMARK_TASK_EXECUTOR_H_
#define OB_ADMIN_BENCHMARK_TASK_EXECUTOR_H_
#include "share/backup/ob_backup_struct.h"
#include "deps/oblib/src/lib/ob_define.h"
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
namespace oceanbase
{
namespace tools
{
enum BenchmarkTaskType
{
BENCHMARK_TASK_NORMAL_WRITE = 0,
BENCHMARK_TASK_APPEND_WRITE = 1,
BENCHMARK_TASK_MULTIPART_WRITE = 2,
BENCHMARK_TASK_READ = 3,
BENCHMARK_TASK_DEL = 4,
BENCHMARK_TASK_MAX_TYPE
};
struct TaskConfig
{
TaskConfig();
int assign(const TaskConfig &other);
TO_STRING_KV(K_(thread_num), K_(max_task_runs), K_(time_limit_s),
K_(obj_size), K_(obj_num), K_(fragment_size), K_(is_adaptive), K_(type));
int64_t thread_num_;
int64_t max_task_runs_; // 每个线程执行次数
int64_t time_limit_s_;
int64_t obj_size_;
int64_t obj_num_;
int64_t fragment_size_;
bool is_adaptive_;
BenchmarkTaskType type_;
};
struct TimeMap
{
TimeMap();
int log_entry(const int64_t start_time_us);
int add(const TimeMap &other);
int assign(const TimeMap &other);
void summary(const char *map_name_str);
int64_t total_entry_;
std::map<int64_t, int64_t> time_ms_map_;
};
struct Metrics
{
Metrics();
int assign(const Metrics &other);
int add(const Metrics &other);
void summary(
const struct timeval &start_real_time,
const struct rusage &start_usage,
const int64_t thread_num);
int64_t status_;
int64_t throughput_bytes_;
int64_t operation_num_;
TimeMap total_op_time_ms_map_;
TimeMap open_time_ms_map_;
TimeMap close_time_ms_map_;
};
class ITaskExecutor
{
public:
ITaskExecutor();
virtual ~ITaskExecutor() {}
void reset();
virtual int init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config);
virtual int execute() { return OB_SUCCESS; };
const Metrics &get_metrics() { return metrics_; }
protected:
int prepare_(const int64_t object_id);
void finish_(const int64_t ob_errcode);
protected:
bool is_inited_;
int64_t base_uri_len_;
char base_uri_[common::OB_MAX_URI_LENGTH];
share::ObBackupStorageInfo *storage_info_;
Metrics metrics_;
};
void init_random_content();
int init_task_executor(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config, ITaskExecutor *&executor);
class WriteTaskExecutor : public ITaskExecutor
{
public:
WriteTaskExecutor();
virtual ~WriteTaskExecutor() {}
void reset();
virtual int init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override;
virtual int execute() override;
private:
int64_t obj_size_;
char *write_buf_;
ObArenaAllocator allocator_;
};
class AppendWriteTaskExecutor : public ITaskExecutor
{
public:
AppendWriteTaskExecutor();
virtual ~AppendWriteTaskExecutor() {}
void reset();
virtual int init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override;
virtual int execute() override;
private:
int64_t obj_size_;
int64_t fragment_size_;
char *write_buf_;
ObArenaAllocator allocator_;
};
class MultipartWriteTaskExecutor : public ITaskExecutor
{
public:
MultipartWriteTaskExecutor();
virtual ~MultipartWriteTaskExecutor() {}
void reset();
virtual int init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override;
virtual int execute() override;
private:
int64_t obj_size_;
int64_t part_size_;
char *write_buf_;
ObArenaAllocator allocator_;
};
class ReadTaskExecutor : public ITaskExecutor
{
public:
ReadTaskExecutor();
virtual ~ReadTaskExecutor() {}
void reset();
virtual int init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override;
virtual int execute() override;
private:
int64_t obj_size_;
int64_t obj_num_;
bool is_adaptive_;
int64_t expected_read_size_;
char *read_buf_;
ObArenaAllocator allocator_;
private:
static const int64_t ALIGNMENT = 16 * 1024;
};
class DelTaskExecutor : public ITaskExecutor
{
public:
DelTaskExecutor();
virtual ~DelTaskExecutor() {}
virtual int init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config) override;
virtual int execute() override;
private:
bool is_adaptive_;
};
} //namespace tools
} //namespace oceanbase
#endif // OB_ADMIN_BENCHMARK_TASK_EXECUTOR_H_

View File

@ -29,6 +29,7 @@
#include "log_tool/ob_admin_log_tool_executor.h"
#include "slog_tool/ob_admin_slog_executor.h"
#include "dump_ckpt/ob_admin_dump_ckpt_executor.h"
#include "io_bench/ob_admin_io_adapter_bench.h"
#include "lib/utility/ob_print_utils.h"
using namespace oceanbase::common;
@ -143,6 +144,8 @@ int main(int argc, char *argv[])
executor = new ObAdminSlogExecutor();
} else if (0 == strcmp("dump_ckpt", argv[1])) {
executor = new ObAdminDumpCkptExecutor();
} else if (0 == strcmp("io_adapter_benchmark", argv[1])) {
executor = new ObAdminIOAdapterBenchmarkExecutor();
} else if (0 == strncmp("-h", argv[1], 2) || 0 == strncmp("-S", argv[1], 2)) {
executor = new ObAdminServerExecutor();
} else {