Files
oceanbase/tools/ob_admin/io_bench/task_executor.cpp

785 lines
27 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "share/backup/ob_backup_io_adapter.h"
#include "task_executor.h"
#include "../dumpsst/ob_admin_dumpsst_print_helper.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
namespace oceanbase
{
namespace tools
{
/*--------------------------------TaskConfig--------------------------------*/
// Default state: every numeric setting is -1, adaptive mode is off and the
// task type is BENCHMARK_TASK_MAX_TYPE.
TaskConfig::TaskConfig()
    : thread_num_(-1),
      max_task_runs_(-1),
      time_limit_s_(-1),
      obj_size_(-1),
      obj_num_(-1),
      fragment_size_(-1),
      is_adaptive_(false),
      type_(BenchmarkTaskType::BENCHMARK_TASK_MAX_TYPE)
{}
// Copies every field of `other` into this config.
// Always returns OB_SUCCESS.
int TaskConfig::assign(const TaskConfig &other)
{
  // What kind of task to run and how it behaves.
  type_ = other.type_;
  is_adaptive_ = other.is_adaptive_;
  // Run limits.
  thread_num_ = other.thread_num_;
  max_task_runs_ = other.max_task_runs_;
  time_limit_s_ = other.time_limit_s_;
  // Object layout.
  obj_size_ = other.obj_size_;
  obj_num_ = other.obj_num_;
  fragment_size_ = other.fragment_size_;
  return OB_SUCCESS;
}
/*--------------------------------Metrics--------------------------------*/
// Starts with zero recorded entries.
TimeMap::TimeMap()
    : total_entry_(0)
{}
// Records one sample: the time elapsed since `start_time_us` (microseconds),
// truncated to whole milliseconds, is counted in the matching bucket.
// Always returns OB_SUCCESS.
int TimeMap::log_entry(const int64_t start_time_us)
{
  const int64_t now_us = ObTimeUtility::current_time();
  const int64_t elapsed_ms = (now_us - start_time_us) / 1000;
  ++time_ms_map_[elapsed_ms];
  ++total_entry_;
  return OB_SUCCESS;
}
// Merges the samples of `other` into this map: per-millisecond bucket counts
// are summed and the total entry count is increased accordingly.
// Always returns OB_SUCCESS.
int TimeMap::add(const TimeMap &other)
{
  for (const auto &bucket : other.time_ms_map_) {
    time_ms_map_[bucket.first] += bucket.second;
  }
  total_entry_ += other.total_entry_;
  return OB_SUCCESS;
}
// Prints a one-line latency summary (entry count, min, p50, p90, p99, p99.9
// and max, all in milliseconds) for this map under the label `map_name_str`.
//
// @param map_name_str  label for the dump line; "Anonymous Time Map" is used
//                      when it is NULL.
//
// A percentile is the key of the first bucket at which the cumulative sample
// count reaches `total_entry_ * fraction` (truncated to an integer).
void TimeMap::summary(const char *map_name_str)
{
  const char *map_name = "Anonymous Time Map";
  if (OB_NOT_NULL(map_name_str)) {
    map_name = map_name_str;
  }
  // Also guard against an empty bucket map so begin()/rbegin() are never
  // dereferenced on an empty container.
  if (total_entry_ <= 0 || time_ms_map_.empty()) {
    PrintHelper::print_dump_line(map_name, "Empty Time Map");
  } else {
    const int64_t th_50_count = static_cast<int64_t>(total_entry_ * 0.5);
    const int64_t th_90_count = static_cast<int64_t>(total_entry_ * 0.9);
    const int64_t th_99_count = static_cast<int64_t>(total_entry_ * 0.99);
    const int64_t th_999_count = static_cast<int64_t>(total_entry_ * 0.999);
    const int64_t min_ms = time_ms_map_.begin()->first;
    const int64_t max_ms = time_ms_map_.rbegin()->first;
    int64_t th_50_ms = 0;
    int64_t th_90_ms = 0;
    int64_t th_99_ms = 0;
    int64_t th_999_ms = 0;
    // Explicit "already resolved" flags: 0 ms is a legitimate latency, so the
    // value itself must not double as the "not found yet" marker (otherwise a
    // 0 ms percentile would be overwritten by a later, larger bucket).
    bool th_50_found = false;
    bool th_90_found = false;
    bool th_99_found = false;
    bool th_999_found = false;
    int64_t cur_count = 0;
    for (std::map<int64_t, int64_t>::const_iterator it = time_ms_map_.begin();
         it != time_ms_map_.end(); ++it) {
      cur_count += it->second;
      if (!th_50_found && cur_count >= th_50_count) {
        th_50_ms = it->first;
        th_50_found = true;
      }
      if (!th_90_found && cur_count >= th_90_count) {
        th_90_ms = it->first;
        th_90_found = true;
      }
      if (!th_99_found && cur_count >= th_99_count) {
        th_99_ms = it->first;
        th_99_found = true;
      }
      if (!th_999_found && cur_count >= th_999_count) {
        th_999_ms = it->first;
        th_999_found = true;
      }
    }
    char buf[2048];
    int ret = OB_SUCCESS;
    if (OB_FAIL(databuff_printf(buf, sizeof(buf),
        "total_entry=%ld, min_ms=%ld, th_50_ms=%ld, th_90_ms=%ld, th_99_ms=%ld, th_999_ms=%ld, max_ms=%ld",
        total_entry_, min_ms, th_50_ms, th_90_ms, th_99_ms, th_999_ms, max_ms))) {
      OB_LOG(WARN, "fail to set log str", K(ret), K_(total_entry),
          K(min_ms), K(th_50_ms), K(th_90_ms), K(th_99_ms), K(th_999_ms), K(max_ms));
    } else {
      OB_LOG(INFO, "time map status", K(ret), K_(total_entry),
          K(min_ms), K(th_50_ms), K(th_90_ms), K(th_99_ms), K(th_999_ms), K(max_ms));
      PrintHelper::print_dump_line(map_name, buf);
    }
  }
}
// Replaces this map's buckets and entry count with a copy of `other`'s.
// Always returns OB_SUCCESS.
int TimeMap::assign(const TimeMap &other)
{
  time_ms_map_ = other.time_ms_map_;
  total_entry_ = other.total_entry_;
  return OB_SUCCESS;
}
// Starts in the OB_SUCCESS state with zeroed counters and empty time maps.
Metrics::Metrics()
    : status_(OB_SUCCESS),
      throughput_bytes_(0),
      operation_num_(0),
      total_op_time_ms_map_(),
      open_time_ms_map_(),
      close_time_ms_map_()
{}
int Metrics::assign(const Metrics &other)
{
int ret = OB_SUCCESS;
if (OB_FAIL(total_op_time_ms_map_.assign(other.total_op_time_ms_map_))) {
OB_LOG(WARN, "fail to assign total_op_time_ms_map", K(ret));
} else if (OB_FAIL(open_time_ms_map_.assign(other.open_time_ms_map_))) {
OB_LOG(WARN, "fail to assign open_time_ms_map", K(ret));
} else if (OB_FAIL(close_time_ms_map_.assign(other.close_time_ms_map_))) {
OB_LOG(WARN, "fail to assign close_time_ms_map", K(ret));
} else {
status_ = other.status_;
throughput_bytes_ = other.throughput_bytes_;
operation_num_ = other.operation_num_;
}
return ret;
}
// Accumulates `other` into this object.
//  - If this object already holds a failure status, nothing is changed.
//  - If `other` holds a failure status, only that status is adopted.
//  - Otherwise the time maps are merged and the counters are summed.
// Returns the first time-map merge error, else OB_SUCCESS.
int Metrics::add(const Metrics &other)
{
  int ret = OB_SUCCESS;
  if (OB_SUCCESS != status_) {
    // Already failed: keep the existing status and ignore `other`.
  } else if (OB_SUCCESS != other.status_) {
    status_ = other.status_;
  } else if (OB_FAIL(total_op_time_ms_map_.add(other.total_op_time_ms_map_))) {
    OB_LOG(WARN, "fail to add total_op_time_ms_map", K(ret));
  } else if (OB_FAIL(open_time_ms_map_.add(other.open_time_ms_map_))) {
    OB_LOG(WARN, "fail to add open_time_ms_map", K(ret));
  } else if (OB_FAIL(close_time_ms_map_.add(other.close_time_ms_map_))) {
    OB_LOG(WARN, "fail to add close_time_ms_map", K(ret));
  } else {
    operation_num_ += other.operation_num_;
    throughput_bytes_ += other.throughput_bytes_;
  }
  return ret;
}
// Returns `end - start` in seconds as a double (microsecond part included).
static double cal_time_diff(const timeval& start, const timeval& end)
{
  const double whole_seconds = static_cast<double>(end.tv_sec - start.tv_sec);
  const double fractional_seconds = (end.tv_usec - start.tv_usec) / 1e6;
  return whole_seconds + fractional_seconds;
}
// Prints the benchmark report for this metrics object.
//
// @param start_real_time  wall-clock time captured when the run started
// @param start_usage      getrusage(RUSAGE_SELF) snapshot from the run start
// @param thread_num       divisor used for the "Per Thread" figures
//
// The status line is always printed. Timing, CPU, QPS, bandwidth and the
// latency maps are printed only when the status is OB_SUCCESS and at least
// one operation was recorded.
void Metrics::summary(
    const struct timeval &start_real_time,
    const struct rusage &start_usage,
    const int64_t thread_num)
{
  const bool is_success = (OB_SUCCESS == status_);
  PrintHelper::print_dump_line("Status", is_success ? "SUCCESS" : "FAIL");
  if (OB_UNLIKELY(!is_success)) {
    // Failed run: only the status line is reported.
  } else if (OB_UNLIKELY(operation_num_ <= 0)) {
    PrintHelper::print_dump_line("Operation num is unexpected", operation_num_);
  } else {
    // Take the end-of-run snapshots before doing any formatting work.
    struct rusage end_usage;
    struct timeval end_real_time;
    getrusage(RUSAGE_SELF, &end_usage);
    gettimeofday(&end_real_time, nullptr);

    const double elapsed_s = cal_time_diff(start_real_time, end_real_time);
    const double user_s = cal_time_diff(start_usage.ru_utime, end_usage.ru_utime);
    const double sys_s = cal_time_diff(start_usage.ru_stime, end_usage.ru_stime);
    const double cpu_pct = (user_s + sys_s) / (elapsed_s) * 100.0;
    const double qps = ((double)operation_num_) / elapsed_s;
    const double bw_mb_s = ( ((double)throughput_bytes_) / 1024.0 / 1024.0 ) / elapsed_s;

    const std::string elapsed_text = std::to_string(elapsed_s) + " s";
    const std::string user_text = std::to_string(user_s) + " s";
    const std::string sys_text = std::to_string(sys_s) + " s";
    PrintHelper::print_dump_line("Total operation num", operation_num_);
    PrintHelper::print_dump_line("Total execution time", elapsed_text.c_str());
    PrintHelper::print_dump_line("Total user time", user_text.c_str());
    PrintHelper::print_dump_line("Total system time", sys_text.c_str());

    if (bw_mb_s > 1e-6) {
      // Normalise CPU usage to a bandwidth of 100 MB/s.
      const double cpu_pct_per_100mb = (100.0 / bw_mb_s) * cpu_pct;
      const std::string cpu_text = std::to_string(cpu_pct_per_100mb) + "% per 100MB/s";
      PrintHelper::print_dump_line("CPU usage for 100MB/s BW", cpu_text.c_str());
    } else {
      const std::string cpu_text = std::to_string(cpu_pct) + "%";
      PrintHelper::print_dump_line("Total CPU usage", cpu_text.c_str());
    }

    const std::string qps_text = std::to_string(qps);
    const std::string thread_qps_text = std::to_string(qps / thread_num);
    const std::string bw_text = std::to_string(bw_mb_s) + " MB/s";
    const std::string thread_bw_text = std::to_string(bw_mb_s / thread_num) + " MB/s";
    PrintHelper::print_dump_line("Total throughput bytes", throughput_bytes_);
    PrintHelper::print_dump_line("Total QPS", qps_text.c_str());
    PrintHelper::print_dump_line("Per Thread QPS", thread_qps_text.c_str());
    PrintHelper::print_dump_line("Total BW", bw_text.c_str());
    PrintHelper::print_dump_line("Per Thread BW", thread_bw_text.c_str());

    total_op_time_ms_map_.summary("Total Op Time Map");
    open_time_ms_map_.summary("Open Time Map");
    close_time_ms_map_.summary("Close Op Time Map");
  }
}
/*--------------------------------ITaskExecutor--------------------------------*/
// Creates an uninitialised executor: no storage info, an empty base URI and a
// base URI length of -1.
ITaskExecutor::ITaskExecutor()
    : is_inited_(false),
      base_uri_len_(-1),
      storage_info_(nullptr),
      metrics_()
{
  base_uri_[0] = '\0';
}
// Returns the executor to its uninitialised state. `metrics_` is left as is.
void ITaskExecutor::reset()
{
  base_uri_[0] = '\0';
  base_uri_len_ = -1;
  storage_info_ = nullptr;
  is_inited_ = false;
}
int ITaskExecutor::init(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
OB_LOG(WARN, "Task Executor init twice", K(ret));
} else if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info)
|| OB_UNLIKELY(!storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info));
} else if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), "%s", base_uri))) {
OB_LOG(WARN, "fail to deep copy base uri", K(ret), K(base_uri));
} else {
base_uri_len_ = strlen(base_uri_);
storage_info_ = storage_info;
is_inited_ = true;
}
return ret;
}
int ITaskExecutor::prepare_(const int64_t object_id)
{
int ret = OB_SUCCESS;
int64_t pos = base_uri_len_;
if (OB_FAIL(databuff_printf(base_uri_, sizeof(base_uri_), pos, "/%ld", object_id))) {
OB_LOG(WARN, "fail to construct object name", K(ret), K_(base_uri), K(object_id));
}
return ret;
}
// Ends one operation: records `ob_errcode` as the metrics status and truncates
// `base_uri_` back to the base prefix (dropping the object name that
// prepare_() appended).
void ITaskExecutor::finish_(const int64_t ob_errcode)
{
  metrics_.status_ = ob_errcode;
  // execute() calls finish_() even when the executor is not initialised; in
  // that state base_uri_len_ is -1, so the truncation must be skipped to avoid
  // writing before the start of the buffer.
  if (base_uri_len_ >= 0 && base_uri_len_ < static_cast<int64_t>(sizeof(base_uri_))) {
    base_uri_[base_uri_len_] = '\0';
  }
}
// Allocates storage for an `Executor` with ob_malloc and default-constructs
// it in place. Only enabled for types derived from ITaskExecutor.
//
// @param executor  receives the new object on success; untouched on failure
// @param attr      memory attributes passed to ob_malloc
// @return OB_ALLOCATE_MEMORY_FAILED when ob_malloc returns NULL, else OB_SUCCESS
template<typename Executor>
typename std::enable_if<std::is_base_of<ITaskExecutor, Executor>::value, int>::type
alloc_executor(ITaskExecutor *&executor, ObMemAttr &attr)
{
  int ret = OB_SUCCESS;
  void *raw_mem = ob_malloc(sizeof(Executor), attr);
  if (OB_ISNULL(raw_mem)) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    OB_LOG(WARN,"fail to alloc buf for task executor", K(ret));
  } else {
    executor = new(raw_mem) Executor();
  }
  return ret;
}
int init_task_executor(const char *base_uri,
share::ObBackupStorageInfo *storage_info, const TaskConfig &config, ITaskExecutor *&executor)
{
int ret = OB_SUCCESS;
executor = nullptr;
void *buf = nullptr;
ObMemAttr attr;
attr.label_ = "TMPSTR";
if (OB_ISNULL(base_uri) || OB_ISNULL(storage_info)
|| OB_UNLIKELY(!storage_info->is_valid() || config.type_ == BenchmarkTaskType::BENCHMARK_TASK_MAX_TYPE)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid arguments", K(ret), K(base_uri), KPC(storage_info), K(config));
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_NORMAL_WRITE) {
if (OB_FAIL(alloc_executor<WriteTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_READ) {
if (OB_FAIL(alloc_executor<ReadTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_APPEND_WRITE) {
if (OB_FAIL(alloc_executor<AppendWriteTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_MULTIPART_WRITE) {
if (OB_FAIL(alloc_executor<MultipartWriteTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else if (config.type_ == BenchmarkTaskType::BENCHMARK_TASK_DEL) {
if (OB_FAIL(alloc_executor<DelTaskExecutor>(executor, attr))) {
OB_LOG(WARN, "fail to alloc and construct executor", K(ret), K(config));
}
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unknown benchmark task type", K(ret), K(config));
}
if (FAILEDx(executor->init(base_uri, storage_info, config))) {
OB_LOG(WARN, "fail to init executor", K(ret), K(base_uri), KPC(storage_info), K(config));
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(executor)) {
executor->~ITaskExecutor();
ob_free(executor);
executor = nullptr;
}
}
return ret;
}
/*--------------------------------Write Task Executor--------------------------------*/
// Size in bytes (128 MiB) of the shared pool of random payload characters.
static const int64_t MAX_RANDOM_CONTENT_LEN = 128 * 1024 * 1024L;
// The payload pool itself, filled by init_random_content(); the extra byte
// holds a trailing NUL. Write executors copy slices out of this buffer.
static char RANDOM_CONTENT[MAX_RANDOM_CONTENT_LEN + 1];
// Fills RANDOM_CONTENT with random alphanumeric characters and NUL-terminates
// it. Only the first call does the work; later calls return immediately.
void init_random_content()
{
  static bool is_inited = false;
  static const char alphanum[] =
      "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  if (is_inited) {
    return;
  }
  char *cursor = RANDOM_CONTENT;
  char *const pool_end = RANDOM_CONTENT + MAX_RANDOM_CONTENT_LEN;
  while (cursor < pool_end) {
    // sizeof counts the trailing NUL, so sizeof - 2 is the index of the last letter.
    *cursor = alphanum[ObRandom::rand(0, sizeof(alphanum) - 2)];
    ++cursor;
  }
  *pool_end = '\0';
  is_inited = true;
}
// Creates an uninitialised write executor with no buffer and an object size of -1.
WriteTaskExecutor::WriteTaskExecutor()
    : ITaskExecutor(),
      obj_size_(-1),
      write_buf_(nullptr),
      allocator_()
{}
// Drops the write buffer pointer, clears the allocator and resets the base
// executor state.
void WriteTaskExecutor::reset()
{
  write_buf_ = nullptr;
  obj_size_ = -1;
  allocator_.clear();
  ITaskExecutor::reset();
}
// Initialises the executor for whole-object writes.
//
// Requires 0 < config.obj_size_ < MAX_RANDOM_CONTENT_LEN and allocates a
// write buffer of obj_size_ + 1 bytes. On any failure the executor is reset.
//
// @return the base init error, OB_INVALID_ARGUMENT, OB_ALLOCATE_MEMORY_FAILED
//         or OB_SUCCESS.
int WriteTaskExecutor::init(const char *base_uri,
    share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
    OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
  } else if (OB_UNLIKELY(config.obj_size_ <= 0 || config.obj_size_ >= MAX_RANDOM_CONTENT_LEN)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid arguments", K(ret), K(config));
  } else {
    obj_size_ = config.obj_size_;
    // One extra byte for the trailing NUL written in execute().
    write_buf_ = (char *)allocator_.alloc(obj_size_ + 1);
    if (OB_ISNULL(write_buf_)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(obj_size));
    } else {
      is_inited_ = true;
    }
  }

  if (OB_FAIL(ret)) {
    reset();
  }
  return ret;
}
int WriteTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "WriteTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
MEMCPY(write_buf_,
RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - obj_size_),
obj_size_);
write_buf_[obj_size_] = '\0';
if (OB_FAIL(adapter.write_single_file(base_uri_, storage_info_, write_buf_, obj_size_))) {
OB_LOG(WARN, "fail to write file",
K(ret), K_(base_uri), KPC_(storage_info), K_(obj_size), K(object_id));
} else {
metrics_.operation_num_++;
metrics_.throughput_bytes_ += obj_size_;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
// Creates an uninitialised append-write executor: sizes are -1, no buffer.
AppendWriteTaskExecutor::AppendWriteTaskExecutor()
    : ITaskExecutor(),
      obj_size_(-1),
      fragment_size_(-1),
      write_buf_(nullptr),
      allocator_()
{}
// Drops the write buffer pointer, clears the allocator and resets the base
// executor state.
void AppendWriteTaskExecutor::reset()
{
  write_buf_ = nullptr;
  fragment_size_ = -1;
  obj_size_ = -1;
  allocator_.clear();
  ITaskExecutor::reset();
}
// Initialises the executor for fragment-by-fragment append writes.
//
// Requires 0 < config.fragment_size_ <= config.obj_size_ and
// config.fragment_size_ < MAX_RANDOM_CONTENT_LEN. Allocates a write buffer of
// fragment_size_ + 1 bytes. On any failure the executor is reset.
//
// @return the base init error, OB_INVALID_ARGUMENT, OB_ALLOCATE_MEMORY_FAILED
//         or OB_SUCCESS.
int AppendWriteTaskExecutor::init(const char *base_uri,
    share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
    OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
  } else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_
      || config.fragment_size_ >= MAX_RANDOM_CONTENT_LEN)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid arguments", K(ret), K(config));
  } else {
    obj_size_ = config.obj_size_;
    fragment_size_ = config.fragment_size_;
    // One extra byte for the trailing NUL written in execute().
    write_buf_ = (char *)allocator_.alloc(fragment_size_ + 1);
    if (OB_ISNULL(write_buf_)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(fragment_size));
    } else {
      is_inited_ = true;
    }
  }

  if (OB_FAIL(ret)) {
    reset();
  }
  return ret;
}
// Writes one object of obj_size_ bytes as a sequence of pwrite() calls of at
// most fragment_size_ bytes each, then seals and closes it.
//
// The object is named after the current operation count; every fragment is a
// random slice of RANDOM_CONTENT. Open and close latencies are recorded in
// their own time maps; on overall success the operation count, byte count and
// total-latency map are updated. The result code is always passed to
// finish_() and returned.
//
// Once the device has been opened it is always closed, even when a fragment
// write or the seal fails, so that the device handle and fd are not leaked.
// The first error encountered is the one returned.
int AppendWriteTaskExecutor::execute()
{
  int ret = OB_SUCCESS;
  const int64_t object_id = metrics_.operation_num_;
  const int64_t start_time_us = ObTimeUtility::current_time();
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "AppendWriteTaskExecutor not init", K(ret), K_(base_uri));
  } else if (OB_FAIL(prepare_(object_id))) {
    OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
  } else {
    ObBackupIoAdapter adapter;
    ObIOFd fd;
    ObIODevice *device_handle = nullptr;
    ObStorageAccessType access_type = OB_STORAGE_ACCESS_RANDOMWRITER;
    // True only when open succeeded and returned a usable handle; gates the
    // unconditional close below.
    bool is_opened = false;

    const int64_t open_start_time_us = ObTimeUtility::current_time();
    if (OB_FAIL(adapter.open_with_access_type(
        device_handle, fd, storage_info_, base_uri_, access_type))) {
      OB_LOG(WARN, "failed to open device with access type",
          K(ret), K_(base_uri), KPC_(storage_info), K(access_type));
    } else if (OB_ISNULL(device_handle)) {
      ret = OB_ERR_UNEXPECTED;
      OB_LOG(WARN, "device handle is NULL", K(ret), KP(device_handle), K_(base_uri));
    } else {
      is_opened = true;
      metrics_.open_time_ms_map_.log_entry(open_start_time_us);
    }

    int64_t cur_offset = 0;
    int64_t actual_write_size = -1;
    int64_t cur_append_size = -1;
    while (OB_SUCC(ret) && cur_offset < obj_size_) {
      // The last fragment may be shorter than fragment_size_.
      cur_append_size = MIN(obj_size_ - cur_offset, fragment_size_);
      MEMCPY(write_buf_,
             RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - cur_append_size),
             cur_append_size);
      write_buf_[cur_append_size] = '\0';
      if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_append_size,
          write_buf_, actual_write_size))) {
        OB_LOG(WARN, "fail to append object",
            K(ret), K_(base_uri), K(cur_offset), K(cur_append_size));
      } else {
        cur_offset += cur_append_size;
      }
    }

    const int64_t close_start_time_us = ObTimeUtility::current_time();
    if (FAILEDx(device_handle->seal_file(fd))) {
      OB_LOG(WARN, "fail to seal file", K(ret), K_(base_uri));
    }
    if (is_opened) {
      // Release the device regardless of earlier failures; keep the first error.
      const int close_ret = adapter.close_device_and_fd(device_handle, fd);
      if (OB_SUCCESS != close_ret) {
        ret = OB_SUCC(ret) ? close_ret : ret;
        OB_LOG(WARN, "fail to close device handle", K(ret), K(close_ret), K_(base_uri));
      } else if (OB_SUCC(ret)) {
        metrics_.close_time_ms_map_.log_entry(close_start_time_us);
      }
    }

    if (OB_SUCC(ret)) {
      metrics_.operation_num_++;
      metrics_.throughput_bytes_ += obj_size_;
      metrics_.total_op_time_ms_map_.log_entry(start_time_us);
    }
  }
  finish_(ret);
  return ret;
}
// Creates an uninitialised multipart-write executor: sizes are -1, no buffer.
MultipartWriteTaskExecutor::MultipartWriteTaskExecutor()
    : ITaskExecutor(),
      obj_size_(-1),
      part_size_(-1),
      write_buf_(nullptr),
      allocator_()
{}
// Drops the write buffer pointer, clears the allocator and resets the base
// executor state.
void MultipartWriteTaskExecutor::reset()
{
  write_buf_ = nullptr;
  part_size_ = -1;
  obj_size_ = -1;
  allocator_.clear();
  ITaskExecutor::reset();
}
// Initialises the executor for multipart uploads.
//
// config.fragment_size_ is used as the part size; it must satisfy
// 0 < fragment_size_ <= config.obj_size_ and be below MAX_RANDOM_CONTENT_LEN.
// Allocates a write buffer of part_size_ + 1 bytes. On any failure the
// executor is reset.
//
// @return the base init error, OB_INVALID_ARGUMENT, OB_ALLOCATE_MEMORY_FAILED
//         or OB_SUCCESS.
int MultipartWriteTaskExecutor::init(const char *base_uri,
    share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
    OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
  } else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_
      || config.fragment_size_ >= MAX_RANDOM_CONTENT_LEN)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid arguments", K(ret), K(config));
  } else {
    obj_size_ = config.obj_size_;
    part_size_ = config.fragment_size_;
    // One extra byte for the trailing NUL written in execute().
    write_buf_ = (char *)allocator_.alloc(part_size_ + 1);
    if (OB_ISNULL(write_buf_)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      OB_LOG(WARN, "fail to alloc memory for write buf", K(ret), K_(part_size));
    } else {
      is_inited_ = true;
    }
  }

  if (OB_FAIL(ret)) {
    reset();
  }
  return ret;
}
// Writes one object of obj_size_ bytes through a multipart writer, issuing
// pwrite() calls of at most part_size_ bytes each, then closes the device.
//
// The object is named after the current operation count; every part is a
// random slice of RANDOM_CONTENT. Open and close latencies are recorded in
// their own time maps; on overall success the operation count, byte count and
// total-latency map are updated. The result code is always passed to
// finish_() and returned.
int MultipartWriteTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "MultipartWriteTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
ObIOFd fd;
ObIODevice *device_handle = nullptr;
ObStorageAccessType access_type = OB_STORAGE_ACCESS_MULTIPART_WRITER;
const int64_t open_start_time_us = ObTimeUtility::current_time();
if (OB_FAIL(adapter.open_with_access_type(
device_handle, fd, storage_info_, base_uri_, access_type))) {
OB_LOG(WARN, "failed to open device with access type",
K(ret), K_(base_uri), KPC_(storage_info), K(access_type));
} else if (OB_ISNULL(device_handle)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "device handle is NULL", K(ret), KP(device_handle), K_(base_uri));
} else {
// Open latency is recorded only for a successful open.
metrics_.open_time_ms_map_.log_entry(open_start_time_us);
}
int64_t cur_offset = 0;
int64_t actual_write_size = -1;
int64_t cur_part_size = -1;
// Upload parts until the whole object is covered or an error occurs.
while (OB_SUCC(ret) && cur_offset < obj_size_) {
// The last part may be shorter than part_size_.
cur_part_size = MIN(obj_size_ - cur_offset, part_size_);
MEMCPY(write_buf_,
RANDOM_CONTENT + ObRandom::rand(0, MAX_RANDOM_CONTENT_LEN - cur_part_size),
cur_part_size);
write_buf_[cur_part_size] = '\0';
// NOTE(review): actual_write_size is not compared with cur_part_size.
if (OB_FAIL(device_handle->pwrite(fd, cur_offset, cur_part_size,
write_buf_, actual_write_size))) {
OB_LOG(WARN, "fail to upload part",
K(ret), K_(base_uri), K(cur_offset), K(cur_part_size));
} else {
cur_offset += cur_part_size;
}
}
const int64_t close_start_time_us = ObTimeUtility::current_time();
// NOTE(review): FAILEDx presumably skips its statement when ret already holds
// an error. If so, a failed pwrite leaves device_handle/fd unclosed here.
// Confirm whether the multipart upload should be aborted/closed on that path.
if (FAILEDx(adapter.close_device_and_fd(device_handle, fd))) {
OB_LOG(WARN, "fail to close device handle", K(ret), K_(base_uri));
} else {
metrics_.close_time_ms_map_.log_entry(close_start_time_us);
}
if (OB_SUCC(ret)) {
metrics_.operation_num_++;
metrics_.throughput_bytes_ += obj_size_;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
/*--------------------------------Read Task Executor--------------------------------*/
// Creates an uninitialised read executor: sizes and counts are -1, adaptive
// mode is off and there is no read buffer.
ReadTaskExecutor::ReadTaskExecutor()
    : ITaskExecutor(),
      obj_size_(-1),
      obj_num_(-1),
      is_adaptive_(false),
      expected_read_size_(-1),
      read_buf_(nullptr),
      allocator_()
{}
// Drops the read buffer pointer, clears the allocator and resets the base
// executor state.
void ReadTaskExecutor::reset()
{
  read_buf_ = nullptr;
  expected_read_size_ = -1;
  is_adaptive_ = false;
  obj_num_ = -1;
  obj_size_ = -1;
  allocator_.clear();
  ITaskExecutor::reset();
}
// Initialises the executor for ranged reads.
//
// config.fragment_size_ is the number of bytes requested per read; it must
// satisfy 0 < fragment_size_ <= config.obj_size_, and config.obj_num_ must be
// positive. Allocates a read buffer of expected_read_size_ + 1 bytes. On any
// failure the executor is reset.
//
// @return the base init error, OB_INVALID_ARGUMENT, OB_ALLOCATE_MEMORY_FAILED
//         or OB_SUCCESS.
int ReadTaskExecutor::init(const char *base_uri,
    share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
    OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
  } else if (OB_UNLIKELY(config.fragment_size_ <= 0 || config.fragment_size_ > config.obj_size_
      || config.obj_num_ <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid arguments", K(ret), K(config));
  } else {
    obj_size_ = config.obj_size_;
    expected_read_size_ = config.fragment_size_;
    read_buf_ = (char *)allocator_.alloc(expected_read_size_ + 1);
    if (OB_ISNULL(read_buf_)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      OB_LOG(WARN, "fail to alloc memory for read buf", K(ret), K_(obj_size), K_(expected_read_size));
    } else {
      is_adaptive_ = config.is_adaptive_;
      obj_num_ = config.obj_num_;
      is_inited_ = true;
    }
  }

  if (OB_FAIL(ret)) {
    reset();
  }
  return ret;
}
// Reads expected_read_size_ bytes from a randomly chosen object.
//
// The object id is drawn with ObRandom::rand(0, obj_num_ - 1) and the read
// offset is a random position rounded down to a multiple of ALIGNMENT. The
// adaptive or plain read API is used depending on is_adaptive_. On success
// the operation count, bytes actually read and total-latency map are updated.
// The result code is always passed to finish_() and returned.
int ReadTaskExecutor::execute()
{
  int ret = OB_SUCCESS;
  const int64_t object_id = ObRandom::rand(0, obj_num_ - 1);
  const int64_t start_time_us = ObTimeUtility::current_time();
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "ReadTaskExecutor not init", K(ret), K_(base_uri));
  } else if (OB_FAIL(prepare_(object_id))) {
    OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
  } else {
    ObBackupIoAdapter adapter;
    // Integer division then multiplication rounds the offset down to ALIGNMENT.
    const int64_t offset = (ObRandom::rand(0, obj_size_ - expected_read_size_) / ALIGNMENT) * ALIGNMENT;
    int64_t read_size = -1;
    if (is_adaptive_) {
      if (OB_FAIL(adapter.adaptively_read_part_file(base_uri_,
          storage_info_, read_buf_, expected_read_size_, offset, read_size))) {
        OB_LOG(WARN, "fail to read adaptive file", K(ret), K_(base_uri),
            KPC_(storage_info), K_(expected_read_size), K_(obj_size), K(offset), K(object_id));
      }
    } else if (OB_FAIL(adapter.read_part_file(base_uri_,
        storage_info_, read_buf_, expected_read_size_, offset, read_size))) {
      OB_LOG(WARN, "fail to read file", K(ret), K_(base_uri),
          KPC_(storage_info), K_(expected_read_size), K_(obj_size), K(offset), K(object_id));
    }

    if (OB_SUCC(ret)) {
      metrics_.operation_num_++;
      metrics_.throughput_bytes_ += read_size;
      metrics_.total_op_time_ms_map_.log_entry(start_time_us);
    }
  }
  finish_(ret);
  return ret;
}
/*--------------------------------Del Task Executor--------------------------------*/
// Creates an uninitialised delete executor with adaptive mode off.
DelTaskExecutor::DelTaskExecutor()
    : ITaskExecutor(),
      is_adaptive_(false)
{}
// Initialises the delete executor: runs the base init and copies the adaptive
// flag from `config`. If the base init fails the executor is reset and the
// error is returned.
int DelTaskExecutor::init(const char *base_uri,
    share::ObBackupStorageInfo *storage_info, const TaskConfig &config)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ITaskExecutor::init(base_uri, storage_info, config))) {
    OB_LOG(WARN, "fail to init ITaskExecutor", K(ret), K(base_uri), KPC(storage_info), K(config));
    // The base init is the only step that can fail, so clean up right here.
    reset();
  } else {
    is_inited_ = true;
    is_adaptive_ = config.is_adaptive_;
  }
  return ret;
}
int DelTaskExecutor::execute()
{
int ret = OB_SUCCESS;
const int64_t object_id = metrics_.operation_num_;
const int64_t start_time_us = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "DelTaskExecutor not init", K(ret), K_(base_uri));
} else if (OB_FAIL(prepare_(object_id))) {
OB_LOG(WARN, "fail to prepare", K(ret), K_(base_uri), K(object_id));
} else {
ObBackupIoAdapter adapter;
if (is_adaptive_ && OB_FAIL(adapter.adaptively_del_file(base_uri_, storage_info_))) {
OB_LOG(WARN, "fail to delete adaptive file",
K(ret), K_(base_uri), KPC_(storage_info), K(object_id));
} else if (!is_adaptive_ && OB_FAIL(adapter.del_file(base_uri_, storage_info_))) {
OB_LOG(WARN, "fail to delete file", K(ret), K_(base_uri), KPC_(storage_info), K(object_id));
} else {
metrics_.operation_num_++;
metrics_.total_op_time_ms_map_.log_entry(start_time_us);
}
}
finish_(ret);
return ret;
}
} //tools
} //oceanbase