771 lines
25 KiB
C++
771 lines
25 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#ifndef TEST_IO_PERFORMANCE_H
|
|
#define TEST_IO_PERFORMANCE_H
|
|
#include "lib/io/ob_io_manager.h"
|
|
#include "lib/file/file_directory_utils.h"
|
|
#include "common/ob_cost_consts_def.h"
|
|
|
|
namespace oceanbase {
|
|
namespace common {
|
|
|
|
struct IOWorkload {
|
|
public:
|
|
IOWorkload() : category_(USER_IO), size_(0), iops_(0), depth_(0), is_sequence_(0)
|
|
{}
|
|
bool is_valid() const
|
|
{
|
|
return category_ < MAX_IO_CATEGORY && size_ > 0 && iops_ > 0 && depth_ > 0;
|
|
}
|
|
TO_STRING_KV(K_(category), K_(size), K_(iops), K_(depth), K_(is_sequence))
|
|
|
|
public:
|
|
ObIOCategory category_;
|
|
int32_t size_;
|
|
int64_t iops_;
|
|
int32_t depth_;
|
|
bool is_sequence_;
|
|
};
|
|
|
|
class IOInfoGenerator {
|
|
friend struct IORunContext;
|
|
|
|
public:
|
|
IOInfoGenerator() : inited_(false), mode_(IO_MODE_READ), io_category_(USER_IO), io_size_(0), is_sequence_(false)
|
|
{}
|
|
virtual ~IOInfoGenerator()
|
|
{}
|
|
bool is_inited() const
|
|
{
|
|
return inited_;
|
|
}
|
|
virtual int get_io_info(const char* buf, ObIOInfo& info) = 0;
|
|
TO_STRING_KV(K_(inited), K_(mode), K_(io_category), K_(io_size), K_(is_sequence));
|
|
|
|
protected:
|
|
bool inited_;
|
|
ObIOMode mode_;
|
|
ObIOCategory io_category_;
|
|
int32_t io_size_;
|
|
bool is_sequence_;
|
|
};
|
|
|
|
struct IORunContext {
|
|
public:
|
|
IORunContext() : thread_cnt_(0), generator_(NULL), succ_cnt_(0), fail_cnt_(0), sum_rt_(0)
|
|
{}
|
|
TO_STRING_KV(K_(thread_cnt), K_(workload), K_(succ_cnt), K_(fail_cnt), KP_(generator), K_(sum_rt));
|
|
bool is_valid() const
|
|
{
|
|
bool bret = thread_cnt_ >= 0 && succ_cnt_ >= 0 && fail_cnt_ >= 0 && sum_rt_ >= 0 && OB_NOT_NULL(generator_) &&
|
|
workload_.is_valid() && generator_->is_inited() && generator_->io_category_ == workload_.category_ &&
|
|
generator_->io_size_ == workload_.size_ && generator_->is_sequence_ == workload_.is_sequence_;
|
|
return bret;
|
|
}
|
|
|
|
public:
|
|
int64_t thread_cnt_;
|
|
IOWorkload workload_;
|
|
IOInfoGenerator* generator_;
|
|
int64_t succ_cnt_;
|
|
int64_t fail_cnt_;
|
|
int64_t sum_rt_;
|
|
};
|
|
|
|
class IORunner : public lib::ThreadPool {
|
|
public:
|
|
IORunner() : inited_(0), duration_ms_(0), begin_time_(0), end_time_(0){};
|
|
~IORunner()
|
|
{}
|
|
int init(const IORunContext& read_ctx, const IORunContext& write_ctx, const int64_t duration_ms);
|
|
int run_workload();
|
|
bool is_inited() const
|
|
{
|
|
return inited_;
|
|
}
|
|
TO_STRING_KV(K_(inited), K_(duration_ms), K_(read_ctx), K_(write_ctx), K_(begin_time), K_(end_time));
|
|
|
|
private:
|
|
void run1() override;
|
|
int run_read();
|
|
int run_write(const char* data);
|
|
void print_result();
|
|
inline int64_t get_wait_interval(const int64_t iops, const int64_t thread_cnt, const int32_t io_depth);
|
|
|
|
private:
|
|
bool inited_;
|
|
int64_t duration_ms_;
|
|
IORunContext read_ctx_;
|
|
IORunContext write_ctx_;
|
|
int64_t begin_time_;
|
|
int64_t end_time_;
|
|
};
|
|
|
|
class SingleIOInfoGenerator : public IOInfoGenerator {
|
|
public:
|
|
SingleIOInfoGenerator() : fd_(), file_size_(0), last_offset_(0)
|
|
{}
|
|
virtual ~SingleIOInfoGenerator()
|
|
{}
|
|
int init(const ObIOMode mode, const IOWorkload& workload, const ObDiskFd& fd, const int64_t file_size);
|
|
virtual int get_io_info(const char* buf, ObIOInfo& info) override;
|
|
|
|
private:
|
|
ObDiskFd fd_;
|
|
int64_t file_size_;
|
|
int64_t last_offset_;
|
|
};
|
|
|
|
class MultiIOInfoGenerator : public IOInfoGenerator {
|
|
public:
|
|
MultiIOInfoGenerator() : file_size_(0), last_offset_(0)
|
|
{}
|
|
virtual ~MultiIOInfoGenerator()
|
|
{}
|
|
int init(const ObIOMode mode, const IOWorkload& workload, const ObArray<ObDiskFd>& fds, const int64_t file_size);
|
|
virtual int get_io_info(const char* buf, ObIOInfo& info) override;
|
|
|
|
private:
|
|
ObArray<ObDiskFd> fds_;
|
|
int64_t file_size_;
|
|
int64_t last_offset_;
|
|
};
|
|
|
|
class IOStress {
|
|
public:
|
|
IOStress() : inited_(false), file_ready_(false), file_size_(0)
|
|
{}
|
|
~IOStress();
|
|
|
|
int init(const char* config_file_path);
|
|
int run();
|
|
int prepare_files(const int64_t file_num, const int64_t file_size);
|
|
|
|
private:
|
|
int prepare_one_file(const char* file_path, const int64_t file_size, int& fd);
|
|
void destroy_files();
|
|
int load_io_runner(FILE* config_file, IORunner& runner);
|
|
int get_info_generator(
|
|
const ObIOMode mode, const bool multi_disk, const IOWorkload& workload, IOInfoGenerator*& generator);
|
|
|
|
private:
|
|
bool inited_;
|
|
bool file_ready_;
|
|
int64_t file_size_;
|
|
ObArray<ObDiskFd> fds_;
|
|
ObArray<IORunner*> runners_;
|
|
};
|
|
|
|
/**
|
|
* ---------------------------------- IORunner -------------------------------
|
|
*/
|
|
int IORunner::init(const IORunContext& read_ctx, const IORunContext& write_ctx, const int64_t duration_ms)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if ((!read_ctx.is_valid() && !write_ctx.is_valid()) ||
|
|
duration_ms <= 0) { // require at least one ctx is valid
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(ret), K(read_ctx), K(write_ctx), K(duration_ms));
|
|
} else {
|
|
read_ctx_ = read_ctx;
|
|
write_ctx_ = write_ctx;
|
|
duration_ms_ = duration_ms;
|
|
set_thread_count(read_ctx_.thread_cnt_ + write_ctx_.thread_cnt_);
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void IORunner::run1()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const int64_t thread_id = get_thread_idx();
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret), K(thread_id));
|
|
} else if (thread_id < read_ctx_.thread_cnt_) {
|
|
while (!has_set_stop() && read_ctx_.is_valid()) {
|
|
const int64_t begin_time = ObTimeUtility::current_time();
|
|
if (OB_FAIL(run_read())) {
|
|
COMMON_LOG(WARN, "fail to run read", K(ret));
|
|
}
|
|
const int64_t cost_time = ObTimeUtility::current_time() - begin_time;
|
|
const int64_t interval =
|
|
get_wait_interval(read_ctx_.workload_.iops_, read_ctx_.thread_cnt_, read_ctx_.workload_.depth_);
|
|
if (cost_time < interval) {
|
|
usleep(static_cast<int>(interval - cost_time));
|
|
}
|
|
}
|
|
} else if (thread_id < read_ctx_.thread_cnt_ + write_ctx_.thread_cnt_) {
|
|
while (!has_set_stop() && write_ctx_.is_valid()) {
|
|
char* data = new char[write_ctx_.workload_.size_];
|
|
if (OB_ISNULL(data)) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
COMMON_LOG(WARN, "fail to allocate write buf", K(ret), "data_size", write_ctx_.workload_.size_);
|
|
} else {
|
|
const int64_t begin_time = ObTimeUtility::current_time();
|
|
if (OB_FAIL(run_write(data))) {
|
|
COMMON_LOG(WARN, "fail to run write", K(ret));
|
|
}
|
|
const int64_t interval =
|
|
get_wait_interval(write_ctx_.workload_.iops_, write_ctx_.thread_cnt_, write_ctx_.workload_.depth_);
|
|
const int64_t cost_time = ObTimeUtility::current_time() - begin_time;
|
|
if (cost_time < interval) {
|
|
usleep(static_cast<int>(interval - cost_time));
|
|
}
|
|
}
|
|
if (OB_NOT_NULL(data)) {
|
|
delete[] data;
|
|
data = NULL;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void IORunner::print_result()
|
|
{
|
|
if (end_time_ <= begin_time_) {
|
|
COMMON_LOG(WARN, "wrong time", K(begin_time_), K(end_time_));
|
|
} else {
|
|
COMMON_LOG(INFO,
|
|
"Read Result",
|
|
"workload",
|
|
read_ctx_.workload_,
|
|
"succ_cnt",
|
|
read_ctx_.succ_cnt_,
|
|
"fail_cnt",
|
|
read_ctx_.fail_cnt_,
|
|
"time_ms",
|
|
(end_time_ - begin_time_) / 1000,
|
|
"iops",
|
|
read_ctx_.succ_cnt_ * 1000000L / (end_time_ - begin_time_),
|
|
"rt",
|
|
read_ctx_.sum_rt_ / (read_ctx_.succ_cnt_ > 0 ? read_ctx_.succ_cnt_ : 1));
|
|
COMMON_LOG(INFO,
|
|
"Write Result",
|
|
"workload",
|
|
read_ctx_.workload_,
|
|
"succ_cnt",
|
|
write_ctx_.succ_cnt_,
|
|
"fail_cnt",
|
|
write_ctx_.fail_cnt_,
|
|
"time_ms",
|
|
(end_time_ - begin_time_) / 1000,
|
|
"iops",
|
|
write_ctx_.succ_cnt_ * 1000000L / (end_time_ - begin_time_),
|
|
"rt",
|
|
write_ctx_.sum_rt_ / (write_ctx_.succ_cnt_ > 0 ? write_ctx_.succ_cnt_ : 1));
|
|
}
|
|
}
|
|
|
|
int IORunner::run_workload()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
begin_time_ = ObTimeUtility::current_time();
|
|
if (OB_FAIL(ThreadPool::start())) {
|
|
COMMON_LOG(WARN, "fail to start threads", K(ret));
|
|
} else {
|
|
usleep(static_cast<int>(duration_ms_ * 1000));
|
|
end_time_ = ObTimeUtility::current_time();
|
|
ThreadPool::stop();
|
|
}
|
|
}
|
|
if (OB_SUCC(ret)) {
|
|
print_result();
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int IORunner::run_read()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
ObIOInfo info;
|
|
ObIOHandle handle[read_ctx_.workload_.depth_];
|
|
const int64_t begin_time = ObTimeUtility::current_time();
|
|
for (int32_t i = 0; OB_SUCC(ret) && i < read_ctx_.workload_.depth_; ++i) {
|
|
if (OB_FAIL(read_ctx_.generator_->get_io_info(nullptr, info))) {
|
|
COMMON_LOG(WARN, "fail to get read io info", K(ret), K(i));
|
|
} else if (OB_FAIL(ObIOManager::get_instance().aio_read(info, handle[i]))) {
|
|
COMMON_LOG(WARN, "fail to aio read", K(ret), K(i));
|
|
}
|
|
}
|
|
for (int32_t i = 0; OB_SUCC(ret) && i < read_ctx_.workload_.depth_; ++i) {
|
|
if (OB_FAIL(handle[i].wait())) {
|
|
COMMON_LOG(WARN, "fail to wait read io", K(ret), K(i));
|
|
ATOMIC_INC(&read_ctx_.fail_cnt_);
|
|
} else {
|
|
ATOMIC_INC(&read_ctx_.succ_cnt_);
|
|
}
|
|
int64_t delay = handle[i].get_rt();
|
|
if (delay <= 0) {
|
|
delay = ObTimeUtility::current_time() - begin_time;
|
|
}
|
|
read_ctx_.sum_rt_ += delay;
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int IORunner::run_write(const char* data)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else if (OB_ISNULL(data)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "data is null", K(ret), K(data));
|
|
} else {
|
|
ObIOInfo info;
|
|
ObIOHandle handle[write_ctx_.workload_.depth_];
|
|
const int64_t begin_time = ObTimeUtility::current_time();
|
|
for (int32_t i = 0; OB_SUCC(ret) && i < write_ctx_.workload_.depth_; ++i) {
|
|
if (OB_FAIL(write_ctx_.generator_->get_io_info(data, info))) {
|
|
COMMON_LOG(WARN, "fail to get read io info", K(ret), K(i));
|
|
} else if (OB_FAIL(ObIOManager::get_instance().aio_write(info, handle[i]))) {
|
|
COMMON_LOG(WARN, "fail to aio read", K(ret), K(i));
|
|
}
|
|
}
|
|
for (int32_t i = 0; OB_SUCC(ret) && i < write_ctx_.workload_.depth_; ++i) {
|
|
if (OB_FAIL(handle[i].wait())) {
|
|
COMMON_LOG(WARN, "fail to wait read io", K(ret), K(i));
|
|
ATOMIC_INC(&write_ctx_.fail_cnt_);
|
|
} else {
|
|
ATOMIC_INC(&write_ctx_.succ_cnt_);
|
|
}
|
|
int64_t delay = handle[i].get_rt();
|
|
if (delay <= 0) {
|
|
delay = ObTimeUtility::current_time() - begin_time;
|
|
}
|
|
write_ctx_.sum_rt_ += delay;
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
inline int64_t IORunner::get_wait_interval(const int64_t iops, const int64_t thread_cnt, const int32_t io_depth)
|
|
{
|
|
const int64_t MILLION = 1000L * 1000L;
|
|
const int64_t interval = thread_cnt * io_depth * MILLION / iops;
|
|
return interval;
|
|
}
|
|
|
|
/**
|
|
* ---------------------------------------- SingleIOInfoGenerator --------------------------------
|
|
*/
|
|
int SingleIOInfoGenerator::init(
|
|
const ObIOMode mode, const IOWorkload& workload, const ObDiskFd& fd, const int64_t file_size)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (mode >= IO_MODE_MAX || !workload.is_valid() || !fd.is_valid() || file_size <= 0 ||
|
|
(IO_MODE_WRITE == mode && workload.size_ % DIO_READ_ALIGN_SIZE != 0)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(ret), K(mode), K(workload), K(fd), K(file_size));
|
|
} else {
|
|
mode_ = mode;
|
|
io_category_ = workload.category_;
|
|
io_size_ = workload.size_;
|
|
is_sequence_ = workload.is_sequence_;
|
|
fd_ = fd;
|
|
file_size_ = file_size;
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int SingleIOInfoGenerator::get_io_info(const char* buf, ObIOInfo& info)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
ObIOPoint& io_point = info.io_points_[0];
|
|
info.io_desc_.category_ = io_category_;
|
|
info.io_desc_.mode_ = mode_;
|
|
io_point.fd_ = fd_;
|
|
io_point.size_ = io_size_;
|
|
io_point.write_buf_ = buf;
|
|
info.size_ = io_point.size_;
|
|
info.batch_count_ = 1;
|
|
if (IO_MODE_READ == mode_) {
|
|
info.io_desc_.wait_event_no_ = ObWaitEventIds::DB_FILE_DATA_READ;
|
|
} else {
|
|
info.io_desc_.wait_event_no_ = ObWaitEventIds::DB_FILE_COMPACT_WRITE;
|
|
}
|
|
|
|
if (is_sequence_) {
|
|
last_offset_ += io_size_;
|
|
io_point.offset_ = last_offset_;
|
|
} else {
|
|
int64_t rand_offset = ObRandom::rand(0, file_size_ - io_size_);
|
|
if (IO_MODE_READ == mode_) {
|
|
io_point.offset_ = rand_offset;
|
|
} else {
|
|
io_point.offset_ = lower_align(rand_offset, DIO_READ_ALIGN_SIZE);
|
|
}
|
|
}
|
|
info.offset_ = info.io_points_[0].offset_ % common::DEFAULT_MACRO_BLOCK_SIZE;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* -------------------------------- MultiIOInfoGenerator ---------------------------
|
|
*/
|
|
|
|
int MultiIOInfoGenerator::init(
|
|
const ObIOMode mode, const IOWorkload& workload, const ObArray<ObDiskFd>& fds, const int64_t file_size)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (mode >= IO_MODE_MAX || !workload.is_valid() || fds.empty() || file_size <= 0 ||
|
|
(IO_MODE_WRITE == mode && workload.size_ % DIO_READ_ALIGN_SIZE != 0)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(ret), K(mode), K(workload), K(fds), K(file_size));
|
|
} else {
|
|
mode_ = mode;
|
|
io_category_ = workload.category_;
|
|
io_size_ = workload.size_;
|
|
is_sequence_ = workload.is_sequence_;
|
|
fds_ = fds;
|
|
file_size_ = file_size;
|
|
inited_ = true;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int MultiIOInfoGenerator::get_io_info(const char* buf, ObIOInfo& info)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
const int32_t batch_count = 16;
|
|
info.io_desc_.category_ = io_category_;
|
|
info.io_desc_.mode_ = mode_;
|
|
info.size_ = io_size_;
|
|
info.batch_count_ = batch_count;
|
|
if (IO_MODE_READ == mode_) {
|
|
info.io_desc_.wait_event_no_ = ObWaitEventIds::DB_FILE_DATA_READ;
|
|
} else {
|
|
info.io_desc_.wait_event_no_ = ObWaitEventIds::DB_FILE_COMPACT_WRITE;
|
|
}
|
|
|
|
for (int i = 0; i < batch_count; ++i) {
|
|
ObIOPoint& point = info.io_points_[i];
|
|
point.fd_ = fds_[i % fds_.size()];
|
|
point.size_ = io_size_ / batch_count;
|
|
point.write_buf_ = buf + i * point.size_;
|
|
if (is_sequence_) {
|
|
last_offset_ += point.size_;
|
|
point.offset_ = last_offset_;
|
|
} else {
|
|
int64_t rand_offset = ObRandom::rand(0, file_size_ - io_size_);
|
|
if (IO_MODE_READ == mode_) {
|
|
point.offset_ = rand_offset;
|
|
} else {
|
|
point.offset_ = lower_align(rand_offset, DIO_READ_ALIGN_SIZE);
|
|
}
|
|
}
|
|
}
|
|
info.offset_ = info.io_points_[0].offset_ % common::DEFAULT_MACRO_BLOCK_SIZE;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* -------------------------------------- IOStress --------------------------------
|
|
*/
|
|
IOStress::~IOStress()
|
|
{
|
|
for (int64_t i = 0; i < runners_.size(); ++i) {
|
|
if (OB_NOT_NULL(runners_[i])) {
|
|
delete runners_[i];
|
|
runners_[i] = NULL;
|
|
}
|
|
}
|
|
}
|
|
|
|
int IOStress::init(const char* config_file_path)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t workload_cnt = 0;
|
|
int64_t file_size_gb = 0;
|
|
int64_t file_num = 0;
|
|
FILE* config_file = NULL;
|
|
if (inited_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "init twice", K(ret));
|
|
} else if (OB_ISNULL(config_file_path)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(ret));
|
|
} else if (OB_ISNULL(config_file = fopen(config_file_path, "rt"))) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to open config file", K(ret));
|
|
} else if (3 != fscanf(config_file,
|
|
"workload_cnt: %ld, file_size_gb: %ld, file_num: %ld\n",
|
|
&workload_cnt,
|
|
&file_size_gb,
|
|
&file_num)) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to read config file", K(ret));
|
|
} else if (workload_cnt <= 0 || file_size_gb <= 0 || file_num <= 0) {
|
|
ret = OB_INVALID_DATA;
|
|
COMMON_LOG(WARN, "invalid config", K(ret), K(workload_cnt), K(file_size_gb), K(file_num));
|
|
} else if (OB_FAIL(prepare_files(file_num, file_size_gb * 1024L * 1024L * 1024L))) {
|
|
COMMON_LOG(WARN, "fail to prepare file", K(ret), K(file_num), K(file_size_gb));
|
|
} else {
|
|
for (int64_t i = 0; OB_SUCC(ret) && i < workload_cnt; ++i) {
|
|
IORunner* runner = new IORunner;
|
|
if (OB_FAIL(load_io_runner(config_file, *runner))) {
|
|
COMMON_LOG(WARN, "fail to load runner", K(ret), K(i));
|
|
} else if (!runner->is_inited()) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
COMMON_LOG(WARN, "runner not init", K(ret));
|
|
} else if (OB_FAIL(runners_.push_back(runner))) {
|
|
COMMON_LOG(WARN, "fail to push runner to array", K(ret));
|
|
}
|
|
} // end for loop
|
|
if (OB_SUCC(ret)) {
|
|
inited_ = true;
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int IOStress::run()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!inited_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "not init", K(ret));
|
|
} else {
|
|
for (int64_t i = 0; OB_SUCC(ret) && i < runners_.size(); ++i) {
|
|
runners_[i]->run_workload();
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int IOStress::prepare_files(const int64_t file_num, const int64_t file_size)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const int64_t MAX_FILE_PATH_LENGTH = 256;
|
|
if (file_ready_) {
|
|
ret = OB_INIT_TWICE;
|
|
COMMON_LOG(WARN, "file is already ready", K(ret));
|
|
} else if (file_num <= 0 || file_size <= 0) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(file_num), K(file_size));
|
|
} else {
|
|
file_size_ = file_size;
|
|
for (int64_t i = 0; OB_SUCC(ret) && i < file_num; ++i) {
|
|
ObDiskFd fd;
|
|
char file_path[MAX_FILE_PATH_LENGTH] = {0};
|
|
snprintf(file_path, sizeof(file_path) - 1, "./io_stress_file_%ld", i + 1);
|
|
fd.disk_id_.disk_idx_ = i;
|
|
fd.disk_id_.install_seq_ = 0;
|
|
if (OB_FAIL(prepare_one_file(file_path, file_size_, fd.fd_))) {
|
|
COMMON_LOG(WARN, "fail to prepare one file", K(ret), K(file_path), K(file_size_));
|
|
} else if (fd.fd_ < 0) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
COMMON_LOG(WARN, "fd is invalid", K(fd));
|
|
} else if (OB_FAIL(ObIOManager::get_instance().add_disk(fd))) {
|
|
COMMON_LOG(WARN, "fail to add disk", K(ret));
|
|
} else if (OB_FAIL(fds_.push_back(fd))) {
|
|
COMMON_LOG(WARN, "fail to push fd to array", K(ret), K(fd));
|
|
}
|
|
}
|
|
if (OB_SUCC(ret) && fds_.size() == file_num) {
|
|
file_ready_ = true;
|
|
}
|
|
}
|
|
if (!file_ready_) {
|
|
destroy_files();
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void IOStress::destroy_files()
|
|
{
|
|
for (int64_t i = 0; i < fds_.size(); ++i) {
|
|
if (::close(fds_[i].fd_) < 0) {
|
|
COMMON_LOG(ERROR, "fail to close file", "fd", fds_[i]);
|
|
}
|
|
}
|
|
fds_.reset();
|
|
file_size_ = 0;
|
|
file_ready_ = false;
|
|
}
|
|
|
|
int IOStress::prepare_one_file(const char* file_path, const int64_t file_size, int& fd)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
fd = -1;
|
|
// open existing file
|
|
bool is_exist = false;
|
|
if (OB_FAIL(FileDirectoryUtils::is_exists(file_path, is_exist))) {
|
|
COMMON_LOG(WARN, "fail to decide if file exist", K(ret), K(file_path));
|
|
} else if (is_exist) { // file exist
|
|
int64_t exist_file_size = 0;
|
|
if (OB_FAIL(FileDirectoryUtils::get_file_size(file_path, exist_file_size))) {
|
|
COMMON_LOG(WARN, "fail to get existing file size", K(ret));
|
|
} else if (file_size == exist_file_size) {
|
|
fd = ::open(file_path, O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
|
|
if (fd < 0) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to open exist file", K(ret), K(file_path));
|
|
}
|
|
}
|
|
}
|
|
|
|
// create file if not exist
|
|
if (OB_SUCC(ret) && fd < 0) {
|
|
if ((fd = ::open(file_path, O_CREAT | O_TRUNC | O_DIRECT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to create file", K(ret), K(file_path));
|
|
} else if (fallocate(fd, 0, 0, file_size) < 0) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to allocate file", K(ret), K(fd), K(file_path), K(file_size));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int IOStress::load_io_runner(FILE* config_file, IORunner& runner)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
IORunContext read_ctx;
|
|
IORunContext write_ctx;
|
|
int64_t duration_ms = 0;
|
|
int multi_disk = -1;
|
|
int read_category = -1;
|
|
int read_is_sequence = -1;
|
|
int write_category = -1;
|
|
int write_is_sequence = -1;
|
|
if (!file_ready_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "file not ready", K(ret));
|
|
} else if (OB_ISNULL(config_file)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(ret), KP(config_file));
|
|
} else if (2 != fscanf(config_file, "duration_ms: %ld, multi_disk: %d\n", &duration_ms, &multi_disk)) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to read config file", K(ret));
|
|
} else if (6 != fscanf(config_file,
|
|
"Read: thread_cnt: %ld, category: %d, size_kb: %d, iops: %ld, depth: %d, is_sequence: %d\n",
|
|
&read_ctx.thread_cnt_,
|
|
&read_category,
|
|
&read_ctx.workload_.size_,
|
|
&read_ctx.workload_.iops_,
|
|
&read_ctx.workload_.depth_,
|
|
&read_is_sequence)) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to read config file", K(ret));
|
|
} else if (6 != fscanf(config_file,
|
|
"Write: thread_cnt: %ld, category: %d, size_kb: %d, iops: %ld, depth: %d, is_sequence: %d\n",
|
|
&write_ctx.thread_cnt_,
|
|
&write_category,
|
|
&write_ctx.workload_.size_,
|
|
&write_ctx.workload_.iops_,
|
|
&write_ctx.workload_.depth_,
|
|
&write_is_sequence)) {
|
|
ret = OB_IO_ERROR;
|
|
COMMON_LOG(WARN, "fail to read config file", K(ret));
|
|
} else if (read_category < 0 || read_is_sequence < 0 || write_category < 0 || write_is_sequence < 0) {
|
|
ret = OB_INVALID_DATA;
|
|
COMMON_LOG(
|
|
WARN, "invalid data", K(ret), K(read_category), K(read_is_sequence), K(write_category), K(write_is_sequence));
|
|
} else {
|
|
read_ctx.workload_.category_ = static_cast<ObIOCategory>(read_category);
|
|
read_ctx.workload_.is_sequence_ = static_cast<bool>(read_is_sequence);
|
|
read_ctx.workload_.size_ *= 1024;
|
|
write_ctx.workload_.category_ = static_cast<ObIOCategory>(write_category);
|
|
write_ctx.workload_.is_sequence_ = static_cast<bool>(write_is_sequence);
|
|
write_ctx.workload_.size_ *= 1024;
|
|
if (duration_ms <= 0 || multi_disk < 0 || read_ctx.thread_cnt_ < 0 || write_ctx.thread_cnt_ < 0 ||
|
|
(read_ctx.thread_cnt_ + write_ctx.thread_cnt_ <= 0) || !read_ctx.workload_.is_valid() ||
|
|
!write_ctx.workload_.is_valid() || write_ctx.workload_.size_ % DIO_READ_ALIGN_SIZE != 0) {
|
|
ret = OB_INVALID_DATA;
|
|
COMMON_LOG(WARN, "invalid config", K(ret), K(duration_ms), K(read_ctx), K(write_ctx));
|
|
} else if (OB_FAIL(get_info_generator(IO_MODE_READ, multi_disk, read_ctx.workload_, read_ctx.generator_))) {
|
|
COMMON_LOG(WARN, "fail to get read generator", K(ret), K(multi_disk), K(read_ctx.workload_));
|
|
} else if (OB_FAIL(get_info_generator(IO_MODE_WRITE, multi_disk, write_ctx.workload_, write_ctx.generator_))) {
|
|
COMMON_LOG(WARN, "fail to get write generator", K(ret), K(multi_disk), K(write_ctx.workload_));
|
|
} else if (OB_FAIL(runner.init(read_ctx, write_ctx, duration_ms))) {
|
|
COMMON_LOG(WARN, "fail to init io runner", K(ret), K(read_ctx), K(write_ctx));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int IOStress::get_info_generator(
|
|
const ObIOMode mode, const bool multi_disk, const IOWorkload& workload, IOInfoGenerator*& generator)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (!file_ready_) {
|
|
ret = OB_NOT_INIT;
|
|
COMMON_LOG(WARN, "file not ready", K(ret));
|
|
} else if (mode >= IO_MODE_MAX || !workload.is_valid()) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
COMMON_LOG(WARN, "invalid argument", K(ret), K(mode), K(workload));
|
|
} else if (multi_disk) {
|
|
MultiIOInfoGenerator* multi_gen = new MultiIOInfoGenerator();
|
|
if (OB_ISNULL(multi_gen)) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
COMMON_LOG(WARN, "fail to allocate memory", K(ret));
|
|
} else if (OB_FAIL(multi_gen->init(mode, workload, fds_, file_size_))) {
|
|
COMMON_LOG(WARN, "fail to init MultiIOInfoGenerator", K(ret));
|
|
} else {
|
|
generator = multi_gen;
|
|
}
|
|
} else {
|
|
SingleIOInfoGenerator* single_gen = new SingleIOInfoGenerator();
|
|
if (OB_ISNULL(single_gen)) {
|
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
|
COMMON_LOG(WARN, "fail to allocate memory", K(ret));
|
|
} else if (OB_FAIL(single_gen->init(mode, workload, fds_[0], file_size_))) {
|
|
COMMON_LOG(WARN, "fail to init SingleIOInfoGenerator", K(ret));
|
|
} else {
|
|
generator = single_gen;
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
} // end namespace common
|
|
} // end namespace oceanbase
|
|
|
|
#endif
|