impl the object storage driver quality test

This commit is contained in:
kpole 2025-01-06 05:15:39 +00:00 committed by ob-robot
parent d68ffd17d3
commit a4aa834c52
11 changed files with 3522 additions and 3 deletions

View File

@ -163,4 +163,4 @@ private:
} // common
} // oceanbase
#endif // SRC_LIBRARY_SRC_LIB_RESTORE_OB_OBJECT_STORAGE_BASE_H_
#endif // SRC_LIBRARY_SRC_LIB_RESTORE_OB_OBJECT_STORAGE_BASE_H_

View File

@ -29,7 +29,7 @@ using namespace oceanbase::common;
int init_cos_env()
{
int ret = OB_SUCCESS;
OBJECT_STORAGE_GUARD(nullptr/*storage_info*/, "OSS_GLOBAL_INIT", IO_HANDLED_SIZE_ZERO);
OBJECT_STORAGE_GUARD(nullptr/*storage_info*/, "COS_GLOBAL_INIT", IO_HANDLED_SIZE_ZERO);
return qcloud_cos::ObCosEnv::get_instance().init(ob_apr_abort_fn);
}
@ -1046,6 +1046,10 @@ int ObStorageCosWriter::write(const char *buf, const int64_t size)
} else if (NULL == buf || size < 0) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "buf is NULL or size is invalid", K(ret), KP(buf), K(size));
#ifdef ERRSIM
} else if (OB_FAIL(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR)) {
OB_LOG(WARN, "fake checksum error", K(ret));
#endif
} else {
qcloud_cos::CosStringBuffer bucket_name = qcloud_cos::CosStringBuffer(
handle_.get_bucket_name().ptr(), handle_.get_bucket_name().length());

View File

@ -1063,6 +1063,13 @@ static int add_content_md5(oss_request_options_t *options, const char *buf, cons
} else {
int b64_len = aos_base64_encode(md5, in_len, b64_value);
b64_value[b64_len] = '\0';
#ifdef ERRSIM
// Test checksum by deliberately modifying the md5 value
if (OB_FAIL(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR)) {
ret = OB_SUCCESS;
b64_value[b64_len - 1] = '\0';
}
#endif
apr_table_set(headers, OSS_CONTENT_MD5, b64_value);
}
}

View File

@ -1381,6 +1381,10 @@ int ObStorageS3Writer::write_obj_(const char *obj_name, const char *buf, const i
Aws::S3::Model::PutObjectOutcome outcome;
if (OB_FAIL(s3_client_->put_object(request, outcome))) {
OB_LOG(WARN, "failed to put s3 object", K(ret));
#ifdef ERRSIM
} else if (OB_FAIL(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR)) {
ret = OB_OBJECT_STORAGE_CHECKSUM_ERROR;
#endif
} else if (!outcome.IsSuccess()) {
handle_s3_outcome(outcome, ret);
OB_LOG(WARN, "failed to write object into s3",

View File

@ -525,6 +525,7 @@ GLOBAL_ERRSIM_POINT_DEF(1114, EN_INSERT_USER_RECOVER_JOB_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(1115, EN_INSERT_AUX_TENANT_RESTORE_JOB_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(1116, EN_RESTORE_CREATE_LS_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(1117, EN_ENABLE_LOG_OBJECT_STORAGE_CHECKSUM_TYPE, "");
GLOBAL_ERRSIM_POINT_DEF(1118, EN_OBJECT_STORAGE_CHECKSUM_ERROR, "");
// END OF STORAGE HA - 1101 - 2000
// sql parameterization 1170-1180

View File

@ -70,6 +70,11 @@ add_executable(ob_admin
io_device/ob_admin_test_io_device_executor.cpp
io_device/ob_admin_test_object_storage_interface.cpp
object_storage_driver_quality/ob_admin_object_storage_driver_quality.h
object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp
object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp
object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp
#trx_tool/ob_admin_trx_executor.h
#trx_tool/ob_admin_trx_executor.cpp
@ -142,6 +147,11 @@ add_executable(ob_admin
io_device/ob_admin_test_io_device_executor.cpp
io_device/ob_admin_test_object_storage_interface.cpp
object_storage_driver_quality/ob_admin_object_storage_driver_quality.h
object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp
object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp
object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp
#trx_tool/ob_admin_trx_executor.h
#trx_tool/ob_admin_trx_executor.cpp
@ -166,7 +176,8 @@ target_link_libraries(ob_admin
${ob_close_modules_static_name}
-Wl,--end-group
-static-libgcc
-static-libstdc++)
-static-libstdc++
malloc_hook)
if(ENABLE_THIN_LTO AND USE_LTO_CACHE)
add_dependencies(ob_admin observer)

View File

@ -30,6 +30,7 @@
#include "slog_tool/ob_admin_slog_executor.h"
#include "io_bench/ob_admin_io_adapter_bench.h"
#include "io_device/ob_admin_test_io_device_executor.h"
#include "object_storage_driver_quality/ob_admin_object_storage_driver_quality.h"
#include "lib/utility/ob_print_utils.h"
#ifdef OB_BUILD_SHARED_STORAGE
#include "tools/ob_admin/shared_storage_tool/ob_admin_shared_storage_tool_executor.h"
@ -162,6 +163,8 @@ int main(int argc, char *argv[])
executor = new ObAdminIOAdapterBenchmarkExecutor();
} else if (0 == strcmp("test_io_device", argv[1])) {
executor = new ObAdminTestIODeviceExecutor();
} else if (0 == strcmp("io_driver_quality", argv[1])) {
executor = new ObAdminObjectStorageDriverQualityExecutor();
} else if (0 == strncmp("-h", argv[1], 2) || 0 == strncmp("-S", argv[1], 2)) {
executor = new ObAdminServerExecutor();
} else {

View File

@ -0,0 +1,713 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ADMIN_IO_DRIVER_QUALITY_H_
#define OB_ADMIN_IO_DRIVER_QUALITY_H_
#include <stdio.h>
#include <time.h>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <sstream>
#include <thread>
#include <future>
#include <functional>
#include <atomic>
#include <condition_variable>
#include "../ob_admin_executor.h"
#include "deps/oblib/src/lib/task/ob_timer.h"
#include "deps/oblib/src/lib/ob_define.h" // OB_MAX_URI_LENGTH
#include "share/backup/ob_backup_struct.h" // OB_MAX_BACKUP_STORAGE_INFO_LENGTH
#include "share/ob_thread_mgr.h"
#include "share/backup/ob_backup_io_adapter.h"
#include "src/share/io/ob_io_manager.h"
#include "src/share/ob_device_manager.h"
#include "deps/oblib/src/lib/lock/ob_rwlock.h"
#include "deps/oblib/src/lib/allocator/ob_malloc.h"
#include "deps/oblib/src/lib/allocator/ob_vslice_alloc.h"
#include "deps/oblib/src/lib/alloc/ob_malloc_sample_struct.h"
#include "deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h"
#include "deps/oblib/src/lib/alloc/memory_dump.h"
#include "deps/oblib/src/lib/alloc/malloc_hook.h"
namespace oceanbase
{
namespace tools
{
// ANSI escape sequences used as colour prefixes when printing to a shell.
// NONE_COLOR_PREFIX is also the default colour argument of several OSDQLogEntry methods.
const std::string NONE_COLOR_PREFIX = "\033[m";
const std::string GREEN_COLOR_PREFIX = "\033[0;32;32m";
const std::string RED_COLOR_PREFIX = "\033[0;32;31m";
const std::string LIGHT_BLUE_PREFIX = "\033[1;34m";
// NOTE(review): "DARY" looks like a typo for "DARK"; the identifier is left untouched
// because files outside this view may reference it.
const std::string DARY_GRAY_PREFIX = "\033[1;30m";
// global constants
constexpr int64_t TIME_STR_LENGTH = 128;
constexpr int64_t FILE_PATH_LENGTH = common::OB_MAX_URI_LENGTH;
constexpr int64_t MAX_CONTENT_LENGTH = 128L * 1024 * 1024; // 128 MB
constexpr int64_t MAX_OBJECT_NAME_LENGTH = 50;
constexpr int64_t MAX_OBJECT_NAME_SUFFIX_LENGTH = 16;
// Upper bounds (inclusive, see the table in the OSDQMetric class comment) of each object size class.
constexpr int64_t MIN_OBJECT_SIZE = 0;
constexpr int64_t SMALL_OBJECT_SIZE_LIMIT = 128 * 1024; // 128 KB
constexpr int64_t NORMAL_OBJECT_SIZE_LIMIT = 2L * 1024 * 1024; // 2 MB
constexpr int64_t LARGE_OBJECT_SIZE_LIMIT = 128L * 1024 * 1024; // 128 MB
constexpr int64_t MEMORY_LIMITED_SIZE = 16L * 1024 * 1024 * 1024; // 16 GB
constexpr int64_t FINAL_OBJECT_STORAGE_MEMORY_LIMIT = 50 * 1024 * 1024; // 50 MB
// NOTE(review): presumably the tolerance used for floating point comparisons - confirm at the use sites.
constexpr double EPS = 1e-6;
// Rates of the small and normal object size classes.
// The LARGE_OBJECT_SIZE_RATE = 1.0 - SMALL_OBJECT_SIZE_RATE - NORMAL_OBJECT_SIZE_RATE;
constexpr double SMALL_OBJECT_SIZE_RATE = 0.2;
constexpr double NORMAL_OBJECT_SIZE_RATE = 0.75;
// The two explicit rates must leave a positive remainder for the large object class.
static_assert(SMALL_OBJECT_SIZE_RATE + NORMAL_OBJECT_SIZE_RATE < 1,
              "The sum of SMALL_OBJECT_SIZE_RATE and NORMAL_OBJECT_SIZE_RATE should be less than 1");
// Default scene parameters
constexpr int64_t DEFAULT_RUN_TIME_S = 20; // the default running time of the scene
constexpr int64_t DEFAULT_INTERVAL_S = 1; // the default interval between each metric display
constexpr int64_t DEFAULT_THREAD_CNT = 16; // the default thread count of the task handler
constexpr int64_t DEFAULT_LIMIT_RUN_TIME_S = 10; // the default duration of the restrictions in the limited scene
constexpr int64_t DEFAULT_LIMIT_MEMORY_MB = 64; // the default memory limit in the limited scene
constexpr double DEFAULT_LIMIT_CPU = 0.2; // the default cpu limit in the limited scene
constexpr int64_t DEFAULT_QUEUE_SIZE = 100; // the default task handler queue size
// NOTE(review): presumably percentages, matching the [0, 100) range documented on
// OSDQTaskHandler::prob_of_writing_old_data_ - confirm against the task handler implementation.
constexpr int64_t DEFAULT_PROB_OF_WRITING_OLD_DATA = 20;
constexpr int64_t DEFAULT_PROB_OF_PARALLEL = 10;
// Kinds of object storage operations a task can carry.
// The enumerator order must stay in sync with osdq_op_type_names below
// (enforced by the static_assert on the table length).
enum OSDQOpType
{
WRITE_SINGLE_FILE,
MULTIPART_WRITE,
APPEND_WRITE,
READ_SINGLE_FILE,
DEL_FILE,
MAX_OPERATE_TYPE // invalid type
};
// Short operation names, indexed by OSDQOpType, used when showing the metric summary.
static const char *osdq_op_type_names[] = {
"WS",
"MW",
"AW",
"RS",
"DE"
};
// Guards against adding an operation type without adding its display name.
static_assert((sizeof(osdq_op_type_names) / sizeof(char *)) == static_cast<int>(MAX_OPERATE_TYPE),
"the length of osdq_op_type_names should be equal to MAX_OPERATE_TYPE");
// NOTE(review): presumably returns the elapsed time between start and end; the unit of the
// result is not visible in this header - confirm in the implementation.
double cal_time_diff(const timeval &start, const timeval &end);
/**
* @brief Generate the contents of the file based on the object_id
*
* @param buf [out] The pointer to the contents of the file; must be allocated in advance with a length of buf_len
* @param buf_len [in] The length of the buf
* @param object_id [in] The Object's ID
*/
int generate_content_by_object_id(char *buf, const int64_t buf_len, const int64_t object_id);
/**
* @brief Verify that content is correct by object_id
*
* @param buf [in] The pointer to the content
* @param buf_len [in] The length of the buf
* @param object_id [in] The Object's ID
*/
int check_content_by_object_id(const char *buf, const int64_t buf_len, const int64_t object_id);
/**
* @brief Get a random length for performing a write operation or a read operation
*
* @param op_type [in] The operation the length is generated for
* @return content_length Indicates the content length generated
*/
int64_t get_random_content_length(const OSDQOpType op_type);
/**
* @brief Generate object_name from object_id.
* For example, if object_id is k, then the generated name will be like object_k_{suffix}, where suffix is a random string of length MAX_OBJECT_NAME_SUFFIX_LENGTH containing numbers and upper and lower case letters.
* @param object_id [in] The Object's ID
* @param object_name [out] The Object's name
* @param object_name_len [in] indicates the maximum length of object_name
*/
int construct_object_name(const int64_t object_id, char *object_name, const int64_t object_name_len);
/**
* @brief Build a file path from a base uri and an object name.
* NOTE(review): the exact join rule (separator handling) is not visible in this header.
* @param base_uri [in] The base uri
* @param object_name [in] The Object's name
* @param file_path [out] The buffer receiving the path
* @param file_path_length [in] The length of the file_path buffer
*/
int construct_file_path(
const char *base_uri,
const char *object_name,
char *file_path,
const int64_t file_path_length);
// Accessor for the ObVSliceAlloc that default-constructed STLMemAllocator instances bind to.
ObVSliceAlloc &get_vslice_alloc_instance();
// Incremented by STLMemAllocator::allocate() after each successful allocation; defined outside this header.
extern int64_t allocator_cnt;
/**
 * @class STLMemAllocator
 * @brief STL-style allocator that takes its memory from an ObVSliceAlloc.
 *
 * A default-constructed allocator binds to get_vslice_alloc_instance(); a rebound copy
 * shares the ObVSliceAlloc of the allocator it was converted from.
 *
 * allocate() never returns nullptr: when the underlying allocator is out of memory it
 * sleeps 10ms and retries until the request succeeds, logging a warning from time to time.
 *
 * NOTE(review): allocator_cnt is a plain int64_t incremented without synchronization, so
 * the counter is only approximate if allocate() is called from several threads at once.
 */
template <typename T>
class STLMemAllocator
{
public:
  ObVSliceAlloc &allocator_;
public:
  using value_type = T;
  using pointer = T*;
  using const_pointer = const T*;
  using void_pointer = void*;
  using const_void_pointer = const void*;
  using size_type = size_t;
  using difference_type = ptrdiff_t;
  template <typename U>
  struct rebind
  {
    using other = STLMemAllocator<U>;
  };
  STLMemAllocator() : allocator_(get_vslice_alloc_instance())
  {}
  template <typename U>
  STLMemAllocator(const STLMemAllocator<U> &other) : allocator_(other.allocator_)
  {}
  /**
   * @brief Allocate uninitialized storage for n objects of type T.
   * @param n    [in] number of objects
   * @param hint [in] ignored, only present for interface compatibility
   * @return pointer to the storage, never nullptr (blocks until memory is available)
   */
  pointer allocate(size_type n, const_void_pointer hint = 0)
  {
    (void)hint;
    int ret = OB_SUCCESS;
    void *ptr = nullptr;
    do {
      ptr = allocator_.alloc(n * sizeof(T));
      if (OB_ISNULL(ptr)) {
        ::usleep(10000); //10ms
        if (TC_REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
          // ret is only used to tag the log line, the loop keeps retrying
          ret = OB_ALLOCATE_MEMORY_FAILED;
          OB_LOG(WARN, "failed to allocate memory", KR(ret), K(n));
        }
      }
    } while (OB_ISNULL(ptr));
    allocator_cnt++;
    return static_cast<pointer>(ptr);
  }
  /**
   * @brief Return storage obtained from allocate(); the element count is not needed.
   */
  void deallocate(pointer p, size_type n) noexcept
  {
    (void)n;
    allocator_.free(p);
  }
  /**
   * @brief Largest element count that can be requested.
   * allocator_.limit() is divided by the element size because max_size() is expected
   * to be expressed in objects of type T, not in bytes.
   */
  size_type max_size() const noexcept
  {
    return static_cast<size_type>(allocator_.limit()) / sizeof(T);
  }
  template <typename U, typename... Args>
  void construct(U *p, Args &&...args)
  {
    new(p) U(std::forward<Args>(args)...);
  }
  template <typename U>
  void destroy(U* p) { p->~U(); }
  /**
   * @brief Two allocators are interchangeable when they are backed by the same
   *        ObVSliceAlloc, i.e. memory allocated by one can be released by the other.
   */
  template <typename U>
  bool operator==(const STLMemAllocator<U> &other) const noexcept
  {
    return &allocator_ == &other.allocator_;
  }
  template <typename U>
  bool operator!=(const STLMemAllocator<U> &other) const noexcept
  {
    return !(*this == other);
  }
};
/**
* @class OSDQLogEntry
* @brief Helper that accumulates a titled, optionally coloured log entry and prints it.
* NOTE(review): the output format is produced in the implementation file, which is not visible here.
*/
class OSDQLogEntry
{
public:
OSDQLogEntry();
~OSDQLogEntry() {}
// Set the title of the entry; color defaults to NONE_COLOR_PREFIX.
int init(const std::string &title, const std::string &color = NONE_COLOR_PREFIX);
// Static one-shot helper taking a title and a content; color defaults to GREEN_COLOR_PREFIX.
static void print_log(const std::string &title, const std::string &content, const std::string &color = GREEN_COLOR_PREFIX);
// Add a key/value pair to the entry.
void log_entry_kv(const std::string &key, const std::string &value, const std::string &color = NONE_COLOR_PREFIX);
// Add free-form content to the entry.
void log_entry(const std::string &content, const std::string &color = NONE_COLOR_PREFIX);
void print();
void reset();
private:
// NOTE(review): presumably formats the current time for the line prefix - confirm in the implementation.
static std::string get_time_prefix_();
private:
bool is_inited_;
std::string prefix_;
std::string head_holder_;
std::string content_;
};
/**
* @class OSDQTimeMap
* @brief Save all the latency for a certain operation
*/
class OSDQTimeMap
{
public:
OSDQTimeMap();
~OSDQTimeMap() {}
/**
* @brief Add the time consumed by an IO operation to the time_map
* @param cost_time_us [in] the time consumed by an IO operation
*/
int log_entry(const int64_t cost_time_us);
/**
* @brief print the information of the time_map
* @param map_name_str [in] title of the information
* @param log [out] the log entry the information is written to
*/
int summary(const char *map_name_str, OSDQLogEntry &log) const;
/**
* @brief Only total_entry_ is printed; time_map_ itself is not part of the string form.
* NOTE(review): the previous comment here mentioned estimating the dynamic memory requested
* by time_map_, but no such code is visible in this header.
*/
TO_STRING_KV(K_(total_entry));
private:
int64_t total_entry_;
// Ordered int64_t -> int64_t map whose nodes are allocated through STLMemAllocator.
// NOTE(review): presumably maps a latency value to its number of occurrences - confirm in log_entry().
std::map<int64_t, int64_t, std::less<int64_t>,
STLMemAllocator<std::pair<const int64_t, int64_t>>> time_map_;
};
/**
* @class OSDQMetric
* @brief record the runtime metric, like requests statistics, memory info, cpu info, etc.
* Since the size of read and write operation can be random,
* statistics will be categorised by request size
* The following table shows the relationship between file size and attributes:
*
* File Size | Attribute
* ------------------------------------------
* file_size <= 128KB | small object
* 128 KB < file_size <= 2MB | normal object
* 2MB < file_size <= 128MB | large object
*/
class OSDQMetric
{
public:
// Size class of an object; used as the second index of latency_maps_.
enum ObjectSizeType
{
SMALL_OBJECT = 0,
NORMAL_OBJECT_SIZE = 1,
LARGE_OBJECT_SIZE = 2,
MAX_OJBECT_SIZE_TYPE
};
// Request statistics snapshot.
struct ReqStatisticalsInfo
{
ReqStatisticalsInfo();
int64_t total_operation_num_; // total req numbers
int64_t total_queued_num_;
double total_throughput_mb_;
double average_qps_;
double average_bw_mb_;
double real_qps_;
double real_bw_mb_;
};
// CPU usage snapshot.
struct CpuInfo
{
CpuInfo();
double cpu_usage_for_100MB_bw_;
double total_cpu_usage_;
double total_user_time_;
double total_system_time_;
double real_cpu_usage_;
};
// Memory usage snapshot; all sizes are in KB as the field suffixes indicate.
struct MemInfo
{
MemInfo();
double start_vm_size_kb_;
double start_vm_rss_kb_;
double object_storage_hold_kb_;
double object_storage_used_kb_;
double total_hold_kb_;
double total_used_kb_;
double vm_peak_kb_; // maximum virtual memory usage
double vm_size_kb_; // current virtual memory usage
double vm_hwm_kb_; // maximum physical memory usage
double vm_rss_kb_; // current physical memory usage
double ob_vslice_alloc_used_memory_kb_;
int64_t ob_vslice_alloc_allocator_cnt_;
};
public:
OSDQMetric();
~OSDQMetric();
int init();
/**
* @brief add the latency of the IO operation to the corresponding latency_map
* @param op_start_time_us [in] the start time of the IO operation
* @param op_type [in] type of operation, e.g. read or write
* @param object_size [in] the operation's size
*/
int add_latency_metric(
const int64_t op_start_time_us,
const OSDQOpType op_type,
const int64_t object_size);
int add_queued_entry();
int sub_queued_entry();
// Print a summary of the collected metrics; is_final is forwarded to the memory report
// (see get_memory_info_). NOTE(review): the effect of is_final is not visible in this header.
int summary(const bool is_final = false);
public:
static int get_memory_usage(MemInfo &mem_info);
// Returns the real_cpu_usage_ of the last recorded CpuInfo.
double get_real_cpu_usage() const { return last_cpu_info_.real_cpu_usage_; }
private:
int get_req_statistical_info_(OSDQLogEntry &log);
int get_req_latency_map_(OSDQLogEntry &log);
int get_cpu_info_(OSDQLogEntry &log);
int get_memory_info_(OSDQLogEntry &log, const bool is_final = false);
int print_csv_title_();
int print_csv_dump_();
private:
static constexpr int64_t TITLE_LEN = 64;
static constexpr int64_t PRECISION = 2;
lib::ObMutex mutex_;
bool is_inited_;
char metric_csv_path_[OB_MAX_FILE_NAME_LENGTH];
int64_t summary_cnt_;
int64_t start_real_time_us_;
int64_t last_real_time_us_;
int64_t total_operation_num_;
int64_t total_queued_num_;
double total_throughput_mb_;
ReqStatisticalsInfo last_req_statistical_info_;
struct rusage start_usage_;
struct rusage last_usage_;
CpuInfo last_cpu_info_;
MemInfo start_mem_info_;
MemInfo last_mem_info_;
// One latency map per (operation type, object size class) pair.
OSDQTimeMap latency_maps_[OSDQOpType::MAX_OPERATE_TYPE][ObjectSizeType::MAX_OJBECT_SIZE_TYPE];
};
/**
* @class OSDQMonitor
* @brief Monitoring threads, used to print monitoring data such as qps, latency, etc. at regular intervals.
*/
class OSDQMonitor : public common::ObTimerTask
{
public:
OSDQMonitor();
virtual ~OSDQMonitor();
/**
* @param interval_s [in] interval between two reports, in seconds
* @param metric [in] the metric object to report on
*/
int init(const int64_t interval_s, OSDQMetric *metric);
int start();
void destroy();
// Timer callback inherited from common::ObTimerTask.
virtual void runTimerTask() override;
private:
bool is_inited_;
bool is_started_;
OSDQMetric *metric_;
int64_t interval_us_;
int tg_id_;
};
// Parameters of a test run, shared by all scenes; passed to OSDQScene::init().
struct OSDQParameters
{
OSDQParameters();
~OSDQParameters() {}
TO_STRING_KV(K(base_path_), K(storage_info_str_), K(scene_type_),
K(run_time_s_), K(interval_s_), K(thread_cnt_), K(resource_limited_type_),
K(limit_run_time_s_), K(limit_memory_mb_), K(limit_cpu_));
// common param
char base_path_[common::OB_MAX_URI_LENGTH];
char storage_info_str_[common::OB_MAX_BACKUP_STORAGE_INFO_LENGTH];
int64_t scene_type_;
int64_t run_time_s_;
int64_t interval_s_;
int64_t thread_cnt_;
//ResourceLimitedScene param
int64_t resource_limited_type_;
int64_t limit_run_time_s_;
int64_t limit_memory_mb_;
double limit_cpu_;
};
class OSDQScene;
/**
 * @class ObAdminObjectStorageDriverQualityExecutor
 * @brief The execution entry point of the programme, used for parameter parsing, to start the execution of the scenario.
 */
class ObAdminObjectStorageDriverQualityExecutor : public ObAdminExecutor
{
  // Scenes that can be selected; MAX_SCENE marks the end of the valid range.
  enum SceneType
  {
    HYBRID_TEST_SCENE,
    RESOURCE_LIMITED_SCENE,
    ERRSIM_SCENE,
    MAX_SCENE
  };
public:
  ObAdminObjectStorageDriverQualityExecutor();
  virtual ~ObAdminObjectStorageDriverQualityExecutor() {}
  virtual int execute(int argc, char *argv[]) override;
private:
  int parse_cmd_(int argc, char *argv[]);
  int set_environment_();
  int run_all_tests_();
  int print_usage_();
private:
  // A scene obtained from create_scene_() is handed back through free_scene_().
  OSDQScene *create_scene_();
  void free_scene_(OSDQScene *&scene);
private:
  OSDQParameters params_;
  OSDQMetric metric_;
  OSDQMonitor monitor_;
private:
  // printf-style format with two left-aligned columns (30 and 12 characters wide).
  // Declared as pointer-to-const: a string literal must never be written through.
  static constexpr const char *HELP_FMT = "\t%-30s%-12s\n";
  static constexpr int64_t N_WAY = 32;
  static constexpr int64_t DEFAULT_BLOCK_SIZE = 2L * 1024 * 1024; // 2 MB
  DISALLOW_COPY_AND_ASSIGN(ObAdminObjectStorageDriverQualityExecutor);
};
/**
 * @class OSDQIDGenerator
 * @brief Hands out increasing ids starting at 1. The counter is atomic, so
 *        get_next_id() can be called from several threads at once.
 */
class OSDQIDGenerator
{
public:
  OSDQIDGenerator() : current_id_(0) {}
  // Process-wide generator, created on first use.
  static OSDQIDGenerator &get_instance()
  {
    static OSDQIDGenerator generator;
    return generator;
  }
  // Atomically advances the counter and returns the new value.
  int64_t get_next_id() { return ++current_id_; }
private:
  std::atomic<int64_t> current_id_;
};
/**
* @class OSDQFileSet
* @brief This class maintains a set of files that have been written, along with the file path.
*/
class OSDQFileSet
{
// Local equality functor used as the key-equality predicate of FilePathMap.
template <typename T>
struct equal_to {
bool operator()(const T &lhs, const T &rhs) const {
return lhs == rhs;
}
};
// object_id -> file path; nodes are allocated through STLMemAllocator.
typedef std::unordered_map<int64_t, char *, std::hash<int64_t>, equal_to<int64_t>, STLMemAllocator<std::pair<const int64_t, char *>>> FilePathMap;
// Ordered set of object ids; nodes are allocated through STLMemAllocator.
typedef std::set<int64_t, std::less<const int64_t>, STLMemAllocator<int64_t>> FileSet;
public:
OSDQFileSet();
~OSDQFileSet();
/**
* @brief record a file in the set
* @param object_id [in] the file's object_id
* @param file_path [in] the file's path
* NOTE(review): whether file_path is copied or the pointer is stored is not visible in this header.
*/
int add_file(const int64_t object_id, const char *file_path);
/**
* @brief randomly select a file from the file set that have been written to
* then delete it from the file set
* @param object_id [out] the selected file's object_id
* @param file_path [out] the selected file's path
*/
int fetch_and_delete_file(int64_t &object_id, char *&file_path);
size_t size() const;
private:
lib::ObMutex mutex_;
FileSet file_set_;
FilePathMap file_path_map_;
};
// A single unit of work handled by OSDQTaskHandler.
// The destructor releases uri_ and buf_ with free().
struct OSDQTask
{
OSDQTask();
~OSDQTask();
// Checks that the fields required by op_type_ are set.
bool is_valid() const;
TO_STRING_KV(K_(op_type), K_(object_id), K_(uri), K_(buf_len), K(start_time_stamp_us_), K(parallel_), K(parallel_op_type_));
OSDQOpType op_type_;
int64_t object_id_;
char *uri_; // released with free() in the destructor
char *buf_; // released with free() in the destructor
int64_t buf_len_;
int64_t start_time_stamp_us_;
bool parallel_; // indicates this task is concurrent
OSDQOpType parallel_op_type_;
};
// Runs the task's destructor, releases its storage with free() and resets the pointer to nullptr.
void free_task(OSDQTask *&task);
class OSDQResourceLimitedScene;
class OSDQErrSimScene;
/**
* @class OSDQTaskHandler
* @brief Responsible for task generation and processing
*/
class OSDQTaskHandler : public lib::TGTaskHandler
{
public:
OSDQTaskHandler();
virtual ~OSDQTaskHandler();
/**
* @param base_uri [in] the base uri
* @param metric [in] the metric object
* @param file_set [in] the file set
* @param storage_info [in] the storage info
* NOTE(review): the handler keeps raw pointers to these arguments (see the members below);
* presumably they must outlive the handler - confirm in the implementation.
*/
int init(
const char *base_uri,
OSDQMetric *metric,
OSDQFileSet *file_set,
const share::ObBackupStorageInfo *storage_info);
void destroy();
int start();
void stop();
int wait();
int set_thread_cnt(const int64_t thread_cnt);
// Callbacks inherited from lib::TGTaskHandler.
virtual void handle(void *task) override;
virtual void handle_drop(void *task) override;
int push_task(OSDQTask *task);
public:
int set_operator_weight(const OSDQOpType op_type, const int64_t op_type_weight);
int set_prob_of_writing_old_data(const int64_t prob);
int set_prob_of_parallel(const int64_t prob);
int gen_task(OSDQTask *&task);
int gen_write_task(OSDQTask *task, const OSDQOpType op_type);
int handle_write_task(const OSDQTask *task);
int gen_read_single_task(OSDQTask *task);
int handle_read_single_task(OSDQTask *task);
int gen_del_task(OSDQTask *task);
int handle_del_task(OSDQTask *task);
public:
friend class OSDQResourceLimitedScene;
friend class OSDQErrSimScene;
private:
std::packaged_task<int()> get_packaged_task_(OSDQOpType op_type, const OSDQTask *task);
int handle_read_single_task_helper_(OSDQTask *task);
int handle_write_single_task_helper_(const OSDQTask *task);
int handle_multipart_write_task_helper_(const OSDQTask *task);
int handle_append_write_task_helper_(const OSDQTask *task);
bool check_parallel_write_result_(
const OSDQOpType op_type1,
const int ret1,
const OSDQOpType op_type2,
const int ret2);
void push_req_result_(const int ret);
private:
int64_t op_type_weights_[OSDQOpType::MAX_OPERATE_TYPE];
int64_t op_type_random_boundaries_[OSDQOpType::MAX_OPERATE_TYPE];
int64_t prob_of_writing_old_data_; // the range is [0, 100), 0 means an old file is never rewritten
int64_t prob_of_parallel_; // the probability of parallel execution, generally considering only write or read operations
private:
bool is_inited_;
bool is_stopped_;
int tg_id_; // thread group id
int thread_cnt_;
const char *base_uri_;
OSDQMetric *metric_;
OSDQFileSet *file_set_;
const share::ObBackupStorageInfo *storage_info_;
ObBackupIoAdapter adapter_;
lib::ObMutex mutex_;
std::vector<int, STLMemAllocator<int>> req_results_;
};
/**
* @class OSDQScene
* @brief base class for scenarios
*/
class OSDQScene
{
public:
OSDQScene();
virtual ~OSDQScene();
virtual int init(const OSDQParameters *param, OSDQMetric *metric) = 0;
virtual bool param_valid(const OSDQParameters *param) = 0;
virtual int execute() = 0;
// Generates tasks and pushes them to task_handler_ until run_time_s_ has elapsed or an error occurs.
int loop_send_task();
// Validates thread_cnt (1..128) and forwards it to task_handler_.
int set_thread_cnt(const int64_t thread_cnt);
protected:
bool is_inited_;
int64_t run_time_s_;
OSDQMetric *metric_;
const char *base_uri_;
share::ObBackupStorageInfo storage_info_;
OSDQTaskHandler task_handler_;
OSDQFileSet file_set_;
};
/**
* @class OSDQHybridTestScene
* @brief Hybrid scene: mixes single writes, multipart writes, append writes, reads and
* deletes (init() gives reads a weight of 3 and every other operation a weight of 1).
*/
class OSDQHybridTestScene : public OSDQScene
{
public:
OSDQHybridTestScene();
virtual ~OSDQHybridTestScene() {}
virtual int init(const OSDQParameters *param, OSDQMetric *metric) override;
// Note: besides validating, this also fills storage_info_ from param.
virtual bool param_valid(const OSDQParameters *param) override;
virtual int execute() override;
};
/**
* @class OSDQResourceLimitedScene
* @brief resource limited scene, contain:
* - 0 network packet loss resource limited
* - 1 network bandwidth limited
* - 2 memory limited
* - 3 cpu limited
*/
class OSDQResourceLimitedScene : public OSDQScene
{
// The enumerator values match the numbering in the class comment above.
enum ResourceLimitedType
{
NETWORK_PACKET_LOSS_LIMITED_TYPE,
NETWORK_BANDWIDTH_LIMITED_TYPE,
MEMORY_LIMITED_TYPE,
CPU_LIMITED_TYPE,
MAX_RESOURCE_LIMITED_TYPE
};
public:
OSDQResourceLimitedScene();
virtual ~OSDQResourceLimitedScene() {}
virtual int init(const OSDQParameters *param, OSDQMetric *metric) override;
// Note: besides validating, this also fills storage_info_ from param.
virtual bool param_valid(const OSDQParameters *param) override;
// Starts the task handler and dispatches to the test_*_limit_() method selected by resource_limited_type_.
virtual int execute() override;
private:
int test_network_packet_loss_limit_();
int test_network_bandwidth_limit_();
int test_memory_limit_();
int test_cpu_limit_();
// NOTE(review): the static helpers below take a mutex / condition variable / ready flag;
// presumably they run on a separate thread and signal the caller through them - confirm in the implementation.
static void inner_disrupt_network_(
const int64_t sleep_time_s,
const char *class_handle,
std::mutex &mtx,
std::condition_variable &cv,
bool &ready);
static void inner_limit_memory_(
const int64_t sleep_time_s,
const int64_t limit_memory_size_mb,
std::mutex &mtx,
std::condition_variable &cv,
bool &ready);
static void inner_limit_cpu_(
const int64_t sleep_time_s,
const double cpu_rate,
std::mutex &mtx,
std::condition_variable &cv,
bool &ready);
private:
// Thread count applied to the task handler in init().
static constexpr int64_t TASK_HANDLER_DEFAULT_THREAD_CNT = 16;
static constexpr double FINAL_CPU_USAGE_LIMIT = 5;
ResourceLimitedType resource_limited_type_;
int64_t limit_run_time_s_;
int64_t limit_memory_mb_;
double limit_cpu_;
};
/**
* @class OSDQErrSimScene
* @brief Error simulation scene.
* NOTE(review): presumably driven by the ERRSIM injection points (the implementation is not visible here).
*/
class OSDQErrSimScene : public OSDQScene
{
public:
OSDQErrSimScene();
virtual ~OSDQErrSimScene() {}
virtual int init(const OSDQParameters *param, OSDQMetric *metric) override;
virtual bool param_valid(const OSDQParameters *param) override;
virtual int execute() override;
};
} // namespace tools
} // namespace oceanbase
#endif

View File

@ -0,0 +1,680 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_admin_object_storage_driver_quality.h"
#include "deps/oblib/src/lib/utility/ob_tracepoint.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
namespace oceanbase
{
namespace tools
{
//=========================== OSDQScene ================================
// Starts in the "not initialised" state; the concrete scene's init() fills in
// run_time_s_, metric_ and base_uri_ and sets is_inited_.
OSDQScene::OSDQScene()
: is_inited_(false),
run_time_s_(0),
metric_(nullptr),
base_uri_(nullptr),
storage_info_(),
task_handler_(),
file_set_()
{
}
// Nothing to release explicitly: the members clean up in their own destructors.
OSDQScene::~OSDQScene()
{
}
int OSDQScene::loop_send_task()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "scene not init", KR(ret), K(is_inited_));
} else {
const int64_t start_time_us = ObTimeUtility::current_time();
int64_t current_time_us = start_time_us;
while (current_time_us - start_time_us < run_time_s_ * 1000000
&& (ret == OB_SUCCESS || ret == OB_EAGAIN)) {
OSDQTask *task = nullptr;
if (OB_FAIL(task_handler_.gen_task(task))) {
OB_LOG(WARN, "failed generate task", KR(ret));
if (ret == OB_FILE_NOT_EXIST) {
ret = OB_EAGAIN;
}
} else {
do {
ret = task_handler_.push_task(task);
if (ret != OB_SUCCESS && ret != OB_EAGAIN) {
OB_LOG(ERROR, "deliver task failed", KR(ret), K(task->op_type_), K(task->uri_));
} else {
::usleep(10000);
}
current_time_us = ObTimeUtility::current_time();
} while (OB_EAGAIN == ret && current_time_us - start_time_us < run_time_s_ * 1000000);
}
if (OB_FAIL(ret)) {
free_task(task);
}
current_time_us = ObTimeUtility::current_time();
}
if (ret == OB_EAGAIN) {
ret = OB_SUCCESS;
}
}
return ret;
}
/**
 * @brief Set the number of worker threads of the task handler.
 * @param thread_cnt [in] must be in the range [1, 128]
 * @return OB_NOT_INIT if the scene is not initialised, OB_INVALID_ARGUMENT if
 *         thread_cnt is out of range, otherwise the result of the task handler.
 */
int OSDQScene::set_thread_cnt(const int64_t thread_cnt)
{
  // largest thread count accepted for the task handler
  const int64_t MAX_THREAD_CNT = 128;
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "not init", KR(ret), K(is_inited_));
  } else if (OB_UNLIKELY(thread_cnt <= 0 || thread_cnt > MAX_THREAD_CNT)) {
    ret = OB_INVALID_ARGUMENT;
    // report the rejected value, otherwise the log is of no help
    OB_LOG(WARN, "invalid argument", KR(ret), K(thread_cnt), K(MAX_THREAD_CNT));
  } else if (OB_FAIL(task_handler_.set_thread_cnt(thread_cnt))) {
    OB_LOG(WARN, "failed set task handler's thread_cnt", KR(ret), K(thread_cnt));
  }
  return ret;
}
//=========================== OSDQTask ========================================
// Creates an invalid task: no operation type, no object id and no buffers.
// is_valid() returns false for a task in this state (op_type_ is MAX_OPERATE_TYPE).
OSDQTask::OSDQTask()
: op_type_(OSDQOpType::MAX_OPERATE_TYPE),
object_id_(-1),
uri_(nullptr),
buf_(nullptr),
buf_len_(-1),
start_time_stamp_us_(0),
parallel_(false),
parallel_op_type_(OSDQOpType::MAX_OPERATE_TYPE)
{}
// Releases the two buffers owned by the task (uri_ first, then buf_) with free()
// and resets the pointers.
OSDQTask::~OSDQTask()
{
  char **owned_buffers[] = { &uri_, &buf_ };
  for (char **slot : owned_buffers) {
    if (nullptr != *slot) {
      free(*slot);
      *slot = nullptr;
    }
  }
}
bool OSDQTask::is_valid() const
{
bool valid = true;
if (OB_UNLIKELY(op_type_ == MAX_OPERATE_TYPE) || OB_ISNULL(uri_)) {
valid = false;
} else if (op_type_ == WRITE_SINGLE_FILE || op_type_ == APPEND_WRITE || op_type_ == MULTIPART_WRITE) {
if (OB_UNLIKELY(object_id_ <= 0)
|| OB_UNLIKELY(buf_len_ > 0 && buf_ == nullptr)
|| OB_UNLIKELY(buf_len_ < 0)) {
valid = false;
} else if (op_type_ == APPEND_WRITE && OB_UNLIKELY(buf_len_ < 4)) {
valid = false;
}
} else if (op_type_ == READ_SINGLE_FILE) {
if (OB_UNLIKELY(object_id_ <= 0)) {
valid = false;
}
} else if (op_type_ == DEL_FILE) {
if (OB_UNLIKELY(object_id_ <= 0)) {
valid = false;
}
}
return valid;
}
// Destroys a task and gives its storage back with free(); the caller's pointer
// is always nullptr afterwards. Passing a null task is a no-op.
void free_task(OSDQTask *&task)
{
  if (nullptr != task) {
    task->~OSDQTask();
    free(task);
    task = nullptr;
  }
}
//=========================== OSDQHybridTestScene ================================
// The hybrid scene has no state of its own; everything lives in the OSDQScene base.
OSDQHybridTestScene::OSDQHybridTestScene()
: OSDQScene()
{
}
/**
 * @brief Initialise the hybrid scene: validate the parameters, initialise the
 *        task handler and configure the operation mix (reads weigh 3, every
 *        other operation weighs 1).
 *
 * A failure to configure an operation weight is reported and makes init() fail
 * instead of being silently ignored, so the scene never runs with an operation
 * mix other than the intended one.
 *
 * @param param  [in] run parameters; param->base_path_ is kept by pointer
 * @param metric [in] metric collector handed to the task handler
 * @return OB_SUCCESS, OB_INIT_TWICE, OB_INVALID_ARGUMENT or the task handler's error
 */
int OSDQHybridTestScene::init(const OSDQParameters *param, OSDQMetric *metric)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    OB_LOG(WARN, "hybrid test scene init twice", KR(ret), K(is_inited_));
  } else if (OB_UNLIKELY(!param_valid(param))) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), KPC(param), K(storage_info_));
  } else if (OB_FAIL(task_handler_.init(param->base_path_, metric, &file_set_, &storage_info_))) {
    OB_LOG(WARN, "failed init task handler", KR(ret), KPC(param), K(storage_info_));
  } else if (OB_FAIL(task_handler_.set_operator_weight(WRITE_SINGLE_FILE, 1))) {
    OB_LOG(WARN, "failed set weight of write single file", KR(ret));
  } else if (OB_FAIL(task_handler_.set_operator_weight(MULTIPART_WRITE, 1))) {
    OB_LOG(WARN, "failed set weight of multipart write", KR(ret));
  } else if (OB_FAIL(task_handler_.set_operator_weight(APPEND_WRITE, 1))) {
    OB_LOG(WARN, "failed set weight of append write", KR(ret));
  } else if (OB_FAIL(task_handler_.set_operator_weight(READ_SINGLE_FILE, 3))) {
    OB_LOG(WARN, "failed set weight of read single file", KR(ret));
  } else if (OB_FAIL(task_handler_.set_operator_weight(DEL_FILE, 1))) {
    OB_LOG(WARN, "failed set weight of del file", KR(ret));
  } else {
    run_time_s_ = param->run_time_s_;
    metric_ = metric;
    base_uri_ = param->base_path_;
    is_inited_ = true;
  }
  return ret;
}
bool OSDQHybridTestScene::param_valid(const OSDQParameters *param) {
bool is_valid = true;
int ret = OB_SUCCESS;
if (OB_ISNULL(param)) {
is_valid = false;
} else if (OB_FAIL(storage_info_.set(param->base_path_, param->storage_info_str_))) {
is_valid = false;
OB_LOG(WARN, "failed to set storage info", KR(ret), KPC(param));
} else if (OB_UNLIKELY(!storage_info_.is_valid() || param->run_time_s_ <= 0)) {
is_valid = false;
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(storage_info_), KPC(param));
}
return is_valid;
}
int OSDQHybridTestScene::execute()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "not init", KR(ret));
} else if (OB_FAIL(task_handler_.start())) {
OB_LOG(WARN, "failed start task handler", KR(ret));
} else if (OB_FAIL(loop_send_task())) {
OB_LOG(WARN, "failed exec loop_send_task", KR(ret));
}
task_handler_.destroy();
return ret;
}
//==================== OSDQResourceLimitedScene ====================
// Starts with no limit type selected (MAX_RESOURCE_LIMITED_TYPE) and all limits at zero;
// init() copies the real values from the parameters.
OSDQResourceLimitedScene::OSDQResourceLimitedScene()
: OSDQScene(),
resource_limited_type_(MAX_RESOURCE_LIMITED_TYPE),
limit_run_time_s_(0),
limit_memory_mb_(0),
limit_cpu_(0)
{}
/**
 * @brief Initialise the resource limited scene: validate the parameters,
 *        initialise the task handler with TASK_HANDLER_DEFAULT_THREAD_CNT
 *        threads and switch off deletes, parallel execution and rewriting of
 *        old data.
 *
 * The three configuration calls are checked: if one of them fails the scene is
 * not marked as initialised, because it would otherwise run with a workload
 * different from the one this scene is meant to exercise.
 *
 * @param param  [in] run parameters; param->base_path_ is kept by pointer
 * @param metric [in] metric collector handed to the task handler
 * @return OB_SUCCESS, OB_INIT_TWICE, OB_INVALID_ARGUMENT or the task handler's error
 */
int OSDQResourceLimitedScene::init(const OSDQParameters *param, OSDQMetric *metric)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    OB_LOG(WARN, "resource limited scene init twice", KR(ret), K(is_inited_));
  } else if (OB_UNLIKELY(!param_valid(param))) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), KPC(param), K(metric));
  } else if (OB_FAIL(task_handler_.init(param->base_path_, metric, &file_set_, &storage_info_))) {
    OB_LOG(WARN, "failed init task handler", KR(ret));
  } else if (OB_FAIL(task_handler_.set_thread_cnt(TASK_HANDLER_DEFAULT_THREAD_CNT))) {
    OB_LOG(WARN, "failed set task handler thread cnt", KR(ret));
  } else if (OB_FAIL(task_handler_.set_operator_weight(OSDQOpType::DEL_FILE, 0))) {
    OB_LOG(WARN, "failed set weight of del file", KR(ret));
  } else if (OB_FAIL(task_handler_.set_prob_of_parallel(0))) {
    OB_LOG(WARN, "failed set prob of parallel", KR(ret));
  } else if (OB_FAIL(task_handler_.set_prob_of_writing_old_data(0))) {
    OB_LOG(WARN, "failed set prob of writing old data", KR(ret));
  } else {
    base_uri_ = param->base_path_;
    run_time_s_ = param->run_time_s_;
    metric_ = metric;
    resource_limited_type_ = static_cast<ResourceLimitedType>(param->resource_limited_type_);
    limit_run_time_s_ = param->limit_run_time_s_;
    limit_memory_mb_ = param->limit_memory_mb_;
    limit_cpu_ = param->limit_cpu_;
    is_inited_ = true;
  }
  return ret;
}
// Validates `param` for the resource-limited scene and, as a side effect, loads
// storage_info_ from param->base_path_ / param->storage_info_str_.
// Returns false for a null `param`, a storage info that cannot be set or is invalid,
// a non-positive run time or limit run time, or a limit type outside
// [0, MAX_RESOURCE_LIMITED_TYPE). Memory and CPU limit types have extra checks below.
// `ret` is only used for logging; the result is carried by the returned bool.
bool OSDQResourceLimitedScene::param_valid(const OSDQParameters *param) {
bool is_valid = true;
int ret = OB_SUCCESS;
if (OB_ISNULL(param)) {
is_valid = false;
} else if (OB_FAIL(storage_info_.set(param->base_path_, param->storage_info_str_))) {
is_valid = false;
OB_LOG(WARN, "failed to set storage info", KR(ret), KPC(param));
} else if (OB_UNLIKELY(!storage_info_.is_valid() || param->run_time_s_ <= 0
|| param->resource_limited_type_ < 0
|| param->resource_limited_type_ >= MAX_RESOURCE_LIMITED_TYPE
|| param->limit_run_time_s_ <= 0)) {
is_valid = false;
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(storage_info_), KPC(param));
} else if (param->resource_limited_type_ == MEMORY_LIMITED_TYPE) {
// setting this to a smaller value has been tested to cause the program to break
if (OB_UNLIKELY(param->limit_memory_mb_ < 64)) {
is_valid = false;
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(param->limit_memory_mb_));
}
} else if (param->resource_limited_type_ == CPU_LIMITED_TYPE) {
// The CPU limit must lie in [0, 1], and the limited period must outlast the
// run time by at least 5 seconds.
if (OB_UNLIKELY(param->limit_cpu_ > 1 || param->limit_cpu_ < 0
|| param->limit_run_time_s_ < param->run_time_s_ + 5)) {
is_valid = false;
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(param->limit_cpu_));
OSDQLogEntry::print_log("INVALID ARGUMENT", "the resource limit run time should be at least 5s larger than run time when limited type is cpu limit", RED_COLOR_PREFIX);
}
}
return is_valid;
}
// Runs the resource-limited test selected by resource_limited_type_.
// Returns OB_NOT_INIT when init() has not succeeded, the error from
// task_handler_.start(), the result of the selected test, or OB_ERR_UNEXPECTED
// for an unknown limit type.
// The task handler is destroyed on every path (matching the other scenes), so the
// thread group created by init() is also released when start() fails.
int OSDQResourceLimitedScene::execute()
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "not init", KR(ret));
  } else if (OB_FAIL(task_handler_.start())) {
    OB_LOG(ERROR, "failed start task hanlder", KR(ret));
  } else {
    switch (resource_limited_type_) {
      case NETWORK_PACKET_LOSS_LIMITED_TYPE:
        ret = test_network_packet_loss_limit_();
        break;
      case NETWORK_BANDWIDTH_LIMITED_TYPE:
        ret = test_network_bandwidth_limit_();
        break;
      case MEMORY_LIMITED_TYPE:
        ret = test_memory_limit_();
        break;
      case CPU_LIMITED_TYPE:
        ret = test_cpu_limit_();
        break;
      case MAX_RESOURCE_LIMITED_TYPE:
        ret = OB_ERR_UNEXPECTED;
        break;
      default:
        ret = OB_ERR_UNEXPECTED;
        break;
    }
    if (OB_FAIL(ret)) {
      OB_LOG(WARN, "failed exec resource limited test", KR(ret), K(resource_limited_type_));
    }
  }
  // Previously only reached after a successful start(); a failed start() left the
  // handler's thread group alive.
  task_handler_.destroy();
  return ret;
}
void OSDQResourceLimitedScene::inner_disrupt_network_(
const int64_t sleep_time_s,
const char *class_handle,
std::mutex &mtx,
std::condition_variable &cv,
bool &ready)
{
int ret = OB_SUCCESS;
/*
* 1: root qdisc
* / \
* 1:1 1:2 child class
* | |
* 10: 20: qdisc
*/
system("tc qdisc add dev eth0 root handle 1: htb default 30");
system("tc class add dev eth0 parent 1: classid 1:1 htb rate 10000mbit");
system("tc class add dev eth0 parent 1: classid 1:2 htb rate 10mbit");
system("tc filter add dev eth0 parent 1: protocol all prio 1 handle 1: cgroup");
system("tc qdisc add dev eth0 parent 1:1 handle 10: netem loss 100%");
system("tc qdisc add dev eth0 parent 1:2 handle 20: sfq perturb 10");
system("cgcreate -g net_cls:/ob_admin_osdq_cgroup");
std::string bind_str = std::string("echo ") + std::string(class_handle) +
" | tee /sys/fs/cgroup/net_cls/ob_admin_osdq_cgroup/net_cls.classid > /dev/null";
system(bind_str.c_str());
const int pid = getpid();
std::string cmd = std::string("cgclassify -g net_cls:ob_admin_osdq_cgroup ") + std::to_string(pid);
assert(system(cmd.c_str()) == 0);
{
std::unique_lock<std::mutex> lock(mtx);
ready = true;
}
cv.notify_all();
OSDQLogEntry log;
if (OB_FAIL(log.init("DISRUPT NETWORK"))) {
OB_LOG(WARN, "failed init log", KR(ret));
}
OSDQLogEntry::print_log("DISRUPT_NETWORK", "THE NETWORK RESTRICTED ENVIRONMENT IS READY");
OSDQLogEntry::print_log("DISRUPT_NETWORK",
std::string("bind qdisc ") + std::string(class_handle) + std::string(" success"));
std::this_thread::sleep_for(std::chrono::seconds(sleep_time_s));
system("echo | tee /sys/fs/cgroup/net_cls/ob_admin_osdq_cgroup/net_cls.classid");
OSDQLogEntry::print_log("DISRUPT_NETWORK", "NETWORK DISRUPT IS FINISHED");
system("tc qdisc del dev eth0 root");
}
// Packet-loss test: a helper thread sets up the restricted network environment
// (class handle "0x00010001"); once it signals readiness the request loop runs,
// the helper is joined and the task handler is stopped.
// Request results other than OB_SUCCESS, OB_OBJECT_STORAGE_IO_ERROR, OB_IO_TIMEOUT
// and OB_TIMEOUT fail the test; the first such code is returned.
int OSDQResourceLimitedScene::test_network_packet_loss_limit_()
{
  int ret = OB_SUCCESS;
  std::mutex ready_mtx;
  std::condition_variable ready_cv;
  bool env_ready = false;
  OSDQLogEntry::print_log("NETWORK TEST1", "Packet loss 100% Test started");
  std::thread disrupt_thread(inner_disrupt_network_, limit_run_time_s_, "0x00010001",
                             std::ref(ready_mtx), std::ref(ready_cv), std::ref(env_ready));
  // Block until the helper thread reports that the environment is in place.
  std::unique_lock<std::mutex> guard(ready_mtx);
  ready_cv.wait(guard, [&env_ready]() { return env_ready; });
  ret = loop_send_task();
  disrupt_thread.join();
  task_handler_.stop();
  if (OB_SUCC(ret)) {
    for (auto req_ret : task_handler_.req_results_) {
      const bool tolerated = (OB_SUCCESS == req_ret)
                             || (OB_OBJECT_STORAGE_IO_ERROR == req_ret)
                             || (OB_IO_TIMEOUT == req_ret)
                             || (OB_TIMEOUT == req_ret);
      if (!tolerated) {
        ret = req_ret;
        break;
      }
    }
  }
  return ret;
}
// Bandwidth-limit test: a helper thread sets up the restricted network environment
// (class handle "0x00010002"); once it signals readiness the request loop runs,
// the helper is joined and the task handler is stopped.
// Only OB_SUCCESS and OB_IO_TIMEOUT are accepted as request results; the first
// other code is returned.
int OSDQResourceLimitedScene::test_network_bandwidth_limit_()
{
  int ret = OB_SUCCESS;
  std::mutex ready_mtx;
  std::condition_variable ready_cv;
  bool env_ready = false;
  OSDQLogEntry::print_log("NETWORK TEST2", "BandWidth Limit Test Started");
  std::thread disrupt_thread(inner_disrupt_network_, limit_run_time_s_, "0x00010002",
                             std::ref(ready_mtx), std::ref(ready_cv), std::ref(env_ready));
  // Block until the helper thread reports that the environment is in place.
  std::unique_lock<std::mutex> guard(ready_mtx);
  ready_cv.wait(guard, [&env_ready]() { return env_ready; });
  ret = loop_send_task();
  disrupt_thread.join();
  task_handler_.stop();
  if (OB_SUCC(ret)) {
    for (auto req_ret : task_handler_.req_results_) {
      const bool tolerated = (OB_SUCCESS == req_ret) || (OB_IO_TIMEOUT == req_ret);
      if (!tolerated) {
        ret = req_ret;
        break;
      }
    }
  }
  return ret;
}
// Thread body that lowers the process / tenant / tenant-IO-manager memory limits to
// `limit_memory_size_mb` MiB, signals the waiting test thread, keeps the limits for
// `sleep_time_s` seconds and then restores them to MEMORY_LIMITED_SIZE.
//
// @param sleep_time_s          duration of the restricted period, in seconds
// @param limit_memory_size_mb  memory limit to apply, in MiB
// @param mtx/cv/ready          handshake with the caller: `ready` is set under `mtx`
//                              and `cv` is notified
//
// The caller waits on `cv` until `ready` becomes true. It is therefore released on
// every path, including setup failures; otherwise a failed setup would leave the
// caller blocked forever. Setup failures are still reported through the logs.
void OSDQResourceLimitedScene::inner_limit_memory_(
    const int64_t sleep_time_s,
    const int64_t limit_memory_size_mb,
    std::mutex &mtx,
    std::condition_variable &cv,
    bool &ready)
{
  int ret = OB_SUCCESS;
  bool waiter_released = false;
  ObRefHolder<ObTenantIOManager> tenant_holder;
  const int64_t limit_memory_size = limit_memory_size_mb * 1024 * 1024;
  if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(OB_SERVER_TENANT_ID, tenant_holder))) {
    OB_LOG(WARN, "failed get tenant io manager", KR(ret));
  } else if (OB_FAIL(tenant_holder.get_ptr()->update_memory_pool(limit_memory_size))) {
    OB_LOG(WARN, "faield to update memory limit", KR(ret));
    OSDQLogEntry::print_log("LIMIT MEMORY", "failed to update memory limit", RED_COLOR_PREFIX);
  } else if (OB_FAIL(ObMallocAllocator::get_instance()->set_tenant_limit(OB_SERVER_TENANT_ID, limit_memory_size))) {
    OB_LOG(ERROR, "failed set tenant memory limit", KR(ret), K(limit_memory_size));
  } else {
    lib::set_memory_limit(limit_memory_size);
    // When the tenant is out of memory, it will raise(49), and ob_admin doesn't do the
    // initialisation that observer does, so it will just quit, so in order to make sure
    // that ob_admin runs correctly, need to ignore 49.
    signal(49, SIG_IGN);
    {
      std::unique_lock<std::mutex> lock(mtx);
      ready = true;
    }
    cv.notify_all();
    waiter_released = true;
    OSDQLogEntry log;
    if (OB_FAIL(log.init("LIMIT MEMORY"))) {
      OB_LOG(ERROR, "failed init log", KR(ret));
    } else {
      log.log_entry_kv("RESTRICTED TIME", std::to_string(sleep_time_s) + "s");
      log.log_entry_kv("TOTAL MEMORY LIMIT", std::to_string(limit_memory_size));
      log.log_entry_kv("TOTAL TENANT MEMORY LIMIT", std::to_string(limit_memory_size));
      log.log_entry_kv("TOTAL TENANT IO MANAGER MEMORY LIMIT", std::to_string(limit_memory_size));
      log.log_entry("THE MEMORY RESTRICTED ENVIRONMENT IS READY");
      log.print();
    }
    // A logging failure must not prevent the limits from being restored.
    ret = OB_SUCCESS;
    std::this_thread::sleep_for(std::chrono::seconds(sleep_time_s));
    lib::set_memory_limit(MEMORY_LIMITED_SIZE);
    lib::set_tenant_memory_limit(500, MEMORY_LIMITED_SIZE);
    if (OB_NOT_NULL(tenant_holder.get_ptr())) {
      if (FAILEDx(tenant_holder.get_ptr()->update_memory_pool(MEMORY_LIMITED_SIZE))) {
        OB_LOG(WARN, "faield to restore memory", KR(ret));
        OSDQLogEntry::print_log("LIMIT MEMORY", "failed to restore memory", RED_COLOR_PREFIX);
      }
    }
  }
  if (!waiter_released) {
    // Setup failed before the handshake: wake the caller anyway so it cannot hang.
    {
      std::unique_lock<std::mutex> lock(mtx);
      ready = true;
    }
    cv.notify_all();
  }
  if (OB_SUCC(ret)) {
    OSDQLogEntry::print_log("LIMIT MEMORY", "THE MEMORY RESTRICETED ENVIRONMENT IS REMOVED", GREEN_COLOR_PREFIX);
  }
}
// Memory-limit test: a helper thread applies the memory limit; once it signals
// readiness the request loop runs, the helper is joined and the task handler is
// stopped.
// Request results other than OB_SUCCESS, OB_HASH_NOT_EXIST and
// OB_ALLOCATE_MEMORY_FAILED fail the test; the first such code is returned.
int OSDQResourceLimitedScene::test_memory_limit_()
{
  int ret = OB_SUCCESS;
  OSDQLogEntry::print_log("MEMORY LIMITD TEST", "MEMORY LIMIT TEST START");
  std::mutex ready_mtx;
  std::condition_variable ready_cv;
  bool env_ready = false;
  std::thread limit_thread(inner_limit_memory_, limit_run_time_s_, limit_memory_mb_,
                           std::ref(ready_mtx), std::ref(ready_cv), std::ref(env_ready));
  // Block until the helper thread reports that the limit is in place.
  std::unique_lock<std::mutex> guard(ready_mtx);
  ready_cv.wait(guard, [&env_ready]() { return env_ready; });
  ret = loop_send_task();
  limit_thread.join();
  task_handler_.stop();
  if (OB_SUCC(ret)) {
    for (auto req_ret : task_handler_.req_results_) {
      const bool tolerated = (OB_SUCCESS == req_ret)
                             || (OB_HASH_NOT_EXIST == req_ret)
                             || (OB_ALLOCATE_MEMORY_FAILED == req_ret);
      if (!tolerated) {
        ret = req_ret;
        break;
      }
    }
  }
  return ret;
}
void OSDQResourceLimitedScene::inner_limit_cpu_(
const int64_t sleep_time_s,
const double cpu_rate,
std::mutex &mtx,
std::condition_variable &cv,
bool &ready)
{
int ret = OB_SUCCESS;
system("cgcreate -g cpu:/ob_admin_osdq_cgroup");
const int64_t quota = static_cast<int64_t>(100000 * cpu_rate);
std::string cmd = std::string("echo ") + std::to_string(quota) + std::string(" > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpu.cfs_quota_us");
system(cmd.c_str());
system("cat /sys/fs/cgroup/cpu/cpuset.mems > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpuset.mems");
system("cat /sys/fs/cgroup/cpu/cpuset.cpus > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpuset.cpus");
const int pid = getpid();
cmd = std::string("cgclassify -g cpu:ob_admin_osdq_cgroup ") + std::to_string(pid);
assert(system(cmd.c_str()) == 0);
OSDQLogEntry log;
if (OB_FAIL(log.init("LIMIT CPU"))) {
OB_LOG(ERROR, "failed init log", KR(ret));
} else {
log.log_entry_kv("RESTRICTED TIME", std::to_string(sleep_time_s) + "s");
log.log_entry_kv("CPU RATE", std::to_string(cpu_rate) + "%");
log.log_entry("THE MEMORY RESTRICTED ENVIRONMENT IS READY");
log.log_entry("LIMIT CPU SUCCESS");
log.print();
}
{
std::unique_lock<std::mutex> lock(mtx);
ready = true;
}
cv.notify_all();
std::this_thread::sleep_for(std::chrono::seconds(sleep_time_s));
OSDQLogEntry::print_log("LIMIT CPU", "LIMIT CPU FINISH");
system("echo 100000 > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpu.cfs_quota_us");
}
// CPU-limit test: a helper thread applies the CPU cap; once it signals readiness the
// request loop runs, the helper is joined and the task handler is stopped.
// Every request must return OB_SUCCESS; the first other code is returned.
// Independently of the request results, a measured CPU usage at or above
// FINAL_CPU_USAGE_LIMIT turns the result into OB_ERR_UNEXPECTED.
int OSDQResourceLimitedScene::test_cpu_limit_()
{
  int ret = OB_SUCCESS;
  OSDQLogEntry::print_log("CPU LIMIT TEST", "CPU LIMIT TEST START");
  std::mutex ready_mtx;
  std::condition_variable ready_cv;
  bool env_ready = false;
  std::thread limit_thread(inner_limit_cpu_, limit_run_time_s_, limit_cpu_,
                           std::ref(ready_mtx), std::ref(ready_cv), std::ref(env_ready));
  // Block until the helper thread reports that the cap is in place.
  std::unique_lock<std::mutex> guard(ready_mtx);
  ready_cv.wait(guard, [&env_ready]() { return env_ready; });
  ret = loop_send_task();
  limit_thread.join();
  task_handler_.stop();
  if (OB_SUCC(ret)) {
    for (auto req_ret : task_handler_.req_results_) {
      if (OB_SUCCESS != req_ret) {
        ret = req_ret;
        break;
      }
    }
  }
  const bool cpu_over_limit = (metric_->get_real_cpu_usage() >= FINAL_CPU_USAGE_LIMIT);
  if (cpu_over_limit) {
    ret = OB_ERR_UNEXPECTED;
  }
  return ret;
}
//===================== OSDQErrSimScene ====================
// Default constructor; it sets no members itself. Scene state is filled in by init().
OSDQErrSimScene::OSDQErrSimScene()
{}
// Initialises the error-simulation scene from `param` and binds the task handler to `metric`.
// Returns OB_INIT_TWICE when already initialised, OB_INVALID_ARGUMENT when param_valid()
// rejects `param`, the task handler's init error, and OB_SUCCESS otherwise.
int OSDQErrSimScene::init(const OSDQParameters *param, OSDQMetric *metric)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
OB_LOG(WARN, "ErrSim scene init twice", KR(ret), K(is_inited_));
} else if (OB_UNLIKELY(!param_valid(param))) {
// param_valid() also fills storage_info_, which is handed to the task handler below.
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), KPC(param));
} else if (OB_FAIL(task_handler_.init(param->base_path_, metric, &file_set_, &storage_info_))) {
OB_LOG(WARN, "failed init task handler", KR(ret));
} else {
// Operation mix for this scene: reads get weight 3, every other operation weight 1,
// and parallel writes are disabled. The setters' return codes are not checked here.
task_handler_.set_operator_weight(WRITE_SINGLE_FILE, 1);
task_handler_.set_operator_weight(MULTIPART_WRITE, 1);
task_handler_.set_operator_weight(APPEND_WRITE, 1);
task_handler_.set_operator_weight(READ_SINGLE_FILE, 3);
task_handler_.set_operator_weight(DEL_FILE, 1);
task_handler_.set_prob_of_parallel(0);
run_time_s_ = param->run_time_s_;
metric_ = metric;
base_uri_ = param->base_path_;
is_inited_ = true;
}
return ret;
}
// Checks `param` for the error-simulation scene and, as a side effect, loads
// storage_info_ from param->base_path_ / param->storage_info_str_.
// Returns true only when `param` is non-null, the storage info could be set and is
// valid, and the run time is positive. `ret` exists purely for logging.
bool OSDQErrSimScene::param_valid(const OSDQParameters *param)
{
  int ret = OB_SUCCESS;
  bool is_valid = false;
  if (OB_ISNULL(param)) {
    // nothing to validate
  } else if (OB_FAIL(storage_info_.set(param->base_path_, param->storage_info_str_))) {
    OB_LOG(WARN, "failed to set storage info", KR(ret), KPC(param));
  } else if (OB_UNLIKELY(!storage_info_.is_valid() || param->run_time_s_ <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), K(storage_info_), KPC(param));
  } else {
    is_valid = true;
  }
  return is_valid;
}
// Runs the error-simulation scene: registers the EN_OBJECT_STORAGE_CHECKSUM_ERROR event
// with error code OB_OBJECT_STORAGE_CHECKSUM_ERROR, drives loop_send_task(), stops the
// task handler and then requires every recorded request result to be
// OB_OBJECT_STORAGE_CHECKSUM_ERROR.
// Returns OB_NOT_INIT when not initialised, the error from start()/set_event()/
// loop_send_task(), or the first request result that is not the checksum error
// (which may be OB_SUCCESS). The task handler is destroyed on every path.
int OSDQErrSimScene::execute()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "not init", KR(ret));
} else if (OB_FAIL(task_handler_.start())) {
OB_LOG(WARN, "failed start task handler", KR(ret));
} else {
EventItem item;
item.trigger_freq_ = 1;
item.error_code_ = OB_OBJECT_STORAGE_CHECKSUM_ERROR;
/* if (OB_FAIL(EventTable::instance().set_event(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR.item_.no_, item))) { */
if (OB_FAIL(EventTable::instance().set_event("EN_OBJECT_STORAGE_CHECKSUM_ERROR", item))) {
OB_LOG(ERROR, "failed set event", KR(ret));
} else {
// NOTE(review): the injected event is not cleared in this function after the run.
ret = loop_send_task();
task_handler_.stop();
if (OB_SUCC(ret)) {
for (auto req_ret : task_handler_.req_results_) {
if (req_ret != OB_OBJECT_STORAGE_CHECKSUM_ERROR) {
ret = req_ret;
break;
}
}
}
}
}
task_handler_.destroy();
return ret;
}
} // namespace tools
} // namespace oceanbase

View File

@ -0,0 +1,747 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_admin_object_storage_driver_quality.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
namespace oceanbase
{
namespace tools
{
//=========================== OSDQTaskHandler ==================================
// Constructs an idle handler: not initialised, marked stopped, no thread group
// (tg_id_ == -1), default probabilities and thread count, and all borrowed pointers null.
OSDQTaskHandler::OSDQTaskHandler()
: prob_of_writing_old_data_(DEFAULT_PROB_OF_WRITING_OLD_DATA),
prob_of_parallel_(DEFAULT_PROB_OF_PARALLEL),
is_inited_(false),
is_stopped_(true),
tg_id_(-1),
thread_cnt_(DEFAULT_THREAD_CNT),
base_uri_(nullptr),
metric_(nullptr),
file_set_(nullptr),
storage_info_(nullptr),
adapter_()
{}
// Empty destructor: it does not call destroy(), so the thread group is only released
// when destroy() is invoked explicitly.
OSDQTaskHandler::~OSDQTaskHandler()
{}
// Creates the handler's thread group and stores the borrowed collaborators.
//
// @param base_uri      base URI under which objects are created (pointer is stored, not copied)
// @param metric        metric sink (stored, not owned)
// @param file_set      registry of written files (stored, not owned)
// @param storage_info  storage credentials; must be non-null and valid (stored, not owned)
// @return OB_INIT_TWICE when already initialised, OB_INVALID_ARGUMENT for a null or
//         invalid argument, the thread-group error on failure, OB_SUCCESS otherwise
//
// Every operation weight starts at 1. When the handler does not end up initialised,
// destroy() is called so that a partially created thread group is released.
int OSDQTaskHandler::init(
    const char *base_uri,
    OSDQMetric *metric,
    OSDQFileSet *file_set,
    const share::ObBackupStorageInfo *storage_info)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited_)) {
    ret = OB_INIT_TWICE;
    OB_LOG(WARN, "task handler init twice", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(base_uri) || OB_ISNULL(metric) || OB_ISNULL(storage_info)
             || OB_ISNULL(file_set) || OB_UNLIKELY(!storage_info->is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    // storage_info may be null on this path, so it must not be dereferenced in the
    // log arguments; KPC prints a null pointer safely.
    OB_LOG(WARN, "invalid argument", KR(ret), K(metric), KPC(storage_info), K(file_set));
  } else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::COMMON_QUEUE_THREAD, tg_id_))) {
    OB_LOG(WARN, "create task handler thread failed", KR(ret), K(tg_id_));
  } else if (OB_FAIL(TG_SET_HANDLER(tg_id_, *this))) {
    OB_LOG(WARN, "failed set hanlder", KR(ret), K(tg_id_));
  } else if (OB_FAIL(TG_SET_THREAD_CNT(tg_id_, thread_cnt_))) {
    OB_LOG(WARN, "failed set thread cnt", KR(ret), K(tg_id_), K(thread_cnt_));
  } else {
    TG_MGR.tgs_[tg_id_]->set_queue_size(DEFAULT_QUEUE_SIZE);
    base_uri_ = base_uri;
    metric_ = metric;
    file_set_ = file_set;
    storage_info_ = storage_info;
    for (int i = 0; i < MAX_OPERATE_TYPE; i++) {
      op_type_weights_[i] = 1;
    }
    is_inited_ = true;
  }
  if (OB_UNLIKELY(!is_inited_)) {
    destroy();
  }
  return ret;
}
// Starts the thread group and freezes the operation weights into cumulative
// boundaries (op_type_random_boundaries_[i] = sum of weights 0..i), which gen_task()
// uses to pick an operation type.
// Returns OB_NOT_INIT when not initialised, the TG_START error, or OB_SUCCESS.
int OSDQTaskHandler::start()
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
  } else if (OB_FAIL(TG_START(tg_id_))) {
    OB_LOG(WARN, "start thread pool faield", KR(ret));
  } else {
    is_stopped_ = false;
    for (int idx = 0; idx < MAX_OPERATE_TYPE; ++idx) {
      op_type_random_boundaries_[idx] = op_type_weights_[idx];
      if (idx > 0) {
        op_type_random_boundaries_[idx] += op_type_random_boundaries_[idx - 1];
      }
    }
    OB_LOG(INFO, "start OSDQTaskHandler success");
    OSDQLogEntry::print_log("TASK HANDLER", "task handler has started");
  }
  return ret;
}
void OSDQTaskHandler::stop()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "not init", KR(ret));
} else {
is_stopped_ = true;
TG_STOP(tg_id_);
OB_LOG(INFO, "stop OSDQTaskHandler");
}
}
int OSDQTaskHandler::wait()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "not init", KR(ret));
} else {
TG_WAIT(tg_id_);
OB_LOG(INFO, "wait OSDQTaskHandler success", KR(ret), K(tg_id_));
}
return ret;
}
int OSDQTaskHandler::set_thread_cnt(const int64_t thread_cnt)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "not init", KR(ret));
} else if (OB_FAIL(TG_SET_THREAD_CNT(tg_id_, thread_cnt))) {
OB_LOG(WARN, "failed set thread cnt", KR(ret), K(tg_id_), K(thread_cnt));
} else {
thread_cnt_ = thread_cnt;
}
return ret;
}
// Tears the handler down: if a thread group exists it is stopped, waited on and
// destroyed (in that order), then the handler returns to the uninitialised, stopped
// state and drops its borrowed pointers. base_uri_ is not reset here.
// Safe to call on a handler without a thread group; the log line is printed either way.
void OSDQTaskHandler::destroy()
{
if (OB_LIKELY(tg_id_ >= 0)) {
stop();
// The return code of wait() is ignored.
wait();
TG_DESTROY(tg_id_);
tg_id_ = -1;
}
is_inited_ = false;
is_stopped_ = true;
metric_ = nullptr;
file_set_ = nullptr;
storage_info_ = nullptr;
OSDQLogEntry::print_log("TASK HANDLER", "task handler has destroy");
}
void OSDQTaskHandler::handle(void *task)
{
int ret = OB_SUCCESS;
OSDQTask *task_p = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
} else if (OB_ISNULL(task)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(task));
} else if (FALSE_IT(task_p = reinterpret_cast<OSDQTask *>(task))) {
} else if (OB_UNLIKELY(!task_p->is_valid())) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), KPC(task_p));
} else {
task_p->start_time_stamp_us_ = ObTimeUtility::current_time();
if (task_p->op_type_ == WRITE_SINGLE_FILE
|| task_p->op_type_ == MULTIPART_WRITE
|| task_p->op_type_ == APPEND_WRITE) {
if (OB_FAIL(handle_write_task(task_p))) {
OB_LOG(WARN, "failed handle write task", KR(ret), KPC(task_p));
}
} else if (task_p->op_type_ == READ_SINGLE_FILE) {
if (OB_FAIL(handle_read_single_task(task_p))) {
OB_LOG(WARN, "failed handle read single task", KR(ret), KPC(task_p));
}
} else if (task_p->op_type_ == DEL_FILE) {
if (OB_FAIL(handle_del_task(task_p))) {
OB_LOG(WARN, "failed handle del task", KR(ret), KPC(task_p));
}
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "invalid task op type", KR(ret), K(task_p->op_type_));
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(task_p)) {
const double cost_time = (ObTimeUtility::current_time() - task_p->start_time_stamp_us_) / 1000000;
std::ostringstream oss;
oss << " error code: " << ret << " " << common::ob_error_name(ret) \
<< " cost_time:" << std::setprecision(3) << cost_time << "s";
OSDQLogEntry::print_log("TASK FAILED", oss.str(), RED_COLOR_PREFIX);
}
}
}
if (OB_NOT_NULL(task_p)) {
free_task(task_p);
task = nullptr;
}
push_req_result_(ret);
}
// Uploads the task buffer to task->uri_ with a single write_single_file call.
// Returns the adapter's result code; failures are logged.
int OSDQTaskHandler::handle_write_single_task_helper_(const OSDQTask *task)
{
  int ret = adapter_.write_single_file(task->uri_, storage_info_,
                                       task->buf_, task->buf_len_,
                                       ObStorageIdMod::get_default_id_mod());
  if (OB_FAIL(ret)) {
    OB_LOG(WARN, "failed write single file", KR(ret), KPC(task), KPC(storage_info_));
  }
  return ret;
}
// Uploads the task buffer to task->uri_ through the direct multipart writer:
// open -> async upload of the whole buffer at offset 0 -> wait -> complete.
// close_device_and_fd() is attempted on every path; its error only becomes the
// return value when everything before it succeeded (COVER_SUCC).
// Returns the first error encountered, or OB_SUCCESS.
int OSDQTaskHandler::handle_multipart_write_task_helper_(const OSDQTask *task)
{
int ret = OB_SUCCESS;
// NOTE(review): write_size is never read in this function.
int64_t write_size = 0;
ObIODevice *device_handle = nullptr;
ObIOFd fd;
ObIOHandle io_handle;
const ObStorageAccessType access_type = OB_STORAGE_ACCESS_DIRECT_MULTIPART_WRITER;
if (OB_FAIL(adapter_.open_with_access_type(device_handle, fd, storage_info_, task->uri_,
access_type, ObStorageIdMod::get_default_id_mod()))) {
OB_LOG(WARN, "failed to open device with access type", KR(ret), KPC(task));
} else if (OB_FAIL(adapter_.async_upload_data(*device_handle, fd, task->buf_, 0/*offset*/,
task->buf_len_, io_handle))) {
OB_LOG(WARN, "failed to start async upload task!", KR(ret), KPC(task));
} else if (OB_FAIL(io_handle.wait())) {
OB_LOG(WARN, "failed to wait async upload data finish", KR(ret), KPC(task));
} else if (OB_FAIL(adapter_.complete(*device_handle, fd))) {
OB_LOG(WARN, "failed to complete", KR(ret), KPC(task));
}
// Close is attempted even when open failed, in which case device_handle is still nullptr.
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(adapter_.close_device_and_fd(device_handle, fd))) {
ret = COVER_SUCC(tmp_ret);
OB_LOG(WARN, "failed to close file and release device", KR(ret), KPC(task));
}
return ret;
}
int OSDQTaskHandler::handle_append_write_task_helper_(const OSDQTask *task)
{
int ret = OB_SUCCESS;
const int64_t buf_len = task->buf_len_;
const int64_t write_cnt = 4;
const int64_t write_size_once = buf_len / write_cnt;
const ObStorageAccessType access_type = OB_STORAGE_ACCESS_APPENDER;
int64_t write_size = 0;
for (int i = 0; i < write_cnt && OB_SUCC(ret); i++) {
const int64_t offset = write_size_once * i;
const bool is_can_seal = (i + 1 == write_cnt);
const int64_t expected_write_size = is_can_seal ? (buf_len - offset) : write_size_once;
if (FAILEDx(adapter_.pwrite(task->uri_, storage_info_, task->buf_ + offset,
offset, expected_write_size, access_type,
write_size, is_can_seal, ObStorageIdMod::get_default_id_mod()))) {
OB_LOG(WARN, "failed exec pwrite", KR(ret), K(i), K(offset), KPC(task));
} else if (OB_UNLIKELY(write_size != expected_write_size)) {
OB_LOG(WARN, "pwrite operation write size not as expected", KR(ret), KPC(task), K(write_size), K(expected_write_size));
}
}
return ret;
}
// Decides whether the result codes of two writes issued in parallel against the same
// object are acceptable for the configured storage type.
//
// @param op_type1/ret1  operation type and result code of the first write
// @param op_type2/ret2  operation type and result code of the second write
// @return true when the pair of results is acceptable, false otherwise (also logged)
//
// Rules implemented below:
//   - neither operation is an append: both must succeed;
//   - at least one append on S3: both must succeed;
//   - at least one append on COS/OSS: two appends must both succeed; otherwise only
//     the non-append operation is required to succeed;
//   - any other storage type is rejected.
// `ret` is only used for logging.
bool OSDQTaskHandler::check_parallel_write_result_(
const OSDQOpType op_type1,
const int ret1,
const OSDQOpType op_type2,
const int ret2)
{
bool bool_ret = true;
int ret = OB_SUCCESS;
if ((op_type1 == WRITE_SINGLE_FILE || op_type1 == MULTIPART_WRITE)
&& (op_type2 == WRITE_SINGLE_FILE || op_type2 == MULTIPART_WRITE)) {
// in this case, no operation is append write
if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) {
bool_ret = false;
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "parallel write should success when two type is write_single_file or multipart write", KR(ret),
K(op_type1), K(ret1), K(op_type2), K(ret2));
}
} else {
// in this case, at least one operation is append write
if (storage_info_->get_type() == ObStorageType::OB_STORAGE_S3) {
if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) {
bool_ret = false;
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "parallel append write should succeed when storage type is s3", KR(ret),
K(op_type1), K(ret1), K(op_type2), K(ret2));
}
} else if (storage_info_->get_type() == ObStorageType::OB_STORAGE_COS
|| storage_info_->get_type() == ObStorageType::OB_STORAGE_OSS) {
// if the storage type is cos or oss, the append write may fail if the append operation is performed
// after the single or multi-part write operation.
// but if two parallel operation are both append write operation, they should succeed.
if (op_type1 == APPEND_WRITE && op_type2 == APPEND_WRITE) {
if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) {
bool_ret = false;
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "the parallel append write operation should succeed", KR(ret),
K(op_type1), K(ret1), K(op_type2), K(ret2));
}
} else {
// Exactly one side is an append here: only the non-append side is checked.
if (OB_UNLIKELY((op_type1 != APPEND_WRITE && ret1 != OB_SUCCESS) || (op_type2 != APPEND_WRITE && ret2 != OB_SUCCESS))) {
bool_ret = false;
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "the write single operation or multi-part write should success",
KR(ret), KPC(storage_info_), K(op_type1), K(ret1), K(op_type2), K(ret2));
}
}
} else {
bool_ret = false;
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "error storage type", KR(ret), KPC(storage_info_));
}
}
return bool_ret;
}
// Appends one request result code to req_results_ while holding mutex_.
void OSDQTaskHandler::push_req_result_(const int ret)
{
lib::ObMutexGuard guard(mutex_);
req_results_.push_back(ret);
}
void OSDQTaskHandler::handle_drop(void *task)
{
int ret = OB_SUCCESS;
OSDQTask *task_p = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "not init", KR(ret), K(is_inited_));
} else if (OB_ISNULL(task)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(task));
} else if (FALSE_IT(task_p = reinterpret_cast<OSDQTask *>(task))) {
} else if (OB_FAIL(metric_->sub_queued_entry())) {
OB_LOG(WARN, "failed sub queued entry in metric", KR(ret));
} else {
free_task(task_p);
task = nullptr;
}
}
// Queues `task` on the thread group and bumps the queued-entry metric.
// Returns OB_NOT_INIT when not initialised, OB_INVALID_ARGUMENT for a null or invalid
// task, the TG_PUSH_TASK / add_queued_entry error, or OB_SUCCESS.
// NOTE(review): when the handler is stopped the task is not queued, yet OB_SUCCESS is
// returned (only a warning is logged) — confirm that callers expect this.
int OSDQTaskHandler::push_task(OSDQTask *task)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
  } else if (OB_UNLIKELY(is_stopped_)) {
    OB_LOG(WARN, "task handler is stopped", KR(ret), K(is_stopped_));
  } else if (OB_UNLIKELY(nullptr == task || !task->is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), KPC(task));
  } else {
    ret = TG_PUSH_TASK(tg_id_, task);
    if (OB_FAIL(ret)) {
      OB_LOG(WARN, "failed push task in task handler", KR(ret), KPC(task));
    } else {
      ret = metric_->add_queued_entry();
      if (OB_FAIL(ret)) {
        OB_LOG(WARN, "failed add queued entry in metric", KR(ret));
      }
    }
  }
  return ret;
}
// Sets the relative selection weight of one operation type.
//
// @param op_type         operation whose weight is changed; must be a real operation
//                        type, i.e. in [0, MAX_OPERATE_TYPE)
// @param op_type_weight  new non-negative weight (0 disables the operation)
// @return OB_ERR_UNEXPECTED while the handler is running, OB_INVALID_ARGUMENT for an
//         out-of-range type or a negative weight, OB_SUCCESS otherwise
//
// Weights only take effect at the next start(), which is why changes are refused
// while the handler is running.
int OSDQTaskHandler::set_operator_weight(const OSDQOpType op_type, const int64_t op_type_weight)
{
  int ret = OB_SUCCESS;
  const int64_t op_type_idx = static_cast<int64_t>(op_type);
  if (OB_UNLIKELY(!is_stopped_)) {
    ret = OB_ERR_UNEXPECTED;
    OB_LOG(WARN, "set op type weight is disable when task handler is running", KR(ret), K(is_stopped_));
  } else if (OB_UNLIKELY(op_type_idx < 0 || op_type_idx >= MAX_OPERATE_TYPE || op_type_weight < 0)) {
    // Reject every out-of-range value, not only MAX_OPERATE_TYPE itself, because
    // op_type is used as an index into op_type_weights_.
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), K(op_type), K(op_type_weight));
  } else {
    op_type_weights_[op_type] = op_type_weight;
  }
  return ret;
}
int OSDQTaskHandler::set_prob_of_writing_old_data(const int64_t prob)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(prob < 0 || prob >= 100)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(prob));
} else {
prob_of_writing_old_data_ = prob;
}
return ret;
}
int OSDQTaskHandler::set_prob_of_parallel(const int64_t prob)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(prob < 0 || prob >= 100)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), K(prob));
} else {
prob_of_parallel_ = prob;
}
return ret;
}
// Allocates a new OSDQTask into `task` and fills it with a randomly chosen operation.
// The operation is picked by drawing a number in [1, weight_sum] and comparing it with
// the cumulative boundaries computed by start().
// Returns OB_NOT_INIT, OB_ALLOCATE_MEMORY_FAILED, the error of the selected gen_*
// function, or OB_SUCCESS. On any failure the error is appended to req_results_ and
// free_task(task) is called.
int OSDQTaskHandler::gen_task(OSDQTask *&task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
} else if (OB_ISNULL(task = static_cast<OSDQTask *>(malloc(sizeof(OSDQTask))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed allocate memory for OSDQTask", KR(ret));
} else if (FALSE_IT(task = new(task)OSDQTask())) {
} else {
int64_t weight_sum = op_type_random_boundaries_[MAX_OPERATE_TYPE - 1];
// With an empty file set the draw is capped at the APPEND_WRITE boundary, so the
// read and delete branches below are not reached.
if (file_set_->size() == 0) {
weight_sum = op_type_random_boundaries_[APPEND_WRITE];
}
const int64_t rnd = ObRandom::rand(1, weight_sum);
if (rnd <= op_type_random_boundaries_[WRITE_SINGLE_FILE]) {
ret = gen_write_task(task, WRITE_SINGLE_FILE);
} else if (rnd <= op_type_random_boundaries_[MULTIPART_WRITE]) {
ret = gen_write_task(task, MULTIPART_WRITE);
} else if (rnd <= op_type_random_boundaries_[APPEND_WRITE]) {
ret = gen_write_task(task, APPEND_WRITE);
} else if (rnd <= op_type_random_boundaries_[READ_SINGLE_FILE]) {
ret = gen_read_single_task(task);
} else if (rnd <= op_type_random_boundaries_[DEL_FILE]) {
ret = gen_del_task(task);
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(ERROR, "wrong rnd", KR(ret));
}
if (OB_FAIL(ret)) {
OB_LOG(WARN, "failed gen task", KR(ret), KPC(task));
}
}
// NOTE(review): on the OB_NOT_INIT path `task` still holds whatever the caller passed
// in when free_task() is called — confirm callers initialise it to nullptr.
if (OB_FAIL(ret)) {
push_req_result_(ret);
free_task(task);
}
return ret;
}
// Fills `task` as a write task of type `op_type`.
//
// @param task     task to fill; must be non-null
// @param op_type  WRITE_SINGLE_FILE, MULTIPART_WRITE or APPEND_WRITE
// @return OB_NOT_INIT, OB_INVALID_ARGUMENT, OB_ALLOCATE_MEMORY_FAILED, an error from
//         the naming / file-set / content helpers, or OB_SUCCESS
//
// The target is either a brand-new object or, with probability
// prob_of_writing_old_data_ percent and a non-empty file set, a file taken out of the
// file set. Append writes always target a new object. The buffer is filled with
// content derived from the object id. With probability prob_of_parallel_ percent the
// task is additionally marked parallel and given a second, randomly chosen write type.
int OSDQTaskHandler::gen_write_task(OSDQTask *task, const OSDQOpType op_type)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(task) || OB_UNLIKELY(op_type != WRITE_SINGLE_FILE
             && op_type != MULTIPART_WRITE && op_type != APPEND_WRITE)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), K(task), K(op_type));
  } else {
    // Randomly choose whether to write a new file or an old one. The draw is in
    // [1, 100] (same convention as the parallel draw below), so a probability of 0
    // really means "never"; the previous [0, 100] draw could still hit 0.
    bool is_write_new = true;
    int rnd = ObRandom::rand(1, 100);
    if (rnd <= prob_of_writing_old_data_ && file_set_->size() != 0) {
      is_write_new = false;
    }
    int64_t object_id = 0;
    // when op_type is append_write, only new files can be written.
    if (is_write_new || op_type == APPEND_WRITE) {
      object_id = OSDQIDGenerator::get_instance().get_next_id();
      char object_name[MAX_OBJECT_NAME_LENGTH] = { 0 };
      if (FAILEDx(construct_object_name(object_id, object_name, MAX_OBJECT_NAME_LENGTH))) {
        OB_LOG(WARN, "failed construct object name", KR(ret), K(object_id));
      } else if (OB_ISNULL(task->uri_ = static_cast<char *>(malloc(OB_MAX_URI_LENGTH)))) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        OB_LOG(WARN, "failed allocate memory for file path", KR(ret), K(OB_MAX_URI_LENGTH));
      } else if (OB_FAIL(construct_file_path(base_uri_, object_name, task->uri_, OB_MAX_URI_LENGTH))) {
        OB_LOG(WARN, "failed construct file path", KR(ret), K(base_uri_), K(object_name));
      }
    } else {
      if (FAILEDx(file_set_->fetch_and_delete_file(object_id, task->uri_))) {
        OB_LOG(WARN, "failed to fetch file from file set", KR(ret), K(object_id));
      }
    }
    if (OB_FAIL(ret)) {
    } else if (FALSE_IT(task->buf_len_ = get_random_content_length(op_type))) {
    } else if (FALSE_IT(task->buf_ = static_cast<char *>(malloc(task->buf_len_)))) {
    } else if (task->buf_len_ > 0) {
      if (OB_ISNULL(task->buf_)) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        OB_LOG(WARN, "failed allocate memory for task buf", KR(ret), K(task->buf_len_));
      } else if (OB_FAIL(generate_content_by_object_id(task->buf_, task->buf_len_, object_id))) {
        OB_LOG(WARN, "failed to generate content for task", KR(ret), K(task->buf_len_), K(object_id));
      }
    }
    if (OB_SUCC(ret)) {
      task->op_type_ = op_type;
      task->object_id_ = object_id;
      // random choosing whether to parallel or not
      rnd = ObRandom::rand(1, 100);
      if (rnd <= prob_of_parallel_) {
        task->parallel_ = true;
        static const OSDQOpType parallel_op_types[] = {
          WRITE_SINGLE_FILE,
          MULTIPART_WRITE,
          APPEND_WRITE
        };
        int parallel_op_type_nums = sizeof(parallel_op_types) / sizeof(OSDQOpType);
        // if buf_len_ is less than 4, the append task cannot be executed
        if (task->buf_len_ < 4) {
          parallel_op_type_nums--;
        }
        rnd = ObRandom::rand(0, parallel_op_type_nums - 1);
        // Look the type up in the table rather than casting the index, so the choice
        // does not depend on the numeric values of the enumerators.
        task->parallel_op_type_ = parallel_op_types[rnd];
      }
    }
  }
  return ret;
}
// Fills `task` as a READ_SINGLE_FILE task on a file taken out of the file set.
// With probability prob_of_parallel_ percent the task is marked parallel, with the
// same operation type as its parallel counterpart.
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT for a null task, the file-set error,
// or OB_SUCCESS.
int OSDQTaskHandler::gen_read_single_task(OSDQTask *task)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(task)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), K(task));
  } else {
    task->op_type_ = READ_SINGLE_FILE;
    ret = file_set_->fetch_and_delete_file(task->object_id_, task->uri_);
    if (OB_FAIL(ret)) {
      OB_LOG(WARN, "failed fetch and delete file from file set", KR(ret));
    } else {
      const int parallel_draw = ObRandom::rand(1, 100);
      if (parallel_draw <= prob_of_parallel_) {
        task->parallel_ = true;
        task->parallel_op_type_ = task->op_type_;
      }
    }
  }
  return ret;
}
// Fills `task` as a DEL_FILE task on a file taken out of the file set.
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT for a null task, the file-set error,
// or OB_SUCCESS.
int OSDQTaskHandler::gen_del_task(OSDQTask *task)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "task hanlder not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(task)) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), K(task));
  } else {
    task->op_type_ = DEL_FILE;
    ret = file_set_->fetch_and_delete_file(task->object_id_, task->uri_);
    if (OB_FAIL(ret)) {
      OB_LOG(WARN, "failed fetch and delete file from file set", KR(ret));
    }
  }
  return ret;
}
// Wraps the write helper matching `op_type` into a packaged_task bound to this handler
// and `task`. WRITE_SINGLE_FILE and MULTIPART_WRITE map to their own helpers; every
// other value falls back to the append-write helper (callers validate the type first).
std::packaged_task<int()> OSDQTaskHandler::get_packaged_task_(OSDQOpType op_type, const OSDQTask *task)
{
  int (OSDQTaskHandler::*helper)(const OSDQTask *) = &OSDQTaskHandler::handle_append_write_task_helper_;
  if (WRITE_SINGLE_FILE == op_type) {
    helper = &OSDQTaskHandler::handle_write_single_task_helper_;
  } else if (MULTIPART_WRITE == op_type) {
    helper = &OSDQTaskHandler::handle_multipart_write_task_helper_;
  }
  return std::packaged_task<int()>(std::bind(helper, this, task));
}
// Executes a write task, either as a single write or as two concurrent writes against
// the same object.
// Returns OB_NOT_INIT, OB_INVALID_ARGUMENT for a null/invalid task, or — on the
// non-parallel path — the first error from the write helper, the latency metric or the
// file set; OB_SUCCESS otherwise.
int OSDQTaskHandler::handle_write_task(const OSDQTask *task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
} else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", KR(ret), KPC(task));
} else {
if (!task->parallel_) {
// not parallel write
const int64_t op_start_time_us = ObTimeUtility::current_time();
// Exactly one of these branches runs its helper: each condition first matches the
// operation type and only then invokes (and checks) the helper.
if (task->op_type_ == WRITE_SINGLE_FILE
&& OB_FAIL(handle_write_single_task_helper_(task))) {
OB_LOG(WARN, "failed handle write single task", KR(ret), KPC(task));
} else if (task->op_type_ == MULTIPART_WRITE
&& OB_FAIL(handle_multipart_write_task_helper_(task))) {
OB_LOG(WARN, "failed handle multipart write task", KR(ret), KPC(task));
} else if (task->op_type_ == APPEND_WRITE
&& OB_FAIL(handle_append_write_task_helper_(task))) {
OB_LOG(WARN, "failed handle append write task", KR(ret), KPC(task));
}
// Only a successful write is recorded in the metric and registered in the file set.
if (FAILEDx(metric_->add_latency_metric(op_start_time_us,
task->op_type_, task->buf_len_))) {
OB_LOG(WARN, "failed add latency metric", KR(ret), K(op_start_time_us), KPC(task));
} else if (OB_FAIL(file_set_->add_file(task->object_id_, task->uri_))) {
OB_LOG(WARN, "failed add object id and file path in file set", KR(ret), KPC(task));
}
} else {
//parallel write
// Both writes share the same task (same uri and buffer) and run on their own threads.
std::packaged_task<int()> task1 = get_packaged_task_(task->op_type_, task);
std::packaged_task<int()> task2 = get_packaged_task_(task->parallel_op_type_, task);
std::future<int> future1 = task1.get_future();
std::future<int> future2 = task2.get_future();
std::thread th1(std::move(task1));
std::thread th2(std::move(task2));
th1.join();
th2.join();
int ret1 = future1.get();
int ret2 = future2.get();
// NOTE(review): in this branch `ret` is never modified, so an unacceptable pair of
// results is only logged; the latency metric and the file set are not updated either.
if (OB_UNLIKELY(!check_parallel_write_result_(task->op_type_, ret1, task->parallel_op_type_, ret2))) {
OSDQLogEntry log;
log.init("PARALLEL WRITE FAILED", RED_COLOR_PREFIX);
log.log_entry(std::string(osdq_op_type_names[task->op_type_])
+ ": " + std::to_string(ret1) + " " + std::string(ob_error_name(ret1)));
log.log_entry(std::string(osdq_op_type_names[task->parallel_op_type_])
+ ": " + std::to_string(ret2) + " " + std::string(ob_error_name(ret2)));
log.print();
} else {
OB_LOG(INFO, "parallel write succees info", KPC(task), K(ret1), K(ret2));
}
}
}
return ret;
}
// Execute a single-file read task.
//
// Serial (task->parallel_ is false): reads the object through
// handle_read_single_task_helper_(), then records the latency in metric_ and
// registers object_id_/uri_ in file_set_ via add_file().
//
// Parallel: allocates a second OSDQTask carrying a copy of the uri and object id,
// runs the read helper on two threads (one per task), joins both, and returns
// OB_ERR_UNEXPECTED if either read failed. This branch neither records latency nor
// calls add_file(). The second task is released with free_task() before returning.
//
// @param task  task to execute; must be non-NULL and valid.
int OSDQTaskHandler::handle_read_single_task(OSDQTask *task)
{
  int ret = OB_SUCCESS;
  const int64_t op_start_time_us = ObTimeUtility::current_time();
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), KPC(task));
  } else if (task->parallel_) {
    // Build the companion task that the second reader thread works on.
    OSDQTask *shadow_task = static_cast<OSDQTask *>(malloc(sizeof(OSDQTask)));
    if (OB_ISNULL(shadow_task)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      OB_LOG(WARN, "failed allocate memory for OSDQTask", KR(ret));
    } else {
      shadow_task = new (shadow_task) OSDQTask();
      shadow_task->uri_ = static_cast<char *>(malloc(OB_MAX_URI_LENGTH));
      if (OB_ISNULL(shadow_task->uri_)) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        OB_LOG(WARN, "failed allocate memory for OSDQTask", KR(ret));
      } else {
        memcpy(shadow_task->uri_, task->uri_, OB_MAX_URI_LENGTH);
        shadow_task->op_type_ = READ_SINGLE_FILE;
        shadow_task->object_id_ = task->object_id_;
      }
    }

    if (OB_SUCCESS == ret) {
      std::packaged_task<int(OSDQTask *)> reader_a(
          [this](OSDQTask *t) -> int { return handle_read_single_task_helper_(t); });
      std::packaged_task<int(OSDQTask *)> reader_b(
          [this](OSDQTask *t) -> int { return handle_read_single_task_helper_(t); });
      std::future<int> outcome_a = reader_a.get_future();
      std::future<int> outcome_b = reader_b.get_future();
      std::thread worker_a(std::move(reader_a), task);
      std::thread worker_b(std::move(reader_b), shadow_task);
      worker_a.join();
      worker_b.join();
      int ret1 = outcome_a.get();
      int ret2 = outcome_b.get();
      if (OB_UNLIKELY(OB_SUCCESS != ret1 || OB_SUCCESS != ret2)) {
        ret = OB_ERR_UNEXPECTED;
        OB_LOG(WARN, "failed parallel read single task", KR(ret), KR(ret1), KR(ret2));
      }
    }

    // Release the companion task whether or not the reads ran.
    if (OB_NOT_NULL(shadow_task)) {
      free_task(shadow_task);
    }
  } else if (OB_FAIL(handle_read_single_task_helper_(task))) {
    OB_LOG(WARN, "failed handle read single task", KR(ret));
  } else if (OB_FAIL(metric_->add_latency_metric(op_start_time_us,
                                                 task->op_type_, task->buf_len_))) {
    OB_LOG(WARN, "failed add latency metric", KR(ret), K(op_start_time_us), KPC(task));
  } else if (OB_FAIL(file_set_->add_file(task->object_id_, task->uri_))) {
    OB_LOG(WARN, "failed add object id and file path in file set", KR(ret), KPC(task));
  }
  return ret;
}
// Read the whole object named by task->uri_ into a freshly malloc'ed buffer.
//
// Steps: query the object length into task->buf_len_, allocate task->buf_ with that
// many bytes, then read the object into it. The buffer is not released here.
// NOTE(review): presumably the task's owner frees buf_ - confirm against free_task().
//
// @param task  task whose uri_ is read; buf_ and buf_len_ are overwritten.
// @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED, or the error from the adapter call.
int OSDQTaskHandler::handle_read_single_task_helper_(OSDQTask *task)
{
  int ret = OB_SUCCESS;
  int64_t bytes_read = 0;
  if (OB_FAIL(adapter_.adaptively_get_file_length(task->uri_, storage_info_, task->buf_len_))) {
    OB_LOG(WARN, "failed get file length", KR(ret), KPC(task), KPC(storage_info_));
  } else {
    task->buf_ = static_cast<char *>(malloc(task->buf_len_));
    if (OB_ISNULL(task->buf_)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      OB_LOG(WARN, "failed allocate memory for task buf", KR(ret), KPC(task));
    } else if (OB_FAIL(adapter_.adaptively_read_single_file(task->uri_,
                                                            storage_info_,
                                                            task->buf_,
                                                            task->buf_len_,
                                                            bytes_read,
                                                            ObStorageIdMod::get_default_id_mod()))) {
      OB_LOG(WARN, "failed read single file", KR(ret), KPC(task), KPC(storage_info_));
    }
  }
  return ret;
}
// Execute a delete task.
//
// Deletes the object named by task->uri_ through the storage adapter and records the
// latency in metric_. If any step fails, the object id and uri are registered again in
// file_set_ via add_file(); a failure of that restore is only logged and does not
// replace the original error.
//
// @param task  task to execute; must be non-NULL and valid.
// @return OB_SUCCESS on success, OB_NOT_INIT / OB_INVALID_ARGUMENT on precondition
//         failure, otherwise the error of the delete or the metric update.
int OSDQTaskHandler::handle_del_task(OSDQTask *task)
{
  int ret = OB_SUCCESS;
  const int64_t op_start_time_us = ObTimeUtility::current_time();
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_));
  } else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    OB_LOG(WARN, "invalid argument", KR(ret), KPC(task));
  } else if (OB_FAIL(adapter_.adaptively_del_file(task->uri_, storage_info_))) {
    OB_LOG(WARN, "failed del file", KR(ret), KPC(task));
  } else if (OB_FAIL(metric_->add_latency_metric(op_start_time_us, task->op_type_, 0))) {
    OB_LOG(WARN, "failed add latency metric", KR(ret), K(op_start_time_us), KPC(task));
  }
  if (OB_FAIL(ret)) {
    if (OB_UNLIKELY(!is_inited_) || OB_ISNULL(task)) {
      // Nothing can be restored: the handler is not initialized or there is no task
      // to read object_id_/uri_ from. Dereferencing task here would crash.
    } else {
      int tmp_ret = OB_SUCCESS;
      if (OB_TMP_FAIL(file_set_->add_file(task->object_id_, task->uri_))) {
        OB_LOG(WARN, "failed add object id and file path in file set", KR(ret), K(tmp_ret), KPC(task));
      }
    }
  }
  return ret;
}
} // namespace tools
} // namespace oceanbase