diff --git a/deps/oblib/src/lib/restore/ob_object_storage_base.h b/deps/oblib/src/lib/restore/ob_object_storage_base.h index ea17029af..ec91d4c96 100644 --- a/deps/oblib/src/lib/restore/ob_object_storage_base.h +++ b/deps/oblib/src/lib/restore/ob_object_storage_base.h @@ -163,4 +163,4 @@ private: } // common } // oceanbase -#endif // SRC_LIBRARY_SRC_LIB_RESTORE_OB_OBJECT_STORAGE_BASE_H_ \ No newline at end of file +#endif // SRC_LIBRARY_SRC_LIB_RESTORE_OB_OBJECT_STORAGE_BASE_H_ diff --git a/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp b/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp index 2f68d8d1d..1065566d8 100644 --- a/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp @@ -29,7 +29,7 @@ using namespace oceanbase::common; int init_cos_env() { int ret = OB_SUCCESS; - OBJECT_STORAGE_GUARD(nullptr/*storage_info*/, "OSS_GLOBAL_INIT", IO_HANDLED_SIZE_ZERO); + OBJECT_STORAGE_GUARD(nullptr/*storage_info*/, "COS_GLOBAL_INIT", IO_HANDLED_SIZE_ZERO); return qcloud_cos::ObCosEnv::get_instance().init(ob_apr_abort_fn); } @@ -1046,6 +1046,10 @@ int ObStorageCosWriter::write(const char *buf, const int64_t size) } else if (NULL == buf || size < 0) { ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "buf is NULL or size is invalid", K(ret), KP(buf), K(size)); +#ifdef ERRSIM + } else if (OB_FAIL(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR)) { + OB_LOG(WARN, "fake checksum error", K(ret)); +#endif } else { qcloud_cos::CosStringBuffer bucket_name = qcloud_cos::CosStringBuffer( handle_.get_bucket_name().ptr(), handle_.get_bucket_name().length()); diff --git a/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp b/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp index 6e350bf6e..9f2824a9c 100644 --- a/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp @@ -1063,6 +1063,13 @@ static int add_content_md5(oss_request_options_t *options, const char *buf, cons } else { int 
b64_len = aos_base64_encode(md5, in_len, b64_value); b64_value[b64_len] = '\0'; +#ifdef ERRSIM + // Test checksum by deliberately modifying the md5 value + if (OB_FAIL(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR)) { + ret = OB_SUCCESS; + b64_value[b64_len - 1] = '\0'; + } +#endif apr_table_set(headers, OSS_CONTENT_MD5, b64_value); } } diff --git a/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp b/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp index a1de55892..a693f5321 100644 --- a/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp @@ -1381,6 +1381,10 @@ int ObStorageS3Writer::write_obj_(const char *obj_name, const char *buf, const i Aws::S3::Model::PutObjectOutcome outcome; if (OB_FAIL(s3_client_->put_object(request, outcome))) { OB_LOG(WARN, "failed to put s3 object", K(ret)); +#ifdef ERRSIM + } else if (OB_FAIL(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR)) { + ret = OB_OBJECT_STORAGE_CHECKSUM_ERROR; +#endif } else if (!outcome.IsSuccess()) { handle_s3_outcome(outcome, ret); OB_LOG(WARN, "failed to write object into s3", diff --git a/deps/oblib/src/lib/utility/ob_tracepoint_def.h b/deps/oblib/src/lib/utility/ob_tracepoint_def.h index 05069bb3e..a1081e388 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint_def.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint_def.h @@ -525,6 +525,7 @@ GLOBAL_ERRSIM_POINT_DEF(1114, EN_INSERT_USER_RECOVER_JOB_FAILED, ""); GLOBAL_ERRSIM_POINT_DEF(1115, EN_INSERT_AUX_TENANT_RESTORE_JOB_FAILED, ""); GLOBAL_ERRSIM_POINT_DEF(1116, EN_RESTORE_CREATE_LS_FAILED, ""); GLOBAL_ERRSIM_POINT_DEF(1117, EN_ENABLE_LOG_OBJECT_STORAGE_CHECKSUM_TYPE, ""); +GLOBAL_ERRSIM_POINT_DEF(1118, EN_OBJECT_STORAGE_CHECKSUM_ERROR, ""); // END OF STORAGE HA - 1101 - 2000 // sql parameterization 1170-1180 diff --git a/tools/ob_admin/CMakeLists.txt b/tools/ob_admin/CMakeLists.txt index a8ebcb99d..de2b995d8 100644 --- a/tools/ob_admin/CMakeLists.txt +++ b/tools/ob_admin/CMakeLists.txt @@ -70,6 +70,11 @@ 
add_executable(ob_admin io_device/ob_admin_test_io_device_executor.cpp io_device/ob_admin_test_object_storage_interface.cpp + object_storage_driver_quality/ob_admin_object_storage_driver_quality.h + object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp + object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp + object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp + #trx_tool/ob_admin_trx_executor.h #trx_tool/ob_admin_trx_executor.cpp @@ -142,6 +147,11 @@ add_executable(ob_admin io_device/ob_admin_test_io_device_executor.cpp io_device/ob_admin_test_object_storage_interface.cpp + object_storage_driver_quality/ob_admin_object_storage_driver_quality.h + object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp + object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp + object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp + #trx_tool/ob_admin_trx_executor.h #trx_tool/ob_admin_trx_executor.cpp @@ -166,7 +176,8 @@ target_link_libraries(ob_admin ${ob_close_modules_static_name} -Wl,--end-group -static-libgcc - -static-libstdc++) + -static-libstdc++ + malloc_hook) if(ENABLE_THIN_LTO AND USE_LTO_CACHE) add_dependencies(ob_admin observer) diff --git a/tools/ob_admin/main.cpp b/tools/ob_admin/main.cpp index ae50a2adf..6dc540b37 100644 --- a/tools/ob_admin/main.cpp +++ b/tools/ob_admin/main.cpp @@ -30,6 +30,7 @@ #include "slog_tool/ob_admin_slog_executor.h" #include "io_bench/ob_admin_io_adapter_bench.h" #include "io_device/ob_admin_test_io_device_executor.h" +#include "object_storage_driver_quality/ob_admin_object_storage_driver_quality.h" #include "lib/utility/ob_print_utils.h" #ifdef OB_BUILD_SHARED_STORAGE #include "tools/ob_admin/shared_storage_tool/ob_admin_shared_storage_tool_executor.h" @@ -162,6 +163,8 @@ int main(int argc, char *argv[]) executor = new ObAdminIOAdapterBenchmarkExecutor(); } else if (0 == 
strcmp("test_io_device", argv[1])) { executor = new ObAdminTestIODeviceExecutor(); + } else if (0 == strcmp("io_driver_quality", argv[1])) { + executor = new ObAdminObjectStorageDriverQualityExecutor(); } else if (0 == strncmp("-h", argv[1], 2) || 0 == strncmp("-S", argv[1], 2)) { executor = new ObAdminServerExecutor(); } else { diff --git a/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp new file mode 100644 index 000000000..a4dcaa426 --- /dev/null +++ b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality.cpp @@ -0,0 +1,1349 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */
+
+#include "ob_admin_object_storage_driver_quality.h"
+
+using namespace oceanbase::share;
+using namespace oceanbase::common;
+
+namespace oceanbase
+{
+namespace tools
+{
+// Returns (end - start) in seconds, combining the whole-second and the
+// microsecond fields of the two timeval values.
+double cal_time_diff(const timeval &start, const timeval &end)
+{
+  return (end.tv_sec - start.tv_sec) + (end.tv_usec - start.tv_usec) / 1e6;
+}
+
+// Formats value in fixed-point notation with `precision` digits after the
+// decimal point.
+std::string to_string_with_precision(const double value, const int precision)
+{
+  std::ostringstream out;
+  out << std::fixed << std::setprecision(precision) << value;
+  return out.str();
+}
+
+// Fills buf[0, buf_len) with the pattern buf[i] = (object_id + i) % INT8_MAX,
+// so the expected content can be re-derived from object_id alone
+// (check_content_by_object_id verifies the same pattern).
+// Returns OB_INVALID_ARGUMENT when buf_len or object_id is negative, or when
+// buf is null while buf_len > 0; OB_SUCCESS otherwise.
+int generate_content_by_object_id(char *buf, const int64_t buf_len, const int64_t object_id)
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(buf_len < 0 || object_id < 0)
+      || OB_UNLIKELY(buf_len > 0 && buf == nullptr)) {
+    ret = OB_INVALID_ARGUMENT;
+    // buf is a raw output buffer, not a terminated string: log its address only.
+    OB_LOG(WARN, "the argument is invalid", KR(ret), KP(buf), K(buf_len), K(object_id));
+  }
+  // The index has the same width as buf_len; an int index would overflow for
+  // buffers larger than INT_MAX.
+  for (int64_t i = 0; i < buf_len && OB_SUCC(ret); i++) {
+    buf[i] = (object_id + i) % INT8_MAX;
+  }
+  return ret;
+}
+
+// Verifies that buf[0, buf_len) holds the pattern written by
+// generate_content_by_object_id for the same object_id.
+// Returns OB_INVALID_ARGUMENT on the same argument errors as the generator,
+// OB_ERR_UNEXPECTED at the first mismatching byte, OB_SUCCESS otherwise.
+int check_content_by_object_id(const char *buf, const int64_t buf_len, const int64_t object_id)
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(buf_len < 0 || object_id < 0) || OB_UNLIKELY(buf_len > 0 && buf == nullptr)) {
+    ret = OB_INVALID_ARGUMENT;
+    // Raw bytes, not a terminated string: log the address plus the scalar arguments.
+    OB_LOG(WARN, "the argument is invalid", KR(ret), KP(buf), K(buf_len), K(object_id));
+  }
+
+  // The loop stops at the first mismatch because ret is part of the condition.
+  for (int64_t i = 0; i < buf_len && OB_SUCC(ret); i++) {
+    if (OB_UNLIKELY(buf[i] != (object_id + i) % INT8_MAX)) {
+      ret = OB_ERR_UNEXPECTED;
+      OB_LOG(WARN, "check content failed!", KR(ret), K(i), K(buf[i]), K(object_id), K(buf_len));
+    }
+  }
+  return ret;
+}
+
+/**
+ * @brief Get a random length for performing a write operation or a read operation
+ *
+ * @param op_type operation the length is generated for; APPEND_WRITE lowers
+ *                the minimum length to 4 instead of MIN_OBJECT_SIZE
+ * @return content_length Indicates the content length generated
+ */
+int64_t get_random_content_length(const OSDQOpType op_type)
+{
+  int min_object_size = MIN_OBJECT_SIZE;
+  if (op_type == APPEND_WRITE) {
+    min_object_size = 4;
+  }
+  int64_t content_length = 0;
+  // rnd is a multiple of 0.01 derived from ObRandom::rand(1, 100); it selects
+  // the size class below.
+  double rnd = 1.0 * ObRandom::rand(1, 100) / 100;
+  if (rnd <=
SMALL_OBJECT_SIZE_RATE) {  // small size class
+    content_length = ObRandom::rand(min_object_size, SMALL_OBJECT_SIZE_LIMIT);
+  } else if (rnd <= SMALL_OBJECT_SIZE_RATE + NORMAL_OBJECT_SIZE_RATE) {  // normal size class
+    content_length = ObRandom::rand(SMALL_OBJECT_SIZE_LIMIT + 1, NORMAL_OBJECT_SIZE_LIMIT);
+  } else {  // large size class
+    content_length = ObRandom::rand(NORMAL_OBJECT_SIZE_LIMIT + 1, LARGE_OBJECT_SIZE_LIMIT);
+  }
+  return content_length;
+}
+
+int construct_object_name(const int64_t object_id, char *object_name, const int64_t object_name_len)  // Writes "object_<id>_" plus MAX_OBJECT_NAME_SUFFIX_LENGTH random characters into object_name.
+{
+  static const char alphanum[] =  // character set the random suffix is drawn from
+      "0123456789"
+      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+      "abcdefghijklmnopqrstuvwxyz"
+      "@=:$";
+  int ret = OB_SUCCESS;
+  int64_t pos = 0;  // write offset into object_name, advanced by databuff_printf and the suffix loop
+  if (OB_ISNULL(object_name) || OB_UNLIKELY(object_id < 0 || object_name_len != MAX_OBJECT_NAME_LENGTH)) {  // the buffer must be exactly MAX_OBJECT_NAME_LENGTH long
+    ret = OB_INVALID_ARGUMENT;
+    OB_LOG(WARN, "invalid argument!", KR(ret), K(object_id), K(object_name_len));
+  } else if (OB_FAIL(databuff_printf(object_name, object_name_len, pos, "object_%ld_", object_id))) {
+    OB_LOG(WARN, "failed to databuff_printf object_name", KR(ret), K(object_id), K(object_name_len), K(pos));
+  } else {
+    for (int i = 0; i < MAX_OBJECT_NAME_SUFFIX_LENGTH && OB_SUCC(ret); i++) {
+      if (OB_LIKELY(pos < object_name_len)) {
+        object_name[pos] = alphanum[ObRandom::rand(0, sizeof(alphanum) - 2)];  // sizeof - 2 is the index of the last character before the terminating NUL
+        pos++;
+      } else {
+        ret = OB_ERR_UNEXPECTED;
+        OB_LOG(WARN, "pos should be smaller than MAX_OBJECT_NAME_LENGTH", KR(ret), K(i), K(pos), K(object_name_len));
+      }
+    }
+
+    if (OB_SUCC(ret)) {
+      if (OB_LIKELY(pos < object_name_len)) {
+        object_name[pos] = '\0';  // terminate after the random suffix
+      } else {
+        ret = OB_ERR_UNEXPECTED;
+        OB_LOG(WARN, "pos should be smaller than MAX_OBJECT_NAME_LENGTH", KR(ret), K(pos), K(object_name_len));
+      }
+    }
+  }
+  return ret;
+}
+
+int construct_file_path(  // Joins base_uri and object_name as "<base_uri>/<object_name>" into file_path.
+    const char *base_uri,
+    const char *object_name,
+    char *file_path,
+    const int64_t file_path_length)
+{
+  int ret = OB_SUCCESS;
+  if (OB_ISNULL(base_uri) || OB_ISNULL(object_name) || OB_ISNULL(file_path)
+      || OB_UNLIKELY(file_path_length <
OB_MAX_URI_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(base_uri), K(object_name), K(file_path), K(file_path_length)); + } else if (OB_FAIL(databuff_printf(file_path, file_path_length, "%s/%s", base_uri, object_name))) { + OB_LOG(WARN, "failed to construct object uri", KR(ret), K(base_uri), K(object_name), K(file_path_length)); + } + return ret; +} + +ObVSliceAlloc& get_vslice_alloc_instance() +{ + static ObBlockAllocMgr alloc_mgr; + static ObMemAttr attr; + static const int64_t DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024; + static ObVSliceAlloc allocator(attr, DEFAULT_BLOCK_SIZE, alloc_mgr); + return allocator; +} + +int64_t allocator_cnt = 0; + +//============================ OSDQLogEntry ============================== + +OSDQLogEntry::OSDQLogEntry() + : is_inited_(false), + prefix_(), + head_holder_(), + content_() +{ +} + +int OSDQLogEntry::init(const std::string &title, const std::string &color) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + OB_LOG(WARN, "init twice", KR(ret)); + } else { + prefix_ = get_time_prefix_() + " "; + head_holder_ = std::string(prefix_.size(), ' '); + prefix_ += color + std::string("[") + title + std::string("]") + NONE_COLOR_PREFIX; + is_inited_ = true; + } + return ret; +} + +void OSDQLogEntry::print_log(const std::string &title, const std::string &content, const std::string &color) +{ + const std::string log = get_time_prefix_() + color + " [" + title + "] " + NONE_COLOR_PREFIX + content + "\n"; + std::cout << log; +} + +int get_time_formatted(char *time_str, const int time_str_len, const char *format = "%Y-%m-%d %H:%M:%S") { + int ret = OB_SUCCESS; + struct timeval current_real_time; + struct tm current_timeinfo; + gettimeofday(¤t_real_time, nullptr); + ob_localtime(const_cast(¤t_real_time.tv_sec), ¤t_timeinfo); + + if (OB_ISNULL(time_str) || OB_UNLIKELY(time_str_len <= 0) || OB_ISNULL(format)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", 
KR(ret), KP(time_str), K(time_str_len), KP(format)); + } else if (0 == strftime(time_str, time_str_len, format, ¤t_timeinfo)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "the len of time_str is not enough", KR(ret)); + } + return ret; +} + +std::string OSDQLogEntry::get_time_prefix_() +{ + int ret = OB_SUCCESS; + char time_str[TIME_STR_LENGTH] = { 0 }; + int64_t pos = 0; + std::string time_prefix = ""; + if (OB_FAIL(get_time_formatted(time_str, TIME_STR_LENGTH))) { + OB_LOG(WARN, "failed get time formatted", KR(ret)); + } else if (FALSE_IT(pos = strlen(time_str))) { + } else if (snprintf(time_str + pos, sizeof(time_str) - pos, ".%06ld", ObTimeUtility::current_time() % 1000000) < 0) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "failed append microsecond for time_str", KR(ret)); + } else { + time_prefix = std::string("[") + std::string(time_str) + std::string("]"); + } + return time_prefix; +} + +void OSDQLogEntry::log_entry_kv(const std::string &key, const std::string &value, const std::string &color) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else { + content_ += head_holder_ + color + key + NONE_COLOR_PREFIX + ": " + value + "\n"; + } +} + +void OSDQLogEntry::log_entry(const std::string &content, const std::string &color) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else { + content_ += head_holder_ + color + content + NONE_COLOR_PREFIX + "\n"; + } +} + +void OSDQLogEntry::print() +{ + std::string log = prefix_ + "\n" + content_; + std::cout << log; + reset(); +} + +void OSDQLogEntry::reset() +{ + prefix_ = ""; + head_holder_ = ""; + content_ = ""; + is_inited_ = false; +} + +//================== OSDQTimeMap =========================== +OSDQTimeMap::OSDQTimeMap() + : total_entry_(0) +{ +} + +int OSDQTimeMap::log_entry(const int64_t cost_time_us) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(cost_time_us <= 0)) 
{ + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(cost_time_us)); + } else { + const int64_t cost_time_ms = cost_time_us / 1000; + time_map_[cost_time_ms]++; + total_entry_++; + } + return ret; +} + +int OSDQTimeMap::summary(const char *map_name_str, OSDQLogEntry &log) const +{ + int ret = OB_SUCCESS; + const char *map_name = "Anonymous Time Map"; + if (OB_NOT_NULL(map_name_str)) { + map_name = map_name_str; + } + if (total_entry_ <= 0) { + log.log_entry(std::string(map_name) + " 0"); + } else { + const int64_t latency_quantile_boundaries[] = { + 1, + static_cast(total_entry_ * 0.5 + 0.5), + static_cast(total_entry_ * 0.9 + 0.5), + static_cast(total_entry_ * 0.95 + 0.5), + static_cast(total_entry_ * 0.99 + 0.5), + total_entry_ + }; + const int64_t latency_quantile_cnt = sizeof(latency_quantile_boundaries) / sizeof(latency_quantile_boundaries[0]); + assert(latency_quantile_cnt == sizeof(latency_quantile_boundaries) / sizeof(latency_quantile_boundaries[0])); + int64_t latency_quantile_vals[latency_quantile_cnt] = {0}; + int64_t boundary_index = 0; + int64_t current_cnt = 0; + + std::map::const_iterator it = time_map_.begin(); + while (boundary_index < latency_quantile_cnt && it != time_map_.end()) { + current_cnt += it->second; + while (boundary_index < latency_quantile_cnt && current_cnt >= latency_quantile_boundaries[boundary_index]) { + latency_quantile_vals[boundary_index] = it->first; + boundary_index++; + } + ++it; + } + + char buf[2048] = {0}; + int64_t pos = 0; + + if (FAILEDx(databuff_printf(buf, sizeof(buf), pos, "%s%7ld ", map_name, total_entry_))) { + OB_LOG(WARN, "failed set log str", KR(ret)); + } + + for (int i = 0; i < latency_quantile_cnt && OB_SUCC(ret); ++i) { + if (FAILEDx(databuff_printf(buf, sizeof(buf), pos, "%7ld ", + latency_quantile_vals[i]))) { + OB_LOG(WARN, "failed set log str", KR(ret), K(i), K(latency_quantile_vals[i])); + } + } + if (OB_SUCC(ret)) { + log.log_entry(std::string(buf)); + } + } + return ret; +} 
+
+//================== OSDQMetric ===========================
+
+//================== OSDQMetric::ReqStatisticalsInfo =============
+
+OSDQMetric::ReqStatisticalsInfo::ReqStatisticalsInfo()
+  : total_operation_num_(0),
+    total_queued_num_(0),
+    average_qps_(0),
+    average_bw_mb_(0)
+{
+  // These three members are read before they are first assigned (delta
+  // computation for the "real" rates and the CSV dump), so they must start at
+  // zero too. They are assigned in the body so the result does not depend on
+  // the member declaration order.
+  total_throughput_mb_ = 0;
+  real_qps_ = 0;
+  real_bw_mb_ = 0;
+}
+
+//================== OSDQMetric::CpuInfo =========================
+OSDQMetric::CpuInfo::CpuInfo()
+  : cpu_usage_for_100MB_bw_(0),
+    total_cpu_usage_(0),
+    total_user_time_(0),
+    total_system_time_(0),
+    real_cpu_usage_(0)
+{}
+
+//================== OSDQMetric::MemInfo =========================
+OSDQMetric::MemInfo::MemInfo()
+  : start_vm_size_kb_(0),
+    start_vm_rss_kb_(0),
+    object_storage_hold_kb_(0),
+    object_storage_used_kb_(0),
+    total_hold_kb_(0),
+    total_used_kb_(0),
+    vm_peak_kb_(0),
+    vm_size_kb_(0),
+    vm_hwm_kb_(0),
+    vm_rss_kb_(0),
+    ob_vslice_alloc_used_memory_kb_(0),
+    ob_vslice_alloc_allocator_cnt_(0)
+{}
+
+// Creates (or truncates) the metric CSV file at metric_csv_path_ and writes the
+// column header line. The column order matches the rows appended by
+// print_csv_dump_.
+// Returns OB_ERR_UNEXPECTED if the file cannot be opened.
+int OSDQMetric::print_csv_title_()
+{
+  int ret = OB_SUCCESS;
+  std::ofstream ofs;
+  if (FALSE_IT(ofs.open(metric_csv_path_))) {
+  } else if (OB_UNLIKELY(!ofs.is_open())) {
+    ret = OB_ERR_UNEXPECTED;
+    OB_LOG(WARN, "failed to open metric_csv file", KR(ret), KP(metric_csv_path_));
+  } else {
+    ofs << "id, time, "
+        << "total_operation_num, "
+        << "total_queued_num, "
+        << "total_throughput_mb, "
+        << "average_qps, "
+        << "average_bw_mb, "
+        << "real_qps, "
+        << "real_bw_mb,"
+        << "cpu_usage_for_100MB_bw, "
+        << "total_cpu_usage, "
+        << "total_user_time, "
+        << "total_system_time, "
+        << "real_cpu_usage,"
+        << "start_vm_size_kb, "
+        << "start_vm_rss_kb, "
+        << "object_storage_hold_kb, "
+        << "object_storage_used_kb, "
+        << "total_hold_kb, "
+        << "total_used_kb, "
+        << "vm_peak_kb, "
+        << "vm_size_kb, "
+        << "vm_hwm_kb, "
+        << "vm_rss_kb, "
+        << "ob_vslice_alloc_used_memory_kb, "
+        << "ob_vslice_alloc_allocator_cnt, "
+        << std::endl;
+    ofs.close();
+  }
+  return ret;
+}
+
+// Appends one CSV row holding the most recently collected request, CPU and
+// memory statistics to the metric file.
+int OSDQMetric::print_csv_dump_()
+{
+  int ret = OB_SUCCESS;
+  char time[TIME_STR_LENGTH];
+
std::ofstream ofs;
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    OB_LOG(WARN, "not init", KR(ret), K(is_inited_));
+  } else if (OB_FAIL(get_time_formatted(time, TIME_STR_LENGTH))) {  // timestamp column, default "%Y-%m-%d %H:%M:%S" format
+    OB_LOG(WARN, "failed get time formatted", KR(ret));
+  } else if (FALSE_IT(ofs.open(metric_csv_path_, std::ios::app))) {  // append mode: rows accumulate below the header
+  } else if (OB_UNLIKELY(!ofs.is_open())) {
+    ret = OB_ERR_UNEXPECTED;
+    OB_LOG(WARN, "failed to open metric_csv file", KR(ret), KP(metric_csv_path_));
+  } else {
+    ofs << std::fixed << std::setprecision(6)  // floating-point columns use 6 decimals
+        << summary_cnt_ << "," << time << ","
+        << last_req_statistical_info_.total_operation_num_ << ","  // request statistics columns
+        << last_req_statistical_info_.total_queued_num_ << ","
+        << last_req_statistical_info_.total_throughput_mb_ << ","
+        << last_req_statistical_info_.average_qps_ << ","
+        << last_req_statistical_info_.average_bw_mb_ << ","
+        << last_req_statistical_info_.real_qps_ << ","
+        << last_req_statistical_info_.real_bw_mb_ << ","
+        << last_cpu_info_.cpu_usage_for_100MB_bw_ << ","  // CPU columns
+        << last_cpu_info_.total_cpu_usage_ << ","
+        << last_cpu_info_.total_user_time_ << ","
+        << last_cpu_info_.total_system_time_ << ","
+        << last_cpu_info_.real_cpu_usage_ << ","
+        << last_mem_info_.start_vm_size_kb_ << ","  // memory columns
+        << last_mem_info_.start_vm_rss_kb_ << ","
+        << last_mem_info_.object_storage_hold_kb_ << ","
+        << last_mem_info_.object_storage_used_kb_ << ","
+        << last_mem_info_.total_hold_kb_ << ","
+        << last_mem_info_.total_used_kb_ << ","
+        << last_mem_info_.vm_peak_kb_ << ","
+        << last_mem_info_.vm_size_kb_ << ","
+        << last_mem_info_.vm_hwm_kb_ << ","
+        << last_mem_info_.vm_rss_kb_ << ","
+        << last_mem_info_.ob_vslice_alloc_used_memory_kb_ << ","
+        << last_mem_info_.ob_vslice_alloc_allocator_cnt_ << ","
+        << std::endl;
+    ofs.close();
+  }
+  return ret;
+}
+
+OSDQMetric::OSDQMetric()  // Zeroes the counters and records the construction time as the start of the measurement.
+  : is_inited_(false),
+    summary_cnt_(0),
+    start_real_time_us_(ObTimeUtility::current_time()),
+    last_real_time_us_(start_real_time_us_),  // NOTE(review): relies on start_real_time_us_ being declared before last_real_time_us_ - confirm in the header
+    total_operation_num_(0),
+    total_queued_num_(0),
+
total_throughput_mb_(0),
+    last_req_statistical_info_(),
+    last_cpu_info_(),
+    start_mem_info_(),
+    last_mem_info_()
+{
+  metric_csv_path_[0] = '\0';  // empty path until init() fills it in
+}
+
+OSDQMetric::~OSDQMetric()
+{
+
+}
+
+int get_metric_csv_path(char *metric_csv_path, const int metric_csv_path_len)  // Builds "[$OB_ADMIN_LOG_DIR/]metric_csv_<timestamp>.csv" into metric_csv_path.
+{
+  int ret = OB_SUCCESS;
+  char time_str[TIME_STR_LENGTH] = { 0 };
+
+  const char *ob_admin_log_dir = getenv("OB_ADMIN_LOG_DIR");  // optional output directory; null when the variable is unset
+  if (OB_ISNULL(metric_csv_path) || OB_UNLIKELY(metric_csv_path_len < OB_MAX_FILE_NAME_LENGTH)) {  // the buffer must hold at least OB_MAX_FILE_NAME_LENGTH bytes
+    ret = OB_INVALID_ARGUMENT;
+    OB_LOG(WARN, "invalid argument", KR(ret), KP(metric_csv_path), K(metric_csv_path_len));
+  } else if (OB_FAIL(get_time_formatted(time_str, TIME_STR_LENGTH, "%Y%m%d_%H%M%S"))) {
+    OB_LOG(WARN, "failed to get time str", KR(ret));
+  } else if (OB_ISNULL(ob_admin_log_dir)) {  // no directory configured: relative file name only
+    if (OB_FAIL(databuff_printf(metric_csv_path, metric_csv_path_len, "metric_csv_%s.csv", time_str))) {
+      OB_LOG(WARN, "failed databuff printf metric csv path", KR(ret), K(ob_admin_log_dir), K(time_str));
+    }
+  } else if (OB_FAIL(databuff_printf(metric_csv_path, metric_csv_path_len, "%s/metric_csv_%s.csv",
+      ob_admin_log_dir, time_str))) {
+    OB_LOG(WARN, "failed databuff printf metric csv path", KR(ret), K(ob_admin_log_dir), K(time_str));
+  }
+  return ret;
+}
+
+int OSDQMetric::init()  // One-time setup: baseline memory snapshot, CSV file with header, baseline rusage.
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(is_inited_)) {
+    ret = OB_INIT_TWICE;
+    OB_LOG(WARN, "init twice", KR(ret), K(is_inited_));
+  } else if (OB_FAIL(get_memory_usage(start_mem_info_))) {  // baseline values read from /proc/self/status
+    OB_LOG(WARN, "failed get start memory info", KR(ret));
+  } else if (OB_FAIL(get_metric_csv_path(metric_csv_path_, OB_MAX_FILE_NAME_LENGTH))) {  // NOTE(review): assumes metric_csv_path_ has at least OB_MAX_FILE_NAME_LENGTH bytes - confirm in the header
+    OB_LOG(WARN, "failed get metric csv path", KR(ret));
+  } else if (OB_FAIL(print_csv_title_())) {
+    OB_LOG(WARN, "faild print title to metric csv file", KR(ret), KP(metric_csv_path_));
+  } else {
+    getrusage(RUSAGE_SELF, &start_usage_);  // CPU time baseline; the return value is not checked
+    last_usage_ = start_usage_;
+    is_inited_ = true;
+  }
+  return ret;
+}
+
+int OSDQMetric::add_latency_metric(  // Records the latency and size of one finished operation.
+    const int64_t
op_start_time_us,
+    const OSDQOpType op_type,
+    const int64_t object_size)
+{
+  // Records one finished operation under mutex_: its latency (now minus
+  // op_start_time_us) goes into the map selected by operation type and object
+  // size class, and the operation / queue / throughput counters are updated.
+  // Returns OB_NOT_INIT before init(), OB_INVALID_ARGUMENT for bad arguments,
+  // OB_ERR_UNEXPECTED when object_size exceeds LARGE_OBJECT_SIZE_LIMIT, or the
+  // error of the latency map.
+  lib::ObMutexGuard guard(mutex_);
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    OB_LOG(WARN, "not init", KR(ret), K(is_inited_));
+  } else if (OB_UNLIKELY(op_start_time_us <= 0
+      // op_type indexes latency_maps_ below, so reject every value outside
+      // [0, MAX_OPERATE_TYPE), not just the sentinel itself.
+      || static_cast<int64_t>(op_type) < 0
+      || static_cast<int64_t>(op_type) >= static_cast<int64_t>(MAX_OPERATE_TYPE)
+      || object_size < 0)) {
+    ret = OB_INVALID_ARGUMENT;
+    OB_LOG(WARN, "invalid argument", KR(ret), K(op_start_time_us), K(op_type), K(object_size));
+  } else {
+    const int64_t op_cost_time_us = ObTimeUtility::current_time() - op_start_time_us;
+    // Classify the object by size; the sentinel stays only if the size is too large.
+    ObjectSizeType object_size_type = ObjectSizeType::MAX_OJBECT_SIZE_TYPE;
+    if (object_size <= SMALL_OBJECT_SIZE_LIMIT) {
+      object_size_type = ObjectSizeType::SMALL_OBJECT;
+    } else if (object_size <= NORMAL_OBJECT_SIZE_LIMIT) {
+      object_size_type = ObjectSizeType::NORMAL_OBJECT_SIZE;
+    } else if (object_size <= LARGE_OBJECT_SIZE_LIMIT) {
+      object_size_type = ObjectSizeType::LARGE_OBJECT_SIZE;
+    } else {
+      ret = OB_ERR_UNEXPECTED;
+      OB_LOG(ERROR, "object size is too large", KR(ret), K(object_size));
+    }
+
+    if (OB_FAIL(ret)) {
+    } else if (OB_LIKELY(object_size_type != ObjectSizeType::MAX_OJBECT_SIZE_TYPE)) {
+      if (OB_FAIL(latency_maps_[op_type][object_size_type].log_entry(op_cost_time_us))) {
+        OB_LOG(WARN, "failed to log entry", KR(ret), K(op_type), K(op_cost_time_us));
+      } else {
+        // The operation is done: count it, take it off the queue and add its
+        // size (converted to MB) to the throughput total.
+        total_operation_num_++;
+        total_queued_num_--;
+        total_throughput_mb_ += 1.0 * object_size / 1024 / 1024;
+      }
+    }
+  }
+  return ret;
+}
+
+// Increments the queued-operation counter under mutex_.
+// Returns OB_NOT_INIT before init().
+int OSDQMetric::add_queued_entry()
+{
+  lib::ObMutexGuard guard(mutex_);
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    OB_LOG(WARN, "not init", KR(ret));
+  } else {
+    total_queued_num_++;
+  }
+  return ret;
+}
+
+// Decrements the queued-operation counter under mutex_ without recording a
+// latency sample. Returns OB_NOT_INIT before init().
+int OSDQMetric::sub_queued_entry()
+{
+  lib::ObMutexGuard guard(mutex_);
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    OB_LOG(WARN, "not init", KR(ret));
+  } else {
+    total_queued_num_--;
+  }
+  return ret;
+}
+
+int
OSDQMetric::get_memory_usage(MemInfo &mem_info)  // Reads VmPeak, VmSize, VmHWM and VmRSS (in kB) from /proc/self/status into mem_info.
+{
+  int ret = OB_SUCCESS;
+  std::ifstream status_file("/proc/self/status");
+  std::string line;
+  if (OB_UNLIKELY(!status_file.is_open())) {
+    ret = OB_FILE_NOT_OPENED;
+    OB_LOG(WARN, "failed to open /proc/self/status", KR(ret));
+  }
+
+  int write_state = 0;  // bitmask: bit 0 VmPeak, bit 1 VmSize, bit 2 VmHWM, bit 3 VmRSS
+  while (std::getline(status_file, line) && OB_SUCC(ret)) {  // stops at end of file or on the first parse error
+    if (line.find("VmPeak:") == 0) {  // find(...) == 0 means the line starts with the key
+      if (OB_UNLIKELY(sscanf(line.c_str(), "VmPeak: %lf kB", &mem_info.vm_peak_kb_) != 1)) {  // NOTE(review): %lf requires the field to be a double - confirm in the header
+        ret = OB_ERR_UNEXPECTED;
+        OB_LOG(WARN, "faield to get VmPeak", KR(ret), K(line.c_str()));
+      } else {
+        write_state |= 1 << 0;
+      }
+    } else if (line.find("VmSize:") == 0) {
+      if (OB_UNLIKELY(sscanf(line.c_str(), "VmSize: %lf kB", &mem_info.vm_size_kb_) != 1)) {
+        ret = OB_ERR_UNEXPECTED;
+        OB_LOG(WARN, "failed to get VmSize", KR(ret), K(line.c_str()));
+      } else {
+        write_state |= 1 << 1;
+      }
+    } else if (line.find("VmHWM:") == 0) {
+      if (OB_UNLIKELY(sscanf(line.c_str(), "VmHWM: %lf kB", &mem_info.vm_hwm_kb_) != 1)) {
+        ret = OB_ERR_UNEXPECTED;
+        OB_LOG(WARN, "failed to get VmHWM", KR(ret), K(line.c_str()));
+      } else {
+        write_state |= 1 << 2;
+      }
+    } else if (line.find("VmRSS:") == 0) {
+      if (OB_UNLIKELY(sscanf(line.c_str(), "VmRSS: %lf kB", &mem_info.vm_rss_kb_) != 1)) {
+        ret = OB_ERR_UNEXPECTED;
+        OB_LOG(WARN, "failed to get VmRSS", KR(ret), K(line.c_str()));
+      } else {
+        write_state |= 1 << 3;
+      }
+    }
+  }
+
+  if (OB_FAIL(ret)) {
+  } else if (OB_UNLIKELY(write_state != 15)) {  // 15 == 0b1111: all four fields must have been parsed
+    ret = OB_ERR_UNEXPECTED;
+    OB_LOG(WARN, "some memory statistics are missing", KR(ret), K(write_state));
+  }
+  return ret;
+}
+
+int OSDQMetric::get_req_statistical_info_(OSDQLogEntry &log)  // Refreshes the request statistics and appends them to log as PART 1.
+{
+  int ret = OB_SUCCESS;
+  const double cost_time_s = 1.0 * (ObTimeUtility::current_time() - start_real_time_us_) / 1000000;  // seconds since construction
+  log.log_entry("===== PART 1 REQ STATISTICAL INFO =====", DARY_GRAY_PREFIX);
+  if (cost_time_s > EPS) {  // guards the divisions against a near-zero elapsed time
+    last_req_statistical_info_.average_qps_ = total_operation_num_ / cost_time_s;
+
last_req_statistical_info_.average_bw_mb_ = total_throughput_mb_ / cost_time_s;
+  }
+  // "Real" rates cover only the interval since last_real_time_us_, as opposed
+  // to the averages above, which cover the whole run.
+  double real_qps = 0;
+  double real_bw_mb = 0;
+  // Multiply by 1.0 first: a pure integer division would truncate the interval
+  // to whole seconds and turn every sub-second interval into 0.
+  const double interval_time_s = 1.0 * (ObTimeUtility::current_time() - last_real_time_us_) / 1000000;
+  if (interval_time_s > EPS) {
+    real_qps = static_cast<double>(total_operation_num_ - last_req_statistical_info_.total_operation_num_) / interval_time_s;
+    real_bw_mb = static_cast<double>(total_throughput_mb_ - last_req_statistical_info_.total_throughput_mb_) / interval_time_s;
+  }
+  // Snapshot the totals only after the deltas have been computed from the
+  // previous snapshot.
+  last_req_statistical_info_.total_operation_num_ = total_operation_num_;
+  last_req_statistical_info_.total_throughput_mb_ = total_throughput_mb_;
+  last_req_statistical_info_.total_queued_num_ = total_queued_num_;
+  last_req_statistical_info_.real_qps_ = real_qps;
+  last_req_statistical_info_.real_bw_mb_ = real_bw_mb;
+
+  log.log_entry_kv("Total operation num", std::to_string(last_req_statistical_info_.total_operation_num_));
+  log.log_entry_kv("Total queued num", std::to_string(last_req_statistical_info_.total_queued_num_));
+  log.log_entry_kv("Total throughput bytes(MB)",
+      to_string_with_precision(last_req_statistical_info_.total_throughput_mb_, PRECISION));
+  log.log_entry_kv("Average QPS",
+      to_string_with_precision(last_req_statistical_info_.average_qps_, PRECISION));
+  log.log_entry_kv("Average BW(MB/s)",
+      to_string_with_precision(last_req_statistical_info_.average_bw_mb_, PRECISION));
+  log.log_entry_kv("Real QPS", to_string_with_precision(real_qps, PRECISION));
+  log.log_entry_kv("Real BW(MB/s)", to_string_with_precision(real_bw_mb, PRECISION));
+  return ret;
+}
+
+// Appends PART 2 of the summary to log: one latency line per operation type
+// and object size class, produced by OSDQTimeMap::summary.
+int OSDQMetric::get_req_latency_map_(OSDQLogEntry &log)
+{
+  int ret = OB_SUCCESS;
+  log.log_entry("===== PART 2 REQ LATENCY MAP =====", DARY_GRAY_PREFIX);
+  static const int latency_maps_num = MAX_OPERATE_TYPE;
+  // Row labels, indexed by object size class.
+  static const char *object_size_type_time_map_str[] = {
+    "S |", // SMALL
+    "N |", // NORMAL
+    "L |"  // LARGE
+  };
+
+  log.log_entry(" req | cnt | min | P50 | P90 | P95 | P99 | max ");
+
log.log_entry("--------------------------------------------------------------"); + char time_map_str[24] = {0}; + int64_t pos = 0; + int64_t cur_pos = 0; + for (int i = 0; i < latency_maps_num && OB_SUCC(ret); i++) { + pos = 0; + if (FAILEDx(databuff_printf(time_map_str, sizeof(time_map_str), pos, "%s ", osdq_op_type_names[i]))) { + OB_LOG(WARN, "failed to printf latency map str", KR(ret), K(osdq_op_type_names[i])); + } else { + for (int type = 0; type < ObjectSizeType::MAX_OJBECT_SIZE_TYPE && OB_SUCC(ret); type++) { + cur_pos = pos; + if (OB_FAIL(databuff_printf(time_map_str, sizeof(time_map_str), cur_pos, "%s", object_size_type_time_map_str[type]))) { + OB_LOG(WARN, "failed to printf object size type time map str", + KR(ret), K(object_size_type_time_map_str[type])); + } else if (OB_FAIL(latency_maps_[i][type].summary(time_map_str, log))) { + OB_LOG(WARN, "failed summary latency map", KR(ret), K(i), K(type), K(time_map_str)); + } + } + } + log.log_entry("--------------------------------------------------------------"); + } + return ret; +} + +int OSDQMetric::get_cpu_info_(OSDQLogEntry &log) +{ + int ret = OB_SUCCESS; + const double cost_time_s = 1.0 * (ObTimeUtility::current_time() - start_real_time_us_) / 1000000; + const double interval_time_s = 1.0 * (ObTimeUtility::current_time() - last_real_time_us_) / 1000000; + log.log_entry("===== PART 3 CPU INFO =====", DARY_GRAY_PREFIX); + struct rusage current_usage; + getrusage(RUSAGE_SELF, ¤t_usage); + + const double user_cpu_time_s = cal_time_diff(start_usage_.ru_utime, current_usage.ru_utime); + const double sys_cpu_time_s = cal_time_diff(start_usage_.ru_stime, current_usage.ru_stime); + double avg_cpu_usage = 0; + if (cost_time_s > EPS) { + avg_cpu_usage = (user_cpu_time_s + sys_cpu_time_s) / cost_time_s * 100; + } + double cpu_usage_for_100MB_bw = 0; + if (last_req_statistical_info_.average_bw_mb_ > EPS) { + cpu_usage_for_100MB_bw = (100.0 / last_req_statistical_info_.average_bw_mb_) * avg_cpu_usage; + } + double 
real_cpu_time_s = cal_time_diff(last_usage_.ru_utime, current_usage.ru_utime) + + cal_time_diff(last_usage_.ru_stime, current_usage.ru_stime); + double real_cpu_usage_ = 0; + if (interval_time_s > EPS) { + real_cpu_usage_ = real_cpu_time_s / interval_time_s * 100; + } + + log.log_entry_kv("CPU usage for 100MB/s BW", + (to_string_with_precision(cpu_usage_for_100MB_bw, PRECISION) + "% per 100MB/s")); + log.log_entry_kv("Total CPU usage", (to_string_with_precision(avg_cpu_usage, PRECISION) + "%")); + log.log_entry_kv("Total user time", (to_string_with_precision(user_cpu_time_s, PRECISION) + " s")); + log.log_entry_kv("Total system time", (to_string_with_precision(sys_cpu_time_s, PRECISION) + " s")); + log.log_entry_kv("Real CPU usage", (to_string_with_precision(real_cpu_usage_, PRECISION) + "%")); + last_usage_ = current_usage; + last_cpu_info_.cpu_usage_for_100MB_bw_ = cpu_usage_for_100MB_bw; + last_cpu_info_.total_system_time_ = sys_cpu_time_s; + last_cpu_info_.total_user_time_ = user_cpu_time_s; + last_cpu_info_.total_cpu_usage_ = avg_cpu_usage; + last_cpu_info_.real_cpu_usage_ = real_cpu_time_s; + return ret; +} + +static int iter_memory_label( + lib::ObLabel &label, + LabelItem *l_item, + int64_t &object_storage_hold_bytes, + int64_t &object_storage_used_bytes, + int64_t &total_hold_bytes, + int64_t &total_used_bytes) +{ + // OSS_SDK or StorageOss + const char *oss_label = "OSS"; + // COS_SDK or StorageCos + const char *cos_label = "COS"; + // S3_SDK or StorageS3 + const char *s3_label = "S3"; + const char *default_label = "OBJECT_STORAGE"; + const char *object_label = "OBJECT_DEVICE"; + + int ret = OB_SUCCESS; + if (strstr(label.str_, oss_label) != nullptr + || strstr(label.str_, cos_label) != nullptr + || strstr(label.str_, s3_label) != nullptr + || strstr(label.str_, default_label) != nullptr + || strstr(label.str_, object_label) != nullptr) { + object_storage_hold_bytes += l_item->hold_; + object_storage_used_bytes += l_item->used_; + } + total_hold_bytes += 
l_item->hold_; + total_used_bytes += l_item->used_; + return ret; +} + +int OSDQMetric::get_memory_info_(OSDQLogEntry &log, const bool is_final) +{ + int ret = OB_SUCCESS; + log.log_entry("===== PART 4 MEMORY INFO =====", DARY_GRAY_PREFIX); + if (FAILEDx(get_memory_usage(last_mem_info_))) { + OB_LOG(WARN, "failed to get memory usage", KR(ret)); + } else { + if (is_final) { + last_mem_info_.start_vm_size_kb_ = start_mem_info_.vm_size_kb_; + last_mem_info_.start_vm_rss_kb_ = start_mem_info_.vm_rss_kb_; + log.log_entry_kv("start VmSize", (std::to_string(start_mem_info_.vm_size_kb_) + " kB")); + log.log_entry_kv("start VmRSS", (std::to_string(start_mem_info_.vm_rss_kb_) + " kB")); + } + + int64_t total_hold_bytes = 0; + int64_t total_used_bytes = 0; + int64_t object_storage_hold_bytes = 0; + int64_t object_storage_used_bytes = 0; + uint64_t tenant_ids[OB_MAX_SERVER_TENANT_CNT]; + int tenant_cnt = 0; + // default get all tenant memory info + get_tenant_ids(tenant_ids, OB_MAX_SERVER_TENANT_CNT, tenant_cnt); + + for (int tenant_idx = 0; OB_SUCC(ret) && tenant_idx < tenant_cnt; tenant_idx++) { + uint64_t tenant_id = tenant_ids[tenant_idx]; + for (int ctx_id = 0; ctx_id < ObCtxIds::MAX_CTX_ID; ctx_id++) { + lib::ObTenantCtxAllocatorGuard it = ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(tenant_id, ctx_id); + if (nullptr == it) { + it = ObMallocAllocator::get_instance()->get_tenant_ctx_allocator_unrecycled(tenant_id, ctx_id); + } + if (nullptr == it) { + continue; + } + + if (OB_SUCC(ret)) { + std::function iter_label_func = std::bind( + &iter_memory_label, + std::placeholders::_1, + std::placeholders::_2, + std::ref(object_storage_hold_bytes), + std::ref(object_storage_used_bytes), + std::ref(total_hold_bytes), + std::ref(total_used_bytes)); + ret = it->iter_label(iter_label_func); + } + } + } + + if (OB_FAIL(ret)) { + OB_LOG(WARN, "failed get alloc bytes", KR(ret)); + } else { + last_mem_info_.object_storage_hold_kb_ = object_storage_hold_bytes / 1024; + 
last_mem_info_.object_storage_used_kb_ = object_storage_used_bytes / 1024; + last_mem_info_.total_hold_kb_ = total_hold_bytes / 1024; + last_mem_info_.total_used_kb_ = total_used_bytes / 1024; + last_mem_info_.ob_vslice_alloc_used_memory_kb_ = get_vslice_alloc_instance().hold() / 1024; + last_mem_info_.ob_vslice_alloc_allocator_cnt_ = allocator_cnt; + log.log_entry_kv("ObjectStorageHold", (std::to_string(last_mem_info_.object_storage_hold_kb_) + " kB")); + log.log_entry_kv("ObjectStorageUsed", (std::to_string(last_mem_info_.object_storage_used_kb_) + " kB")); + log.log_entry_kv("TotalHold", (std::to_string(last_mem_info_.total_hold_kb_) + " kB")); + log.log_entry_kv("TotalUsed", (std::to_string(last_mem_info_.total_used_kb_) + " kB")); + log.log_entry_kv("VmPeak", (std::to_string(last_mem_info_.vm_peak_kb_) + " kB")); + log.log_entry_kv("VmSize", (std::to_string(last_mem_info_.vm_size_kb_) + " kB")); + log.log_entry_kv("VmHWM", (std::to_string(last_mem_info_.vm_hwm_kb_) + " kB")); + log.log_entry_kv("VmRSS", (std::to_string(last_mem_info_.vm_rss_kb_) + " kB")); + log.log_entry_kv("ObVSliceAlloc used memory", + (std::to_string(last_mem_info_.ob_vslice_alloc_used_memory_kb_) + " kB")); + log.log_entry_kv("ObVSliceAlloc allocator cnt", + (std::to_string(last_mem_info_.ob_vslice_alloc_allocator_cnt_) + " times")); + } + + if (OB_FAIL(ret)) { + } else if (is_final) { + if (OB_UNLIKELY(last_mem_info_.object_storage_hold_kb_ > FINAL_OBJECT_STORAGE_MEMORY_LIMIT)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(ERROR, "Object storage memory is out of limit!", K(last_mem_info_.object_storage_hold_kb_)); + log.print_log("[ERROR]", "Object Storage memory is out of limit! 
" + std::to_string(last_mem_info_.object_storage_hold_kb_)); + } + } + } + + return ret; +} + +int OSDQMetric::summary(const bool is_final) +{ + int ret = OB_SUCCESS; + lib::ObMutexGuard guard(mutex_); + summary_cnt_++; + OSDQLogEntry log; + std::string title = std::to_string(summary_cnt_) + "-th METRIC SUMMARY" + (is_final ? " FINAL" : ""); + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "metric not init", KR(ret), K(is_inited_)); + } else if (OB_FAIL(log.init(title, LIGHT_BLUE_PREFIX))) { + OB_LOG(WARN, "failed init log", KR(ret), K(summary_cnt_)); + } else if (OB_FAIL(get_req_statistical_info_(log))) { + OB_LOG(WARN, "failed get req statistical info", KR(ret)); + } else if (OB_FAIL(get_req_latency_map_(log))) { + OB_LOG(WARN, "failed to get req latency map", KR(ret)); + } else if (OB_FAIL(get_cpu_info_(log))) { + OB_LOG(WARN, "failed get cpu info", KR(ret)); + } else if (OB_FAIL(get_memory_info_(log, is_final))) { + OB_LOG(WARN, "failed get memory info", KR(ret)); + } else if (OB_FAIL(print_csv_dump_())) { + OB_LOG(WARN, "failed print csv dump", KR(ret)); + } else { + log.print(); + last_real_time_us_ = ObTimeUtility::current_time(); + } + + if (OB_FAIL(ret)) { + summary_cnt_--; + } + return ret; +} + +//=========================== OSDQMonitor ============================== + +OSDQMonitor::OSDQMonitor() + : is_inited_(false), + is_started_(false), + metric_(nullptr), + interval_us_(0), + tg_id_(-1) +{} + +OSDQMonitor::~OSDQMonitor() {} + +int OSDQMonitor::init(const int64_t interval_s, OSDQMetric *metric) { + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + OB_LOG(WARN, "OSDQMonitor init twice", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(metric) || OB_UNLIKELY(interval_s <= 0)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid arguments", KR(ret), K(metric)); + } else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::COMMON_TIMER_THREAD, tg_id_))) { + OB_LOG(WARN, "failed create timer thread", KR(ret)); + } else 
{ + metric_ = metric; + interval_us_ = interval_s * 1000 * 1000; + is_inited_ = true; + } + return ret; +} + +int OSDQMonitor::start() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "monitor not init", KR(ret), K(is_inited_)); + } else if (OB_UNLIKELY(is_started_)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "monitor start twice", KR(ret), K(is_started_)); + } else if (OB_FAIL(TG_START(tg_id_))) { + OB_LOG(WARN, "failed start timer thread", KR(ret)); + } else if (OB_FAIL(TG_SCHEDULE(tg_id_, *this, interval_us_, true/*repeat*/, true/*immediate*/))) { + OB_LOG(WARN, "failed schedule summary task", KR(ret), K(tg_id_), K(interval_us_)); + } else { + is_started_ = true; + } + + if (OB_FAIL(ret)) { + destroy(); + } + return ret; +} + +void OSDQMonitor::destroy() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "metric monitor not init", KR(ret), K(is_inited_)); + } else { + if (tg_id_ != -1) { + TG_STOP(tg_id_); + TG_WAIT(tg_id_); + TG_DESTROY(tg_id_); + } + metric_ = nullptr; + interval_us_ = 0; + is_inited_ = false; + is_started_ = false; + } +} + +void OSDQMonitor::runTimerTask() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "metric monitor not init", KR(ret), K(is_inited_)); + } else if (OB_FAIL(metric_->summary())) { + OB_LOG(WARN, "failed exec metric summary", KR(ret)); + } +} + +//============================ OSDQParameters =============================== +OSDQParameters::OSDQParameters() + : scene_type_(-1), + run_time_s_(DEFAULT_RUN_TIME_S), + interval_s_(DEFAULT_INTERVAL_S), + thread_cnt_(DEFAULT_THREAD_CNT), + resource_limited_type_(-1), + limit_run_time_s_(DEFAULT_LIMIT_RUN_TIME_S), + limit_memory_mb_(DEFAULT_LIMIT_MEMORY_MB), + limit_cpu_(DEFAULT_LIMIT_CPU) +{ +} + +//============================ ObAdminObjectStorageDriverQualityExecutor =========================== 
+ObAdminObjectStorageDriverQualityExecutor::ObAdminObjectStorageDriverQualityExecutor() + : params_(), + metric_(), + monitor_() +{ +} + +int ObAdminObjectStorageDriverQualityExecutor::execute(int argc, char *argv[]) +{ + int ret = OB_SUCCESS; + OSDQScene *scene = nullptr; + const int64_t memory_limit = 16 * 1024 * 1024 * 1024LL; // 16 GB + if (OB_FAIL(set_environment_())) { + OB_LOG(WARN, "failed set environment", KR(ret)); + } else if (OB_FAIL(parse_cmd_(argc, argv))) { + OB_LOG(WARN, "failed to parse cmd", KR(ret), K(argc), K(argv)); + } else if (OB_FAIL(metric_.init())) { + OB_LOG(WARN, "failed init metric", KR(ret)); + } else if (OB_ISNULL(scene = create_scene_())) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "failed to create scene", K(ret)); + } else if (OB_FAIL(scene->init(¶ms_, &metric_))) { + if (ret == OB_INVALID_ARGUMENT) { + print_usage_(); + } + OB_LOG(WARN, "failed to init scene", KR(ret), K(params_)); + } else if (OB_FAIL(scene->set_thread_cnt(params_.thread_cnt_))) { + OB_LOG(WARN, "failed set thread cnt", KR(ret)); + } else if (OB_FAIL(monitor_.init(params_.interval_s_, &metric_))) { + OB_LOG(WARN, "failed init monitor", KR(ret), K(params_.interval_s_)); + } else if (OB_FAIL(monitor_.start())) { + OB_LOG(WARN, "failed start monitor", KR(ret)); + } else if (OB_FAIL(scene->execute())) { + OB_LOG(WARN, "failed to execute scene", KR(ret), K(params_)); + OSDQLogEntry::print_log("TEST RESULT FAILED", "", RED_COLOR_PREFIX); + } + monitor_.destroy(); + malloc_trim(0); + free_scene_(scene); + if (OB_SUCC(ret)) { + OSDQLogEntry::print_log("TEST RESULT SUCCESS", ""); + OSDQLogEntry::print_log("WAIT 10 seconds for MemoryDump to refresh", ""); + // wait 10 seconds for MemoryDump to refresh + ::sleep(10); + if (OB_FAIL(metric_.summary(true/*is_final*/))) { + OB_LOG(WARN, "failed to execute the last summary", KR(ret)); + } + } + return ret; +} + +int ObAdminObjectStorageDriverQualityExecutor::parse_cmd_(int argc, char *argv[]) +{ + int ret = OB_SUCCESS; + int opt 
= 0; + int index = -1; + const char *opt_str = "h:d:s:S:R:r:i:t:"; + struct option longopts[] = { + {"help", 0, NULL, 'h'}, + {"file-path-prefix", 1, NULL, 'd'}, + {"storage-info", 1, NULL, 's'}, + {"scene-type", 1, NULL, 'S'}, + {"resource-limited-type", 1, NULL, 'R'}, + {"limit-run-time", 1, NULL, '0'}, + {"limit-memory", 1, NULL, '0'}, + {"limit-cpu", 1, NULL, '0'}, + {"run-time", 1, NULL, 'r'}, + {"interval", 1, NULL, 'i'}, + {"thread_cnt", 1, NULL, 't'}, + {NULL, 0, NULL, 0}, + }; + while (OB_SUCC(ret) && -1 != (opt = getopt_long(argc, argv, opt_str, longopts, &index))) { + switch (opt) { + case 'h': { + print_usage_(); + exit(1); + } + case 'd': { + time_t timestamp = time(NULL); + struct tm *timeinfo = localtime(×tamp); + char buf[OB_MAX_TIME_STR_LENGTH]; + strftime(buf, sizeof(buf), "%Y-%m-%d-%H:%M:%S", timeinfo); + if (OB_FAIL(databuff_printf(params_.base_path_, sizeof(params_.base_path_), "%s", optarg))) { + OB_LOG(WARN, "failed to construct base path", KR(ret), K((char *)optarg), K(buf)); + } + break; + } + case 's': { + if (OB_FAIL(databuff_printf(params_.storage_info_str_, sizeof(params_.storage_info_str_), "%s", optarg))) { + OB_LOG(WARN, "failed to copy storage info str", KR(ret), K((char *)optarg)); + } + break; + } + case 'S': { + if (OB_FAIL(c_str_to_int(optarg, params_.scene_type_))) { + OB_LOG(WARN, "failed to parse scene type", KR(ret), K((char *) optarg)); + } + break; + } + case 'R': { + if (OB_FAIL(c_str_to_int(optarg, params_.resource_limited_type_))) { + OB_LOG(WARN, "failed to parse resource limited type", KR(ret), K((char *)optarg)); + } + break; + } + case 'r': { + if (OB_FAIL(c_str_to_int(optarg, params_.run_time_s_))) { + OB_LOG(WARN, "failed to parse run time", KR(ret), K((char *)optarg)); + } + break; + } + case 'i': { + if (OB_FAIL(c_str_to_int(optarg, params_.interval_s_))) { + OB_LOG(WARN, "failed to parse interval", KR(ret), K((char *)optarg)); + } + break; + } + case 't': { + if (OB_FAIL(c_str_to_int(optarg, 
params_.thread_cnt_))) { + OB_LOG(WARN, "failed to parse thread cnt", KR(ret), K((char *)optarg)); + } + break; + } + case '0': { + if (index >= 0) { + const char *opt_name = longopts[index].name; + if (strcmp(opt_name, "limit-run-time") == 0) { + if (OB_FAIL(c_str_to_int(optarg, params_.limit_run_time_s_))) { + OB_LOG(WARN, "failed to parse limit run time", KR(ret), K((char *) optarg)); + } + } else if (strcmp(opt_name, "limit-memory") == 0) { + if (OB_FAIL(c_str_to_int(optarg, params_.limit_memory_mb_))) { + OB_LOG(WARN, "failed to parse limit memory", KR(ret), K((char *) optarg)); + } + } else if (strcmp(opt_name, "limit-cpu") == 0) { + try { + params_.limit_cpu_ = std::stod(optarg); + } catch (...) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "failed to parse limit cpu", KR(ret), K((char *) optarg)); + } + } + break; + } else { + print_usage_(); + exit(1); + } + } + default: { + print_usage_(); + exit(1); + } + } + + } + return ret; +} + +int ObAdminObjectStorageDriverQualityExecutor::set_environment_() +{ + int ret = OB_SUCCESS; + init_malloc_hook(); + lib::set_memory_limit(MEMORY_LIMITED_SIZE); + lib::set_tenant_memory_limit(OB_SERVER_TENANT_ID, MEMORY_LIMITED_SIZE); + mallopt(M_MMAP_THRESHOLD, 128 * 1024); + OB_LOGGER.set_log_level("INFO"); + + ObTenantBase *tenant_base = new ObTenantBase(OB_SERVER_TENANT_ID); + ObMallocAllocator *malloc = ObMallocAllocator::get_instance(); + + // set tenant memory limit + if (OB_ISNULL(malloc->get_tenant_ctx_allocator(OB_SERVER_TENANT_ID, 0))) { + if (OB_FAIL(malloc->create_and_add_tenant_allocator(OB_SERVER_TENANT_ID))) { + OB_LOG(WARN, "failed to create_and_add_tenant_allocator", KR(ret)); + } + } + + // init tenant and tenant io manager + if (FAILEDx(tenant_base->init())) { + OB_LOG(WARN, "failed to init tenant base", KR(ret)); + } else if (FALSE_IT(ObTenantEnv::set_tenant(tenant_base))) { + } else if (OB_FAIL(ObDeviceManager::get_instance().init_devices_env())) { + OB_LOG(WARN, "init device manager failed", KR(ret)); + 
} else if (OB_FAIL(ObIOManager::get_instance().init(MEMORY_LIMITED_SIZE))) { + OB_LOG(WARN, "failed to init io manager", KR(ret)); + } else if (OB_FAIL(ObIOManager::get_instance().start())) { + OB_LOG(WARN, "failed to start io manager", KR(ret)); + } + + // set tenant io manager memory limit; + ObRefHolder tenant_holder; + if (FAILEDx(OB_IO_MANAGER.get_tenant_io_manager(OB_SERVER_TENANT_ID, tenant_holder))) { + OB_LOG(WARN, "failed get tenant io manager", KR(ret)); + } else if (OB_FAIL(tenant_holder.get_ptr()->update_memory_pool(MEMORY_LIMITED_SIZE))) { + OB_LOG(WARN, "failed update memory limit", KR(ret), K(MEMORY_LIMITED_SIZE)); + } + + if (FAILEDx(ObClockGenerator::get_instance().init())) { + OB_LOG(WARN, "failed init clock generate", KR(ret)); + } else if (OB_FAIL(ObMemoryDump::get_instance().init())) { + OB_LOG(WARN, "failed init MemoryDump", KR(ret)); + } + return ret; +} + +int ObAdminObjectStorageDriverQualityExecutor::print_usage_() +{ + int ret = OB_SUCCESS; + printf("\n"); + printf("Usage: io_driver_quality command [command args] [options]\n"); + printf("commands:\n"); + printf(HELP_FMT, "-h, --help", "display this message."); + printf("options:\n"); + printf(HELP_FMT, "-d, --file-path-prefix", "absolute file path with file prefix"); + printf(HELP_FMT, "-s, --storage-info", "oss/cos should provide storage info"); + printf(HELP_FMT, "-S, --scene-type", "indicate the Scene to be run"); + printf(HELP_FMT, "", "0, Hybrid Scene"); + printf(HELP_FMT, "", "1, Resource Limited Scene"); + printf(HELP_FMT, "", "2, ErrSim Scene"); + printf(HELP_FMT, "-r, --run-time", "scene run time(s)"); + printf(HELP_FMT, "-i, --interval", "the interval(in seconds) between echo print monitor statistics, the default is 1s"); + printf(HELP_FMT, "-t --thread_cnt", "indicate the thread num"); + printf(HELP_FMT, "-R, --resource-limited-type", "indicate the resource limited type"); + printf(HELP_FMT, "", "0, network packet loss limited type"); + printf(HELP_FMT, "", "1, network 
bandwidth limited type"); + printf(HELP_FMT, "", "2, memory limited type"); + printf(HELP_FMT, "", "3, cpu limited type"); + printf(HELP_FMT, "--limit-run-time", "indicate the time(s) of resource limitation, dafault is 10s"); + printf(HELP_FMT, "--limit-memory", "indicate the limit meory size(MB), default is 64MB"); + printf(HELP_FMT, "--limit-cpu", "indicate the limit of cpu rate(0~1.0), default is 0.2"); + return ret; +} + +OSDQScene *ObAdminObjectStorageDriverQualityExecutor::create_scene_() +{ + int ret = OB_SUCCESS; + OSDQScene *scene = nullptr; + ObVSliceAlloc &allocator = get_vslice_alloc_instance(); + if (params_.scene_type_ == HYBRID_TEST_SCENE) { + if (OB_ISNULL(scene = static_cast(allocator.alloc(sizeof(OSDQHybridTestScene))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for hybrid test scene", KR(ret), K(sizeof(OSDQHybridTestScene))); + } else { + scene = new (scene) OSDQHybridTestScene(); + } + } else if (params_.scene_type_ == RESOURCE_LIMITED_SCENE) { + if (OB_ISNULL(scene = static_cast(allocator.alloc(sizeof(OSDQResourceLimitedScene))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for resource limited scene", KR(ret), K(sizeof(OSDQResourceLimitedScene))); + } else { + scene = new (scene) OSDQResourceLimitedScene(); + } + } else if (params_.scene_type_ == ERRSIM_SCENE) { + if (OB_ISNULL(scene = static_cast(allocator.alloc(sizeof(OSDQErrSimScene))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to alloc memory for errsim scene", KR(ret), K(sizeof(OSDQErrSimScene))); + } else { + scene = new (scene) OSDQErrSimScene(); + } + } else { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "error scene type", KR(ret)); + print_usage_(); + } + return scene; +} + +void ObAdminObjectStorageDriverQualityExecutor::free_scene_(OSDQScene *&scene) +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(scene)) { + if (params_.scene_type_ == HYBRID_TEST_SCENE) { + dynamic_cast(scene)->~OSDQHybridTestScene(); + } 
else if (params_.scene_type_ == RESOURCE_LIMITED_SCENE) { + dynamic_cast(scene)->~OSDQResourceLimitedScene(); + } else if (params_.scene_type_ == ERRSIM_SCENE) { + dynamic_cast(scene)->~OSDQErrSimScene(); + } else { + } + get_vslice_alloc_instance().free(scene); + scene = nullptr; + } +} + +//=========================== OSDQFileSet =============================== + +OSDQFileSet::OSDQFileSet() {} +OSDQFileSet::~OSDQFileSet() +{ + FilePathMap::iterator it = file_path_map_.begin(); + while (it != file_path_map_.end()) { + if (it->second != nullptr) { + free(it->second); + it->second = nullptr; + } + it++; + } +} + +int OSDQFileSet::add_file(const int64_t object_id, const char *file_path) +{ + lib::ObMutexGuard guard(mutex_); + int ret = OB_SUCCESS; + char *file_path_copy = nullptr; + if (OB_UNLIKELY(object_id < 0 || file_path == nullptr)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "the argument is invalid", KR(ret), K(object_id), K(file_path)); + } else if (OB_UNLIKELY(file_set_.find(object_id) != file_set_.end())) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "object_id is already in the file set", KR(ret)); + } else if (OB_ISNULL(file_path_copy = static_cast(malloc(OB_MAX_URI_LENGTH)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for file path"); + } else { + MEMCPY(file_path_copy, file_path, OB_MAX_URI_LENGTH); + file_set_.insert(object_id); + file_path_map_[object_id] = file_path_copy; + } + return ret; +} + +int OSDQFileSet::fetch_and_delete_file(int64_t &object_id, char *&file_path) +{ + lib::ObMutexGuard guard(mutex_); + int ret = OB_SUCCESS; + if (OB_UNLIKELY(file_set_.size() <= 0)) { + ret = OB_FILE_NOT_EXIST; + OB_LOG(WARN, "the file set is empty", KR(ret), K(file_set_.size())); + } else { + // To quickly fetch a random element from file_set, we first generate a random value from [min, max] + // and then look for lower_bound in file_set based on it. 
+ // Note that the probability that each element will be fetched is not the same when the file_set are + // sparse, but this can be allowed to happen for now. + int64_t min_object_id = *file_set_.begin(); + int64_t max_object_id = *file_set_.rbegin(); + int64_t random_object_id = ObRandom::rand(min_object_id, max_object_id); + std::set::iterator it = file_set_.lower_bound(random_object_id); + if (OB_UNLIKELY(it == file_set_.end() || file_path_map_.find(*it) == file_path_map_.end())) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "failed to get random object id", KR(ret)); + } else { + object_id = *it; + file_path = file_path_map_[object_id]; + file_path_map_.erase(object_id); + file_set_.erase(it); + } + } + return ret; +} + +size_t OSDQFileSet::size() const +{ + return file_set_.size(); +} + + +} // namespace tools +} // namespace oceanbase diff --git a/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality.h b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality.h new file mode 100644 index 000000000..dfa882325 --- /dev/null +++ b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality.h @@ -0,0 +1,713 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#ifndef OB_ADMIN_IO_DRIVER_QUALITY_H_ +#define OB_ADMIN_IO_DRIVER_QUALITY_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../ob_admin_executor.h" +#include "deps/oblib/src/lib/task/ob_timer.h" +#include "deps/oblib/src/lib/ob_define.h" // OB_MAX_URI_LENGTH +#include "share/backup/ob_backup_struct.h" // OB_MAX_BACKUP_STORAGE_INFO_LENGTH +#include "share/ob_thread_mgr.h" +#include "share/backup/ob_backup_io_adapter.h" +#include "src/share/io/ob_io_manager.h" +#include "src/share/ob_device_manager.h" +#include "deps/oblib/src/lib/lock/ob_rwlock.h" +#include "deps/oblib/src/lib/allocator/ob_malloc.h" +#include "deps/oblib/src/lib/allocator/ob_vslice_alloc.h" +#include "deps/oblib/src/lib/alloc/ob_malloc_sample_struct.h" +#include "deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h" +#include "deps/oblib/src/lib/alloc/memory_dump.h" +#include "deps/oblib/src/lib/alloc/malloc_hook.h" + +namespace oceanbase +{ +namespace tools +{ +// color prefix in shell +const std::string NONE_COLOR_PREFIX = "\033[m"; +const std::string GREEN_COLOR_PREFIX = "\033[0;32;32m"; +const std::string RED_COLOR_PREFIX = "\033[0;32;31m"; +const std::string LIGHT_BLUE_PREFIX = "\033[1;34m"; +const std::string DARY_GRAY_PREFIX = "\033[1;30m"; + +// global constant +constexpr int64_t TIME_STR_LENGTH = 128; +constexpr int64_t FILE_PATH_LENGTH = common::OB_MAX_URI_LENGTH; +constexpr int64_t MAX_CONTENT_LENGTH = 128L * 1024 * 1024; +constexpr int64_t MAX_OBJECT_NAME_LENGTH = 50; +constexpr int64_t MAX_OBJECT_NAME_SUFFIX_LENGTH = 16; +constexpr int64_t MIN_OBJECT_SIZE = 0; +constexpr int64_t SMALL_OBJECT_SIZE_LIMIT = 128 * 1024; // 128 KB +constexpr int64_t NORMAL_OBJECT_SIZE_LIMIT = 2L * 1024 * 1024; // 2MB +constexpr int64_t LARGE_OBJECT_SIZE_LIMIT = 128L * 1024 * 1024; // 128MB +constexpr int64_t MEMORY_LIMITED_SIZE = 16L * 1024 * 1024 * 1024; +constexpr int64_t FINAL_OBJECT_STORAGE_MEMORY_LIMIT = 50 * 1024 * 1024; 
// 50MB + +constexpr double EPS = 1e-6; +// The LARGE_OBJECT_SIZE_RATE = 1.0 - SMALL_OBJECT_SIZE_RATE - NORMAL_OBJECT_SIZE_RATE; +constexpr double SMALL_OBJECT_SIZE_RATE = 0.2; +constexpr double NORMAL_OBJECT_SIZE_RATE = 0.75; +static_assert(SMALL_OBJECT_SIZE_RATE + NORMAL_OBJECT_SIZE_RATE < 1, "The sum of SMALL_OBJECT_SIZE_RATE and NORMAL_OBJECT_SIZE_LIMIT should be less than 1"); + +// Default Scene Parameter +constexpr int64_t DEFAULT_RUN_TIME_S = 20; // the default running time of the scene +constexpr int64_t DEFAULT_INTERVAL_S = 1; // the default interval between each metric display +constexpr int64_t DEFAULT_THREAD_CNT = 16; // the dafault thread cnt for task hanlder +constexpr int64_t DEFAULT_LIMIT_RUN_TIME_S = 10; // the default duration of the restrictions in the limited scene +constexpr int64_t DEFAULT_LIMIT_MEMORY_MB = 64; // the default memory limited in the limited scene +constexpr double DEFAULT_LIMIT_CPU = 0.2; // the default cpu limited in the limited scene + +constexpr int64_t DEFAULT_QUEUE_SIZE = 100; // the default task handler queue size +constexpr int64_t DEFAULT_PROB_OF_WRITING_OLD_DATA = 20; +constexpr int64_t DEFAULT_PROB_OF_PARALLEL = 10; + +enum OSDQOpType +{ + WRITE_SINGLE_FILE, + MULTIPART_WRITE, + APPEND_WRITE, + READ_SINGLE_FILE, + DEL_FILE, + MAX_OPERATE_TYPE // invalid type +}; + + +// in order to show in metric summary +static const char *osdq_op_type_names[] = { + "WS", + "MW", + "AW", + "RS", + "DE" +}; + +static_assert((sizeof(osdq_op_type_names) / sizeof(char *)) == static_cast(MAX_OPERATE_TYPE), + "the length of osdq_op_type_names should be equal to MAX_OPERATE_TYPE"); + +double cal_time_diff(const timeval &start, const timeval &end); + +/** + * @brief Generate the contents of the file based on the object_id + * + * @param buf [out] The pointer to the contents of the file and must be requested in advance with a length of buf_len + * @param buf_len [in] The length of the buf + * @param object_id [in] The Object's ID + */ +int 
generate_content_by_object_id(char *buf, const int64_t buf_len, const int64_t object_id); + +/** + * @brief Verify that content is correct by object_id + * + * @param buf [in] The pointer to the content + * @param buf_len [in] The length of the buf + * @param object_id [in] The Object's ID + */ +int check_content_by_object_id(const char *buf, const int64_t buf_len, const int64_t object_id); + +/** + * @brief Get a random length for performing a write operation or a read operation + * + * @return content_length Indicates the content length generated + */ +int64_t get_random_content_length(const OSDQOpType op_type); + +/** + * @brief Generate object_name from object_id. + * For example, if object_id is k, then the generated line will be like object_k_{suffic}, where suffic is a random string of length MAX_OBJECT_NAME_SUFFIX_LENGTH containing numbers and upper and lower case letters. + * @param object_id [in] The Object's ID + * @param object_name [out] The Object's name + * @param object_name_len [in] indicates the maximum length of object_name + */ +int construct_object_name(const int64_t object_id, char *object_name, const int64_t object_name_len); + +int construct_file_path( + const char *base_uri, + const char *object_name, + char *file_path, + const int64_t file_path_length); + +ObVSliceAlloc &get_vslice_alloc_instance(); + +extern int64_t allocator_cnt; + +template +class STLMemAllocator +{ +public: + ObVSliceAlloc &allocator_; +public: + using value_type = T; + using pointer = T*; + using const_pointer = const T*; + using void_pointer = void*; + using const_void_pointer = const void*; + using size_type = size_t; + using difference_type = ptrdiff_t; + + template + struct rebind + { + using other = STLMemAllocator; + }; + STLMemAllocator() : allocator_(get_vslice_alloc_instance()) + {} + template + STLMemAllocator(const STLMemAllocator &other) : allocator_(other.allocator_) + {} + pointer allocate(size_type n, const_void_pointer hint = 0) + { + int ret = 
OB_SUCCESS; + void *ptr = nullptr; + do { + ptr = allocator_.alloc(n * sizeof(T)); + if (OB_ISNULL(ptr)) { + ::usleep(10000); //10ms + if (TC_REACH_TIME_INTERVAL(10 * 1000 * 1000)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed to allocate memory", KR(ret), K(n)); + } + } + } while (OB_ISNULL(ptr)); + allocator_cnt++; + return static_cast(ptr); + } + void deallocate(pointer p, size_type n) noexcept { allocator_.free(p); } + size_type max_size() const noexcept { return allocator_.limit(); } + template + void construct(U *p, Args &&...args) + { + new(p) U(std::forward(args)...); + } + template + void destroy(U* p) { p->~U(); } +}; + +class OSDQLogEntry +{ +public: + OSDQLogEntry(); + ~OSDQLogEntry() {} + int init(const std::string &title, const std::string &color = NONE_COLOR_PREFIX); + static void print_log(const std::string &title, const std::string &content, const std::string &color = GREEN_COLOR_PREFIX); + void log_entry_kv(const std::string &key, const std::string &value, const std::string &color = NONE_COLOR_PREFIX); + void log_entry(const std::string &content, const std::string &color = NONE_COLOR_PREFIX); + void print(); + void reset(); + +private: + static std::string get_time_prefix_(); + +private: + bool is_inited_; + std::string prefix_; + std::string head_holder_; + std::string content_; +}; + +/** + * @class OSDQTimeMap + * @brief Save all the latency for a certain operation + */ +class OSDQTimeMap +{ +public: + OSDQTimeMap(); + ~OSDQTimeMap() {} + /** + * @brief Add the time consumed by a IO operaton to the time_map + * @param cost_time_us [in] the time consumed by a IO operaton + */ + int log_entry(const int64_t cost_time_us); + /** + * @brief print the information of the time_map + * @param map_name_str [in] title of the information + */ + int summary(const char *map_name_str, OSDQLogEntry &log) const; + + /** + * @brief Just estimating the dynamic memory requested by time_map_ + */ + TO_STRING_KV(K_(total_entry)); +private: + int64_t 
total_entry_; + std::map, + STLMemAllocator>> time_map_; +}; + +/** + * @class OSDQMetric + * @brief record the runtime metric, like requests statistics, memory info, cpu info, etc. + * Since the size of read and write operation can be random, + * statistics will be categorised by request size + * The following table shows the relationship between file size and attributes: + * + * File Size | Attribute + * ------------------------------------------ + * file_size <= 128KB | small object + * 128 KB < file_size <= 2MB | normal object + * 2MB < file_size <= 128MB | large object + */ +class OSDQMetric +{ + +public: + enum ObjectSizeType + { + SMALL_OBJECT = 0, + NORMAL_OBJECT_SIZE = 1, + LARGE_OBJECT_SIZE = 2, + MAX_OJBECT_SIZE_TYPE + }; + + struct ReqStatisticalsInfo + { + ReqStatisticalsInfo(); + int64_t total_operation_num_; // total req numbers + int64_t total_queued_num_; + double total_throughput_mb_; + double average_qps_; + double average_bw_mb_; + double real_qps_; + double real_bw_mb_; + }; + + struct CpuInfo + { + CpuInfo(); + double cpu_usage_for_100MB_bw_; + double total_cpu_usage_; + double total_user_time_; + double total_system_time_; + double real_cpu_usage_; + }; + + struct MemInfo + { + MemInfo(); + double start_vm_size_kb_; + double start_vm_rss_kb_; + double object_storage_hold_kb_; + double object_storage_used_kb_; + double total_hold_kb_; + double total_used_kb_; + double vm_peak_kb_; // maximum virtual memory usage + double vm_size_kb_; // current virtual memory usage + double vm_hwm_kb_; // physical maxiumu memory usage + double vm_rss_kb_; // current physical memory usage + double ob_vslice_alloc_used_memory_kb_; + int64_t ob_vslice_alloc_allocator_cnt_; + }; + +public: + OSDQMetric(); + ~OSDQMetric(); + int init(); + /** + * @brief add the latency of the IO operation to the corresponding latency_map + * @param op_start_time_us [in] the start time of the IO operation + * @param op_type [in] type of operation, e.g. 
read or write + * @param object_size [in] the operation's size + */ + int add_latency_metric( + const int64_t op_start_time_us, + const OSDQOpType op_type, + const int64_t object_size); + int add_queued_entry(); + int sub_queued_entry(); + int summary(const bool is_final = false); + +public: + static int get_memory_usage(MemInfo &mem_info); + double get_real_cpu_usage() const { return last_cpu_info_.real_cpu_usage_; } + +private: + int get_req_statistical_info_(OSDQLogEntry &log); + int get_req_latency_map_(OSDQLogEntry &log); + int get_cpu_info_(OSDQLogEntry &log); + int get_memory_info_(OSDQLogEntry &log, const bool is_final = false); + + int print_csv_title_(); + int print_csv_dump_(); + +private: + static constexpr int64_t TITLE_LEN = 64; + static constexpr int64_t PRECISION = 2; + lib::ObMutex mutex_; + bool is_inited_; + char metric_csv_path_[OB_MAX_FILE_NAME_LENGTH]; + + int64_t summary_cnt_; + int64_t start_real_time_us_; + int64_t last_real_time_us_; + int64_t total_operation_num_; + int64_t total_queued_num_; + double total_throughput_mb_; + ReqStatisticalsInfo last_req_statistical_info_; + struct rusage start_usage_; + struct rusage last_usage_; + CpuInfo last_cpu_info_; + MemInfo start_mem_info_; + MemInfo last_mem_info_; + OSDQTimeMap latency_maps_[OSDQOpType::MAX_OPERATE_TYPE][ObjectSizeType::MAX_OJBECT_SIZE_TYPE]; +}; + +/** + * @class OSDQMonitor + * @brief Monitoring threads, used to print monitoring data such as qps, latency, etc. at regular intervals. 
+ */
+class OSDQMonitor : public common::ObTimerTask
+{
+public:
+  OSDQMonitor();
+  virtual ~OSDQMonitor();
+  // interval_s is given in seconds; the class stores an interval in microseconds (interval_us_).
+  int init(const int64_t interval_s, OSDQMetric *metric);
+  int start();
+  void destroy();
+  // Timer callback inherited from common::ObTimerTask.
+  virtual void runTimerTask() override;
+
+private:
+  bool is_inited_;
+  bool is_started_;
+  OSDQMetric *metric_;   // stored as a raw pointer; NOTE(review): presumably owned by the caller -- confirm
+  int64_t interval_us_;
+  int tg_id_;
+};
+
+/**
+ * Command-line parameters of the tool, grouped into the parameters shared by
+ * every scene and those used only by the resource limited scene.
+ */
+struct OSDQParameters
+{
+  OSDQParameters();
+  ~OSDQParameters() {}
+  TO_STRING_KV(K(base_path_), K(storage_info_str_), K(scene_type_),
+      K(run_time_s_), K(interval_s_), K(thread_cnt_), K(resource_limited_type_),
+      K(limit_run_time_s_), K(limit_memory_mb_), K(limit_cpu_));
+  // common param
+  char base_path_[common::OB_MAX_URI_LENGTH];
+  char storage_info_str_[common::OB_MAX_BACKUP_STORAGE_INFO_LENGTH];
+  int64_t scene_type_;
+  int64_t run_time_s_;
+  int64_t interval_s_;
+  int64_t thread_cnt_;
+
+  // ResourceLimitedScene param
+  int64_t resource_limited_type_;
+  int64_t limit_run_time_s_;
+  int64_t limit_memory_mb_;
+  double limit_cpu_;
+};
+
+class OSDQScene;
+/**
+ * @class ObAdminObjectStorageDriverQualityExecutor
+ * @brief The execution entry point of the programme, used for parameter parsing, to start the execution of the scenario.
+ */
+class ObAdminObjectStorageDriverQualityExecutor : public ObAdminExecutor
+{
+  // Scenes this executor knows about; MAX_SCENE is the sentinel after the last real scene.
+  enum SceneType
+  {
+    HYBRID_TEST_SCENE,
+    RESOURCE_LIMITED_SCENE,
+    ERRSIM_SCENE,
+    MAX_SCENE
+  };
+public:
+  ObAdminObjectStorageDriverQualityExecutor();
+  virtual ~ObAdminObjectStorageDriverQualityExecutor() {};
+  // Entry point taking the raw command-line arguments.
+  virtual int execute(int argc, char *argv[]) override;
+
+private:
+  int parse_cmd_(int argc, char *argv[]);
+  int set_environment_();
+  int run_all_tests_();
+  int print_usage_();
+
+private:
+  // NOTE(review): presumably picks the scene from params_.scene_type_; bodies are not visible here -- confirm.
+  OSDQScene *create_scene_();
+  // Takes the pointer by reference.
+  void free_scene_(OSDQScene *&scene);
+
+private:
+  OSDQParameters params_;
+  OSDQMetric metric_;
+  OSDQMonitor monitor_;
+private:
+  // NOTE(review): const_cast below appears to have lost its template argument
+  // (likely <char *>) during extraction -- confirm against the repository.
+  static constexpr char *HELP_FMT = const_cast("\t%-30s%-12s\n");
+  static constexpr int64_t N_WAY = 32;
+  static constexpr int64_t DEFAULT_BLOCK_SIZE = 2L * 1024 * 1024;
+  DISALLOW_COPY_AND_ASSIGN(ObAdminObjectStorageDriverQualityExecutor);
+};
+
+/**
+ * Process-wide id generator exposed through get_instance().
+ */
+class OSDQIDGenerator
+{
+public:
+  OSDQIDGenerator() : current_id_(0) {}
+  // Atomically increments the counter and returns the new value, so the first id handed out is 1.
+  int64_t get_next_id() { return current_id_.fetch_add(1) + 1; }
+
+  // Returns the single function-local static instance.
+  static OSDQIDGenerator &get_instance()
+  {
+    static OSDQIDGenerator instance;
+    return instance;
+  }
+
+private:
+  // NOTE(review): std::atomic appears to have lost its template argument
+  // (likely <int64_t>) during extraction -- confirm against the repository.
+  std::atomic current_id_;
+};
+
+/**
+ * @class OSDQFileSet
+ * @brief This class maintains a set of files that have been written, along with the file path.
+ */
+class OSDQFileSet
+{
+  // NOTE(review): the template parameter lists in this class appear to have been
+  // stripped during extraction (e.g. "template" with no <typename T>) -- confirm
+  // against the repository before relying on these declarations.
+  template
+  struct equal_to {
+    bool operator()(const T &lhs, const T &rhs) const {
+      return lhs == rhs;
+    }
+  };
+  typedef std::unordered_map, equal_to, STLMemAllocator>> FilePathMap;
+  typedef std::set, STLMemAllocator> FileSet;
+public:
+  OSDQFileSet();
+  ~OSDQFileSet();
+  int add_file(const int64_t object_id, const char *file_path);
+  /**
+   * @brief randomly select a file from the file set that have been written to
+   *        then delete it from the file set
+   * @param object_id [out] the selected file's object_id
+   * @param file_path [out] the selected file's path
+   */
+  int fetch_and_delete_file(int64_t &object_id, char *&file_path);
+  size_t size() const;
+private:
+  lib::ObMutex mutex_;
+  FileSet file_set_;
+  FilePathMap file_path_map_;
+};
+
+/**
+ * One unit of work handled by OSDQTaskHandler.
+ * free_task() runs the destructor explicitly and then calls free() on the task,
+ * so tasks are expected to come from malloc-style allocation.
+ */
+struct OSDQTask
+{
+  OSDQTask();
+  ~OSDQTask();
+  bool is_valid() const;
+  TO_STRING_KV(K_(op_type), K_(object_id), K_(uri), K_(buf_len), K(start_time_stamp_us_), K(parallel_), K(parallel_op_type_));
+  OSDQOpType op_type_;
+  int64_t object_id_;
+  char *uri_;                     // released with free() in the destructor
+  char *buf_;                     // released with free() in the destructor
+  int64_t buf_len_;
+  int64_t start_time_stamp_us_;   // set by OSDQTaskHandler::handle() when processing starts
+
+  bool parallel_;         // indicates this task is concurrent
+  OSDQOpType parallel_op_type_;
+};
+
+// Destroys and frees the task (if not NULL) and sets the pointer to nullptr.
+void free_task(OSDQTask *&task);
+
+class OSDQResourceLimitedScene;
+class OSDQErrSimScene;
+/**
+ * @class OSDQTaskHandler
+ * @brief Responsible for task generation and processing
+ */
+class OSDQTaskHandler : public lib::TGTaskHandler
+{
+public:
+  OSDQTaskHandler();
+  virtual ~OSDQTaskHandler();
+  // The pointer arguments are stored as-is, not copied.
+  int init(
+      const char *base_uri,
+      OSDQMetric *metric,
+      OSDQFileSet *file_set,
+      const share::ObBackupStorageInfo *storage_info);
+  void destroy();
+  int start();
+  void stop();
+  int wait();
+  int set_thread_cnt(const int64_t thread_cnt);
+  // Thread-group callback: processes one OSDQTask and frees it.
+  virtual void handle(void *task) override;
+  virtual void handle_drop(void *task) override;
+  int push_task(OSDQTask *task);
+
+public:
+  int set_operator_weight(const OSDQOpType op_type, const int64_t op_type_weight);
+  int set_prob_of_writing_old_data(const int64_t prob);
+  int set_prob_of_parallel(const int64_t prob);
+  int gen_task(OSDQTask *&task);
+  int gen_write_task(OSDQTask *task, const OSDQOpType op_type);
+  int handle_write_task(const OSDQTask *task);
+  int gen_read_single_task(OSDQTask *task);
+  int handle_read_single_task(OSDQTask *task);
+  int gen_del_task(OSDQTask *task);
+  int handle_del_task(OSDQTask *task);
+
+public:
+  // These scenes read req_results_ directly after the run.
+  friend class OSDQResourceLimitedScene;
+  friend class OSDQErrSimScene;
+
+private:
+  // NOTE(review): std::packaged_task appears to have lost its template argument during extraction -- confirm.
+  std::packaged_task get_packaged_task_(OSDQOpType op_type, const OSDQTask *task);
+  int handle_read_single_task_helper_(OSDQTask *task);
+  int handle_write_single_task_helper_(const OSDQTask *task);
+  int handle_multipart_write_task_helper_(const OSDQTask *task);
+  int handle_append_write_task_helper_(const OSDQTask *task);
+  bool check_parallel_write_result_(
+      const OSDQOpType op_type1,
+      const int ret1,
+      const OSDQOpType op_type2,
+      const int ret2);
+
+  void push_req_result_(const int ret);
+
+private:
+  int64_t op_type_weights_[OSDQOpType::MAX_OPERATE_TYPE];
+  // Prefix sums of op_type_weights_, computed in start().
+  int64_t op_type_random_boundaries_[OSDQOpType::MAX_OPERATE_TYPE];
+  int64_t prob_of_writing_old_data_;  // the range is [0, 100); 0 means old files are never written
+  int64_t prob_of_parallel_;          // the probability of parallel execution, generally considering only write or read operations
+
+private:
+  bool is_inited_;
+  bool is_stopped_;
+  int tg_id_;        // thread group id
+  int thread_cnt_;
+  const char *base_uri_;
+  OSDQMetric *metric_;
+  OSDQFileSet *file_set_;
+  const share::ObBackupStorageInfo *storage_info_;
+  ObBackupIoAdapter adapter_;
+  lib::ObMutex mutex_;
+  // Result code of each handled request, appended by push_req_result_().
+  // NOTE(review): the element type/allocator arguments appear stripped during extraction -- confirm.
+  std::vector> req_results_;
+};
+
+
+/**
+ * @class OSDQScene
+ * @brief base class for scenarios
+ */
+class OSDQScene
+{
+public:
+  OSDQScene();
+  virtual ~OSDQScene();
+  virtual int init(const OSDQParameters *param, OSDQMetric *metric) = 0;
+  virtual bool param_valid(const OSDQParameters *param) = 0;
+  virtual int execute() = 0;
+  // Generates tasks and pushes them to task_handler_ until run_time_s_ has elapsed.
+  int loop_send_task();
+  int set_thread_cnt(const int64_t thread_cnt);
+
+protected:
+  bool is_inited_;
+  int64_t run_time_s_;
+  OSDQMetric *metric_;
+  const char *base_uri_;
+  share::ObBackupStorageInfo storage_info_;
+  OSDQTaskHandler task_handler_;
+  OSDQFileSet file_set_;
+};
+
+/**
+ * @class OSDQHybridTestScene
+ * @brief Hybrid scene
+ */
+class OSDQHybridTestScene : public OSDQScene
+{
+public:
+  OSDQHybridTestScene();
+  virtual ~OSDQHybridTestScene() {}
+  virtual int init(const OSDQParameters *param, OSDQMetric *metric) override;
+  virtual bool param_valid(const OSDQParameters *param) override;
+  virtual int execute() override;
+};
+
+/**
+ * @class OSDQResourceLimitedScene
+ * @brief resource limited scene, contain:
+ *        - 0 network packet loss resource limited
+ *        - 1 network bandwidth limited
+ *        - 2 memory limited
+ *        - 3 cpu limited
+ */
+class OSDQResourceLimitedScene : public OSDQScene
+{
+  // Enumerator values match the numbers listed in the class comment above.
+  enum ResourceLimitedType
+  {
+    NETWORK_PACKET_LOSS_LIMITED_TYPE,
+    NETWORK_BANDWIDTH_LIMITED_TYPE,
+    MEMORY_LIMITED_TYPE,
+    CPU_LIMITED_TYPE,
+    MAX_RESOURCE_LIMITED_TYPE
+  };
+public:
+  OSDQResourceLimitedScene();
+  virtual ~OSDQResourceLimitedScene() {}
+  virtual int init(const OSDQParameters *param, OSDQMetric *metric) override;
+  virtual bool param_valid(const OSDQParameters *param) override;
+  virtual int execute() override;
+
+private:
+  int test_network_packet_loss_limit_();
+  int test_network_bandwidth_limit_();
+  int test_memory_limit_();
+  int test_cpu_limit_();
+  // The inner_* functions run on a helper thread: they apply the restriction,
+  // set `ready` under `mtx` and notify `cv`, sleep for sleep_time_s, then lift it.
+  static void inner_disrupt_network_(
+      const int64_t sleep_time_s,
+      const char *class_handle,
+      std::mutex &mtx,
+      std::condition_variable &cv,
+      bool &ready);
+  static void inner_limit_memory_(
+      const int64_t sleep_time_s,
+      const int64_t limit_memory_size_mb,
+      std::mutex &mtx,
+      std::condition_variable &cv,
+      bool &ready);
+  static void inner_limit_cpu_(
+      const int64_t sleep_time_s,
+      const double cpu_rate,
+      std::mutex &mtx,
+      std::condition_variable &cv,
+      bool &ready);
+
+private:
+  static constexpr int64_t TASK_HANDLER_DEFAULT_THREAD_CNT = 16;
+  // test_cpu_limit_() fails when the measured real cpu usage reaches this value.
+  static constexpr double FINAL_CPU_USAGE_LIMIT = 5;
+  ResourceLimitedType resource_limited_type_;
+  int64_t limit_run_time_s_;
+  int64_t limit_memory_mb_;
+  double limit_cpu_;
+};
+
+/**
+ * @class OSDQErrSimScene
+ * @brief Scene that enables the EN_OBJECT_STORAGE_CHECKSUM_ERROR errsim event
+ *        and expects every request to fail with a checksum error.
+ */
+class OSDQErrSimScene : public OSDQScene
+{
+public:
+  OSDQErrSimScene();
+  virtual ~OSDQErrSimScene() {}
+  virtual int init(const OSDQParameters *param, OSDQMetric *metric) override;
+  virtual bool param_valid(const OSDQParameters *param) override;
+  virtual int execute() override;
+
+};
+
+} // namespace tools
+} // namespace oceanbase
+
+#endif
diff --git a/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp
new file mode 100644
index 000000000..deb82343b
--- /dev/null
+++ b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality_scene.cpp
@@ -0,0 +1,680 @@
+/**
+ * Copyright (c) 2021 OceanBase
+ * OceanBase CE is licensed under Mulan PubL v2.
+ * You can use this software according to the terms and conditions of the Mulan PubL v2.
+ * You may obtain a copy of Mulan PubL v2 at:
+ *          http://license.coscl.org.cn/MulanPubL-2.0
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PubL v2 for more details.
+ */ +#include "ob_admin_object_storage_driver_quality.h" +#include "deps/oblib/src/lib/utility/ob_tracepoint.h" + +using namespace oceanbase::share; +using namespace oceanbase::common; + +namespace oceanbase +{ +namespace tools +{ +//=========================== OSDQScene ================================ +OSDQScene::OSDQScene() + : is_inited_(false), + run_time_s_(0), + metric_(nullptr), + base_uri_(nullptr), + storage_info_(), + task_handler_(), + file_set_() +{ +} + +OSDQScene::~OSDQScene() +{ +} + +int OSDQScene::loop_send_task() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "scene not init", KR(ret), K(is_inited_)); + } else { + const int64_t start_time_us = ObTimeUtility::current_time(); + int64_t current_time_us = start_time_us; + while (current_time_us - start_time_us < run_time_s_ * 1000000 + && (ret == OB_SUCCESS || ret == OB_EAGAIN)) { + OSDQTask *task = nullptr; + if (OB_FAIL(task_handler_.gen_task(task))) { + OB_LOG(WARN, "failed generate task", KR(ret)); + if (ret == OB_FILE_NOT_EXIST) { + ret = OB_EAGAIN; + } + } else { + do { + ret = task_handler_.push_task(task); + if (ret != OB_SUCCESS && ret != OB_EAGAIN) { + OB_LOG(ERROR, "deliver task failed", KR(ret), K(task->op_type_), K(task->uri_)); + } else { + ::usleep(10000); + } + current_time_us = ObTimeUtility::current_time(); + } while (OB_EAGAIN == ret && current_time_us - start_time_us < run_time_s_ * 1000000); + } + + if (OB_FAIL(ret)) { + free_task(task); + } + current_time_us = ObTimeUtility::current_time(); + } + + if (ret == OB_EAGAIN) { + ret = OB_SUCCESS; + } + } + return ret; +} + +int OSDQScene::set_thread_cnt(const int64_t thread_cnt) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret), K(is_inited_)); + } else if (OB_UNLIKELY(thread_cnt <= 0 || thread_cnt > 128)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret)); + } else if 
(OB_FAIL(task_handler_.set_thread_cnt(thread_cnt))) { + OB_LOG(WARN, "failed set task handler's thread_cnt", KR(ret), K(thread_cnt)); + } + return ret; +} +//=========================== OSDQTask ======================================== +OSDQTask::OSDQTask() + : op_type_(OSDQOpType::MAX_OPERATE_TYPE), + object_id_(-1), + uri_(nullptr), + buf_(nullptr), + buf_len_(-1), + start_time_stamp_us_(0), + parallel_(false), + parallel_op_type_(OSDQOpType::MAX_OPERATE_TYPE) +{} + +OSDQTask::~OSDQTask() +{ + if (uri_ != nullptr) { + free(uri_); + uri_ = nullptr; + } + if (buf_ != nullptr) { + free(buf_); + buf_ = nullptr; + } +} + +bool OSDQTask::is_valid() const +{ + bool valid = true; + if (OB_UNLIKELY(op_type_ == MAX_OPERATE_TYPE) || OB_ISNULL(uri_)) { + valid = false; + } else if (op_type_ == WRITE_SINGLE_FILE || op_type_ == APPEND_WRITE || op_type_ == MULTIPART_WRITE) { + if (OB_UNLIKELY(object_id_ <= 0) + || OB_UNLIKELY(buf_len_ > 0 && buf_ == nullptr) + || OB_UNLIKELY(buf_len_ < 0)) { + valid = false; + } else if (op_type_ == APPEND_WRITE && OB_UNLIKELY(buf_len_ < 4)) { + valid = false; + } + } else if (op_type_ == READ_SINGLE_FILE) { + if (OB_UNLIKELY(object_id_ <= 0)) { + valid = false; + } + } else if (op_type_ == DEL_FILE) { + if (OB_UNLIKELY(object_id_ <= 0)) { + valid = false; + } + } + return valid; +} + +void free_task(OSDQTask *&task) +{ + if (OB_NOT_NULL(task)) { + task->~OSDQTask(); + free(task); + } + task = nullptr; +} + +//=========================== OSDQHybridTestScene ================================ + +OSDQHybridTestScene::OSDQHybridTestScene() + : OSDQScene() +{ +} + +int OSDQHybridTestScene::init(const OSDQParameters *param, OSDQMetric *metric) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + OB_LOG(WARN, "hybrid test scene init twice", KR(ret), K(is_inited_)); + } else if (OB_UNLIKELY(!param_valid(param))) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(param), K(storage_info_)); + } 
else if (OB_FAIL(task_handler_.init(param->base_path_, metric, &file_set_, &storage_info_))) { + OB_LOG(WARN, "failed init task handler", KR(ret), KPC(param), K(storage_info_)); + } else { + task_handler_.set_operator_weight(WRITE_SINGLE_FILE, 1); + task_handler_.set_operator_weight(MULTIPART_WRITE, 1); + task_handler_.set_operator_weight(APPEND_WRITE, 1); + task_handler_.set_operator_weight(READ_SINGLE_FILE, 3); + task_handler_.set_operator_weight(DEL_FILE, 1); + run_time_s_ = param->run_time_s_; + metric_ = metric; + base_uri_ = param->base_path_; + is_inited_ = true; + } + return ret; +} + +bool OSDQHybridTestScene::param_valid(const OSDQParameters *param) { + bool is_valid = true; + int ret = OB_SUCCESS; + if (OB_ISNULL(param)) { + is_valid = false; + } else if (OB_FAIL(storage_info_.set(param->base_path_, param->storage_info_str_))) { + is_valid = false; + OB_LOG(WARN, "failed to set storage info", KR(ret), KPC(param)); + } else if (OB_UNLIKELY(!storage_info_.is_valid() || param->run_time_s_ <= 0)) { + is_valid = false; + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(storage_info_), KPC(param)); + } + return is_valid; +} + +int OSDQHybridTestScene::execute() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else if (OB_FAIL(task_handler_.start())) { + OB_LOG(WARN, "failed start task handler", KR(ret)); + } else if (OB_FAIL(loop_send_task())) { + OB_LOG(WARN, "failed exec loop_send_task", KR(ret)); + } + task_handler_.destroy(); + + return ret; +} + +//==================== OSDQResourceLimitedScene ==================== +OSDQResourceLimitedScene::OSDQResourceLimitedScene() + : OSDQScene(), + resource_limited_type_(MAX_RESOURCE_LIMITED_TYPE), + limit_run_time_s_(0), + limit_memory_mb_(0), + limit_cpu_(0) +{} + +int OSDQResourceLimitedScene::init(const OSDQParameters *param, OSDQMetric *metric) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = 
OB_INIT_TWICE; + OB_LOG(WARN, "resource limited scene init twice", KR(ret), K(is_inited_)); + } else if (OB_UNLIKELY(!param_valid(param))) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(param), K(metric)); + } else if (OB_FAIL(task_handler_.init(param->base_path_, metric, &file_set_, &storage_info_))) { + OB_LOG(WARN, "failed init task handler", KR(ret)); + } else if (OB_FAIL(task_handler_.set_thread_cnt(TASK_HANDLER_DEFAULT_THREAD_CNT))) { + OB_LOG(WARN, "failed set task handler thread cnt", KR(ret)); + } else { + task_handler_.set_operator_weight(OSDQOpType::DEL_FILE, 0); + task_handler_.set_prob_of_parallel(0); + task_handler_.set_prob_of_writing_old_data(0); + base_uri_ = param->base_path_; + run_time_s_ = param->run_time_s_; + metric_ = metric; + resource_limited_type_ = static_cast(param->resource_limited_type_); + limit_run_time_s_ = param->limit_run_time_s_; + limit_memory_mb_ = param->limit_memory_mb_; + limit_cpu_ = param->limit_cpu_; + is_inited_ = true; + } + return ret; +} + +bool OSDQResourceLimitedScene::param_valid(const OSDQParameters *param) { + bool is_valid = true; + int ret = OB_SUCCESS; + if (OB_ISNULL(param)) { + is_valid = false; + } else if (OB_FAIL(storage_info_.set(param->base_path_, param->storage_info_str_))) { + is_valid = false; + OB_LOG(WARN, "failed to set storage info", KR(ret), KPC(param)); + } else if (OB_UNLIKELY(!storage_info_.is_valid() || param->run_time_s_ <= 0 + || param->resource_limited_type_ < 0 + || param->resource_limited_type_ >= MAX_RESOURCE_LIMITED_TYPE + || param->limit_run_time_s_ <= 0)) { + is_valid = false; + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(storage_info_), KPC(param)); + } else if (param->resource_limited_type_ == MEMORY_LIMITED_TYPE) { + // setting this to a smaller value has been tested to cause the program to break + if (OB_UNLIKELY(param->limit_memory_mb_ < 64)) { + is_valid = false; + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid 
argument", KR(ret), K(param->limit_memory_mb_)); + } + } else if (param->resource_limited_type_ == CPU_LIMITED_TYPE) { + if (OB_UNLIKELY(param->limit_cpu_ > 1 || param->limit_cpu_ < 0 + || param->limit_run_time_s_ < param->run_time_s_ + 5)) { + is_valid = false; + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(param->limit_cpu_)); + OSDQLogEntry::print_log("INVALID ARGUMENT", "the resource limit run time should be at least 5s larger than run time when limited type is cpu limit", RED_COLOR_PREFIX); + } + } + return is_valid; +} + +int OSDQResourceLimitedScene::execute() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else if (OB_FAIL(task_handler_.start())) { + OB_LOG(ERROR, "failed start task hanlder", KR(ret)); + } else { + switch (resource_limited_type_) { + case NETWORK_PACKET_LOSS_LIMITED_TYPE: + ret = test_network_packet_loss_limit_(); + break; + case NETWORK_BANDWIDTH_LIMITED_TYPE: + ret = test_network_bandwidth_limit_(); + break; + case MEMORY_LIMITED_TYPE: + ret = test_memory_limit_(); + break; + case CPU_LIMITED_TYPE: + ret = test_cpu_limit_(); + break; + case MAX_RESOURCE_LIMITED_TYPE: + ret = OB_ERR_UNEXPECTED; + break; + default: + ret = OB_ERR_UNEXPECTED; + break; + } + + if (OB_FAIL(ret)) { + OB_LOG(WARN, "failed exec resource limited test", KR(ret), K(resource_limited_type_)); + } + task_handler_.destroy(); + } + return ret; +} + +void OSDQResourceLimitedScene::inner_disrupt_network_( + const int64_t sleep_time_s, + const char *class_handle, + std::mutex &mtx, + std::condition_variable &cv, + bool &ready) +{ + int ret = OB_SUCCESS; + /* + * 1: root qdisc + * / \ + * 1:1 1:2 child class + * | | + * 10: 20: qdisc + */ + system("tc qdisc add dev eth0 root handle 1: htb default 30"); + system("tc class add dev eth0 parent 1: classid 1:1 htb rate 10000mbit"); + system("tc class add dev eth0 parent 1: classid 1:2 htb rate 10mbit"); + system("tc filter add 
dev eth0 parent 1: protocol all prio 1 handle 1: cgroup"); + system("tc qdisc add dev eth0 parent 1:1 handle 10: netem loss 100%"); + system("tc qdisc add dev eth0 parent 1:2 handle 20: sfq perturb 10"); + system("cgcreate -g net_cls:/ob_admin_osdq_cgroup"); + std::string bind_str = std::string("echo ") + std::string(class_handle) + + " | tee /sys/fs/cgroup/net_cls/ob_admin_osdq_cgroup/net_cls.classid > /dev/null"; + system(bind_str.c_str()); + const int pid = getpid(); + std::string cmd = std::string("cgclassify -g net_cls:ob_admin_osdq_cgroup ") + std::to_string(pid); + + assert(system(cmd.c_str()) == 0); + { + std::unique_lock lock(mtx); + ready = true; + } + cv.notify_all(); + OSDQLogEntry log; + if (OB_FAIL(log.init("DISRUPT NETWORK"))) { + OB_LOG(WARN, "failed init log", KR(ret)); + } + OSDQLogEntry::print_log("DISRUPT_NETWORK", "THE NETWORK RESTRICTED ENVIRONMENT IS READY"); + OSDQLogEntry::print_log("DISRUPT_NETWORK", + std::string("bind qdisc ") + std::string(class_handle) + std::string(" success")); + + std::this_thread::sleep_for(std::chrono::seconds(sleep_time_s)); + + system("echo | tee /sys/fs/cgroup/net_cls/ob_admin_osdq_cgroup/net_cls.classid"); + OSDQLogEntry::print_log("DISRUPT_NETWORK", "NETWORK DISRUPT IS FINISHED"); + system("tc qdisc del dev eth0 root"); +} + +int OSDQResourceLimitedScene::test_network_packet_loss_limit_() +{ + int ret = OB_SUCCESS; + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + OSDQLogEntry::print_log("NETWORK TEST1", "Packet loss 100% Test started"); + std::thread t(inner_disrupt_network_, limit_run_time_s_, "0x00010001", + std::ref(mtx), std::ref(cv), std::ref(ready)); + std::unique_lock lock(mtx); + while (!ready) { + cv.wait(lock); + } + ret = loop_send_task(); + t.join(); + task_handler_.stop(); + + if (OB_SUCC(ret)) { + for (auto req_ret : task_handler_.req_results_) { + if (req_ret != OB_SUCCESS + && req_ret != OB_OBJECT_STORAGE_IO_ERROR + && req_ret != OB_IO_TIMEOUT + && req_ret != OB_TIMEOUT) 
{ + ret = req_ret; + break; + } + } + } + + return ret; +} + +int OSDQResourceLimitedScene::test_network_bandwidth_limit_() +{ + int ret = OB_SUCCESS; + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + OSDQLogEntry::print_log("NETWORK TEST2", "BandWidth Limit Test Started"); + std::thread t(inner_disrupt_network_, limit_run_time_s_, "0x00010002", + std::ref(mtx), std::ref(cv), std::ref(ready)); + std::unique_lock lock(mtx); + while (!ready) { + cv.wait(lock); + } + + ret = loop_send_task(); + t.join(); + task_handler_.stop(); + if (OB_SUCC(ret)) { + for (auto req_ret : task_handler_.req_results_) { + if (req_ret != OB_SUCCESS && req_ret != OB_IO_TIMEOUT) { + ret = req_ret; + break; + } + } + } + + return ret; +} + +void OSDQResourceLimitedScene::inner_limit_memory_( + const int64_t sleep_time_s, + const int64_t limit_memory_size_mb, + std::mutex &mtx, + std::condition_variable &cv, + bool &ready) +{ + int ret = OB_SUCCESS; + ObRefHolder tenant_holder; + const int64_t limit_memory_size = limit_memory_size_mb * 1024 * 1024; + if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(OB_SERVER_TENANT_ID, tenant_holder))) { + OB_LOG(WARN, "failed get tenant io manager", KR(ret)); + } else if (OB_FAIL(tenant_holder.get_ptr()->update_memory_pool(limit_memory_size))) { + OB_LOG(WARN, "faield to update memory limit", KR(ret)); + OSDQLogEntry::print_log("LIMIT MEMORY", "failed to update memory limit", RED_COLOR_PREFIX); + } else if (OB_FAIL(ObMallocAllocator::get_instance()->set_tenant_limit(OB_SERVER_TENANT_ID, limit_memory_size))) { + OB_LOG(ERROR, "failed set tenant memory limit", KR(ret), K(limit_memory_size)); + } else { + lib::set_memory_limit(limit_memory_size); + // When the tenant is out of memory, it will raise(49), and ob_admin doesn't do the + // initialisation that observer does, so it will just quit, so in order to make sure + // that ob_admin runs correctly, need to ignore 49. 
+ signal(49, SIG_IGN); + { + std::unique_lock lock(mtx); + ready = true; + } + + cv.notify_all(); + OSDQLogEntry log; + if (OB_FAIL(log.init("LIMIT MEMORY"))) { + OB_LOG(ERROR, "failed init log", KR(ret)); + } else { + log.log_entry_kv("RESTRICTED TIME", std::to_string(sleep_time_s) + "s"); + log.log_entry_kv("TOTAL MEMORY LIMIT", std::to_string(limit_memory_size)); + log.log_entry_kv("TOTAL TENANT MEMORY LIMIT", std::to_string(limit_memory_size)); + log.log_entry_kv("TOTAL TENANT IO MANAGER MEMORY LIMIT", std::to_string(limit_memory_size)); + log.log_entry("THE MEMORY RESTRICTED ENVIRONMENT IS READY"); + log.print(); + } + ret = OB_SUCCESS; + std::this_thread::sleep_for(std::chrono::seconds(sleep_time_s)); + lib::set_memory_limit(MEMORY_LIMITED_SIZE); + lib::set_tenant_memory_limit(500, MEMORY_LIMITED_SIZE); + if (OB_NOT_NULL(tenant_holder.get_ptr())) { + if (FAILEDx(tenant_holder.get_ptr()->update_memory_pool(MEMORY_LIMITED_SIZE))) { + OB_LOG(WARN, "faield to restore memory", KR(ret)); + OSDQLogEntry::print_log("LIMIT MEMORY", "failed to restore memory", RED_COLOR_PREFIX); + } + } + } + + if (OB_SUCC(ret)) { + OSDQLogEntry::print_log("LIMIT MEMORY", "THE MEMORY RESTRICETED ENVIRONMENT IS REMOVED", GREEN_COLOR_PREFIX); + } +} + +int OSDQResourceLimitedScene::test_memory_limit_() +{ + int ret = OB_SUCCESS; + OSDQLogEntry::print_log("MEMORY LIMITD TEST", "MEMORY LIMIT TEST START"); + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + std::thread t(inner_limit_memory_, limit_run_time_s_, limit_memory_mb_, + std::ref(mtx), std::ref(cv), std::ref(ready)); + std::unique_lock lock(mtx); + while (!ready) { + cv.wait(lock); + } + ret = loop_send_task(); + t.join(); + task_handler_.stop(); + + if (OB_SUCC(ret)) { + for (auto req_ret : task_handler_.req_results_) { + if (req_ret != OB_SUCCESS + && req_ret != OB_HASH_NOT_EXIST + && req_ret != OB_ALLOCATE_MEMORY_FAILED) { + ret = req_ret; + break; + } + } + } + + return ret; +} + +void 
OSDQResourceLimitedScene::inner_limit_cpu_( + const int64_t sleep_time_s, + const double cpu_rate, + std::mutex &mtx, + std::condition_variable &cv, + bool &ready) +{ + int ret = OB_SUCCESS; + system("cgcreate -g cpu:/ob_admin_osdq_cgroup"); + const int64_t quota = static_cast(100000 * cpu_rate); + std::string cmd = std::string("echo ") + std::to_string(quota) + std::string(" > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpu.cfs_quota_us"); + system(cmd.c_str()); + + system("cat /sys/fs/cgroup/cpu/cpuset.mems > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpuset.mems"); + system("cat /sys/fs/cgroup/cpu/cpuset.cpus > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpuset.cpus"); + + const int pid = getpid(); + cmd = std::string("cgclassify -g cpu:ob_admin_osdq_cgroup ") + std::to_string(pid); + assert(system(cmd.c_str()) == 0); + OSDQLogEntry log; + if (OB_FAIL(log.init("LIMIT CPU"))) { + OB_LOG(ERROR, "failed init log", KR(ret)); + } else { + log.log_entry_kv("RESTRICTED TIME", std::to_string(sleep_time_s) + "s"); + log.log_entry_kv("CPU RATE", std::to_string(cpu_rate) + "%"); + log.log_entry("THE MEMORY RESTRICTED ENVIRONMENT IS READY"); + log.log_entry("LIMIT CPU SUCCESS"); + log.print(); + } + { + std::unique_lock lock(mtx); + ready = true; + } + cv.notify_all(); + std::this_thread::sleep_for(std::chrono::seconds(sleep_time_s)); + OSDQLogEntry::print_log("LIMIT CPU", "LIMIT CPU FINISH"); + system("echo 100000 > /sys/fs/cgroup/cpu/ob_admin_osdq_cgroup/cpu.cfs_quota_us"); +} + +int OSDQResourceLimitedScene::test_cpu_limit_() +{ + int ret = OB_SUCCESS; + OSDQLogEntry::print_log("CPU LIMIT TEST", "CPU LIMIT TEST START"); + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + std::thread t(inner_limit_cpu_, limit_run_time_s_, limit_cpu_, + std::ref(mtx), std::ref(cv), + std::ref(ready)); + std::unique_lock lock(mtx); + while (!ready) { + cv.wait(lock); + } + ret = loop_send_task(); + t.join(); + task_handler_.stop(); + + if (OB_SUCC(ret)) { + for (auto req_ret : 
task_handler_.req_results_) { + if (req_ret != OB_SUCCESS) { + ret = req_ret; + break; + } + } + } + + if (metric_->get_real_cpu_usage() >= FINAL_CPU_USAGE_LIMIT) { + ret = OB_ERR_UNEXPECTED; + } + + return ret; +} + +//===================== OSDQErrSimScene ==================== + +OSDQErrSimScene::OSDQErrSimScene() +{} + +int OSDQErrSimScene::init(const OSDQParameters *param, OSDQMetric *metric) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + OB_LOG(WARN, "ErrSim scene init twice", KR(ret), K(is_inited_)); + } else if (OB_UNLIKELY(!param_valid(param))) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(param)); + } else if (OB_FAIL(task_handler_.init(param->base_path_, metric, &file_set_, &storage_info_))) { + OB_LOG(WARN, "failed init task handler", KR(ret)); + } else { + task_handler_.set_operator_weight(WRITE_SINGLE_FILE, 1); + task_handler_.set_operator_weight(MULTIPART_WRITE, 1); + task_handler_.set_operator_weight(APPEND_WRITE, 1); + task_handler_.set_operator_weight(READ_SINGLE_FILE, 3); + task_handler_.set_operator_weight(DEL_FILE, 1); + task_handler_.set_prob_of_parallel(0); + run_time_s_ = param->run_time_s_; + metric_ = metric; + base_uri_ = param->base_path_; + is_inited_ = true; + } + return ret; +} + +bool OSDQErrSimScene::param_valid(const OSDQParameters *param) +{ + bool is_valid = true; + int ret = OB_SUCCESS; + if (OB_ISNULL(param)) { + is_valid = false; + } else if (OB_FAIL(storage_info_.set(param->base_path_, param->storage_info_str_))) { + is_valid = false; + OB_LOG(WARN, "failed to set storage info", KR(ret), KPC(param)); + } else if (OB_UNLIKELY(!storage_info_.is_valid() || param->run_time_s_ <= 0)) { + is_valid = false; + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(storage_info_), KPC(param)); + } + return is_valid; +} + +int OSDQErrSimScene::execute() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, 
"not init", KR(ret)); + } else if (OB_FAIL(task_handler_.start())) { + OB_LOG(WARN, "failed start task handler", KR(ret)); + } else { + EventItem item; + item.trigger_freq_ = 1; + item.error_code_ = OB_OBJECT_STORAGE_CHECKSUM_ERROR; + /* if (OB_FAIL(EventTable::instance().set_event(EventTable::EN_OBJECT_STORAGE_CHECKSUM_ERROR.item_.no_, item))) { */ + if (OB_FAIL(EventTable::instance().set_event("EN_OBJECT_STORAGE_CHECKSUM_ERROR", item))) { + OB_LOG(ERROR, "failed set event", KR(ret)); + } else { + ret = loop_send_task(); + + task_handler_.stop(); + if (OB_SUCC(ret)) { + for (auto req_ret : task_handler_.req_results_) { + if (req_ret != OB_OBJECT_STORAGE_CHECKSUM_ERROR) { + ret = req_ret; + break; + } + } + } + } + } + + task_handler_.destroy(); + return ret; +} + +} // namespace tools +} // namespace oceanbase diff --git a/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp new file mode 100644 index 000000000..29010ea54 --- /dev/null +++ b/tools/ob_admin/object_storage_driver_quality/ob_admin_object_storage_driver_quality_task_handler.cpp @@ -0,0 +1,747 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */
+
+#include "ob_admin_object_storage_driver_quality.h"
+
+using namespace oceanbase::share;
+using namespace oceanbase::common;
+
+namespace oceanbase
+{
+namespace tools
+{
+//=========================== OSDQTaskHandler ==================================
+OSDQTaskHandler::OSDQTaskHandler()
+  : prob_of_writing_old_data_(DEFAULT_PROB_OF_WRITING_OLD_DATA),
+    prob_of_parallel_(DEFAULT_PROB_OF_PARALLEL),
+    is_inited_(false),
+    is_stopped_(true),
+    tg_id_(-1),
+    thread_cnt_(DEFAULT_THREAD_CNT),
+    base_uri_(nullptr),
+    metric_(nullptr),
+    file_set_(nullptr),
+    storage_info_(nullptr),
+    adapter_()
+{}
+
+OSDQTaskHandler::~OSDQTaskHandler()
+{}
+
+/**
+ * Creates the thread group, registers this object as its handler and stores the
+ * collaborators. Every operation type starts with weight 1.
+ * @param base_uri      base uri of the objects; stored as-is, not copied
+ * @param metric        metric collector; stored as-is
+ * @param file_set      set of written files; stored as-is
+ * @param storage_info  storage info, must be valid; stored as-is
+ * @return OB_INIT_TWICE when already initialised, OB_INVALID_ARGUMENT for a NULL or
+ *         invalid argument, otherwise the result of the thread-group setup.
+ *         When initialisation does not complete, destroy() is called to release
+ *         whatever was created.
+ */
+int OSDQTaskHandler::init(
+    const char *base_uri,
+    OSDQMetric *metric,
+    OSDQFileSet *file_set,
+    const share::ObBackupStorageInfo *storage_info)
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(is_inited_)) {
+    ret = OB_INIT_TWICE;
+    OB_LOG(WARN, "task handler init twice", KR(ret), K(is_inited_));
+  } else if (OB_ISNULL(base_uri) || OB_ISNULL(metric) || OB_ISNULL(storage_info)
+      || OB_ISNULL(file_set)) {
+    // Log only the addresses here: storage_info may be NULL, so it must not be
+    // dereferenced while building the log line.
+    ret = OB_INVALID_ARGUMENT;
+    OB_LOG(WARN, "invalid argument", KR(ret), KP(base_uri), KP(metric), KP(storage_info), KP(file_set));
+  } else if (OB_UNLIKELY(!storage_info->is_valid())) {
+    ret = OB_INVALID_ARGUMENT;
+    OB_LOG(WARN, "invalid argument", KR(ret), K(metric), KPC(storage_info), K(storage_info->is_valid()), K(file_set));
+  } else if (OB_FAIL(TG_CREATE(lib::TGDefIDs::COMMON_QUEUE_THREAD, tg_id_))) {
+    OB_LOG(WARN, "create task handler thread failed", KR(ret), K(tg_id_));
+  } else if (OB_FAIL(TG_SET_HANDLER(tg_id_, *this))) {
+    OB_LOG(WARN, "failed set hanlder", KR(ret), K(tg_id_));
+  } else if (OB_FAIL(TG_SET_THREAD_CNT(tg_id_, thread_cnt_))) {
+    OB_LOG(WARN, "failed set thread cnt", KR(ret), K(tg_id_), K(thread_cnt_));
+  } else {
+    TG_MGR.tgs_[tg_id_]->set_queue_size(DEFAULT_QUEUE_SIZE);
+    base_uri_ = base_uri;
+    metric_ = metric;
+    file_set_ = file_set;
+    storage_info_ = storage_info;
+    for (int i = 0; i < MAX_OPERATE_TYPE; i++) {
+      op_type_weights_[i] = 1;
+    }
+    is_inited_ = true;
+  }
+  if (OB_UNLIKELY(!is_inited_)) {
+    destroy();
+  }
+  return ret;
+}
+
+int OSDQTaskHandler::start() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_FAIL(TG_START(tg_id_))) { + OB_LOG(WARN, "start thread pool faield", KR(ret)); + } else { + is_stopped_ = false; + op_type_random_boundaries_[0] = op_type_weights_[0]; + for (int i = 1; i < MAX_OPERATE_TYPE; i++) { + op_type_random_boundaries_[i] = op_type_random_boundaries_[i - 1] + op_type_weights_[i]; + } + OB_LOG(INFO, "start OSDQTaskHandler success"); + OSDQLogEntry::print_log("TASK HANDLER", "task handler has started"); + } + return ret; +} + +void OSDQTaskHandler::stop() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else { + is_stopped_ = true; + TG_STOP(tg_id_); + OB_LOG(INFO, "stop OSDQTaskHandler"); + } +} + +int OSDQTaskHandler::wait() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else { + TG_WAIT(tg_id_); + OB_LOG(INFO, "wait OSDQTaskHandler success", KR(ret), K(tg_id_)); + } + return ret; +} + +int OSDQTaskHandler::set_thread_cnt(const int64_t thread_cnt) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret)); + } else if (OB_FAIL(TG_SET_THREAD_CNT(tg_id_, thread_cnt))) { + OB_LOG(WARN, "failed set thread cnt", KR(ret), K(tg_id_), K(thread_cnt)); + } else { + thread_cnt_ = thread_cnt; + } + return ret; +} + +void OSDQTaskHandler::destroy() +{ + if (OB_LIKELY(tg_id_ >= 0)) { + stop(); + wait(); + TG_DESTROY(tg_id_); + tg_id_ = -1; + } + is_inited_ = false; + is_stopped_ = true; + metric_ = nullptr; + file_set_ = nullptr; + storage_info_ = nullptr; + OSDQLogEntry::print_log("TASK HANDLER", "task handler has destroy"); +} + +void OSDQTaskHandler::handle(void *task) +{ + int ret = OB_SUCCESS; + OSDQTask *task_p = nullptr; + + if 
(OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(task)); + } else if (FALSE_IT(task_p = reinterpret_cast(task))) { + } else if (OB_UNLIKELY(!task_p->is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(task_p)); + } else { + task_p->start_time_stamp_us_ = ObTimeUtility::current_time(); + if (task_p->op_type_ == WRITE_SINGLE_FILE + || task_p->op_type_ == MULTIPART_WRITE + || task_p->op_type_ == APPEND_WRITE) { + if (OB_FAIL(handle_write_task(task_p))) { + OB_LOG(WARN, "failed handle write task", KR(ret), KPC(task_p)); + } + } else if (task_p->op_type_ == READ_SINGLE_FILE) { + if (OB_FAIL(handle_read_single_task(task_p))) { + OB_LOG(WARN, "failed handle read single task", KR(ret), KPC(task_p)); + } + } else if (task_p->op_type_ == DEL_FILE) { + if (OB_FAIL(handle_del_task(task_p))) { + OB_LOG(WARN, "failed handle del task", KR(ret), KPC(task_p)); + } + } else { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "invalid task op type", KR(ret), K(task_p->op_type_)); + } + + if (OB_FAIL(ret)) { + if (OB_NOT_NULL(task_p)) { + const double cost_time = (ObTimeUtility::current_time() - task_p->start_time_stamp_us_) / 1000000; + std::ostringstream oss; + oss << " error code: " << ret << " " << common::ob_error_name(ret) \ + << " cost_time:" << std::setprecision(3) << cost_time << "s"; + OSDQLogEntry::print_log("TASK FAILED", oss.str(), RED_COLOR_PREFIX); + } + } + } + + if (OB_NOT_NULL(task_p)) { + free_task(task_p); + task = nullptr; + } + push_req_result_(ret); +} + +int OSDQTaskHandler::handle_write_single_task_helper_(const OSDQTask *task) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(adapter_.write_single_file(task->uri_, storage_info_, + task->buf_, task->buf_len_, + ObStorageIdMod::get_default_id_mod()))) { + OB_LOG(WARN, "failed write single file", KR(ret), KPC(task), 
KPC(storage_info_)); + } + return ret; +} + +int OSDQTaskHandler::handle_multipart_write_task_helper_(const OSDQTask *task) +{ + int ret = OB_SUCCESS; + int64_t write_size = 0; + ObIODevice *device_handle = nullptr; + ObIOFd fd; + ObIOHandle io_handle; + const ObStorageAccessType access_type = OB_STORAGE_ACCESS_DIRECT_MULTIPART_WRITER; + + if (OB_FAIL(adapter_.open_with_access_type(device_handle, fd, storage_info_, task->uri_, + access_type, ObStorageIdMod::get_default_id_mod()))) { + OB_LOG(WARN, "failed to open device with access type", KR(ret), KPC(task)); + } else if (OB_FAIL(adapter_.async_upload_data(*device_handle, fd, task->buf_, 0/*offset*/, + task->buf_len_, io_handle))) { + OB_LOG(WARN, "failed to start async upload task!", KR(ret), KPC(task)); + } else if (OB_FAIL(io_handle.wait())) { + OB_LOG(WARN, "failed to wait async upload data finish", KR(ret), KPC(task)); + } else if (OB_FAIL(adapter_.complete(*device_handle, fd))) { + OB_LOG(WARN, "failed to complete", KR(ret), KPC(task)); + } + + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(adapter_.close_device_and_fd(device_handle, fd))) { + ret = COVER_SUCC(tmp_ret); + OB_LOG(WARN, "failed to close file and release device", KR(ret), KPC(task)); + } + return ret; +} + +int OSDQTaskHandler::handle_append_write_task_helper_(const OSDQTask *task) +{ + int ret = OB_SUCCESS; + const int64_t buf_len = task->buf_len_; + const int64_t write_cnt = 4; + const int64_t write_size_once = buf_len / write_cnt; + const ObStorageAccessType access_type = OB_STORAGE_ACCESS_APPENDER; + int64_t write_size = 0; + for (int i = 0; i < write_cnt && OB_SUCC(ret); i++) { + const int64_t offset = write_size_once * i; + const bool is_can_seal = (i + 1 == write_cnt); + const int64_t expected_write_size = is_can_seal ? 
(buf_len - offset) : write_size_once; + if (FAILEDx(adapter_.pwrite(task->uri_, storage_info_, task->buf_ + offset, + offset, expected_write_size, access_type, + write_size, is_can_seal, ObStorageIdMod::get_default_id_mod()))) { + OB_LOG(WARN, "failed exec pwrite", KR(ret), K(i), K(offset), KPC(task)); + } else if (OB_UNLIKELY(write_size != expected_write_size)) { + OB_LOG(WARN, "pwrite operation write size not as expected", KR(ret), KPC(task), K(write_size), K(expected_write_size)); + } + } + + return ret; +} + +bool OSDQTaskHandler::check_parallel_write_result_( + const OSDQOpType op_type1, + const int ret1, + const OSDQOpType op_type2, + const int ret2) +{ + bool bool_ret = true; + int ret = OB_SUCCESS; + if ((op_type1 == WRITE_SINGLE_FILE || op_type1 == MULTIPART_WRITE) + && (op_type2 == WRITE_SINGLE_FILE || op_type2 == MULTIPART_WRITE)) { + // in this case, no operation is append write + if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) { + bool_ret = false; + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "parallel write should success when two type is write_single_file or multipart write", KR(ret), + K(op_type1), K(ret1), K(op_type2), K(ret2)); + } + } else { + // in this case, at least one operation is append write + if (storage_info_->get_type() == ObStorageType::OB_STORAGE_S3) { + if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) { + bool_ret = false; + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "parallel append write should succeed when storage type is s3", KR(ret), + K(op_type1), K(ret1), K(op_type2), K(ret2)); + } + } else if (storage_info_->get_type() == ObStorageType::OB_STORAGE_COS + || storage_info_->get_type() == ObStorageType::OB_STORAGE_OSS) { + // if the storage type is cos or oss, the append write may fail if the append operation is performed + // after the single or multi-part write operation. + // but if two parallel operation are both append write operation, they should succeed. 
+ if (op_type1 == APPEND_WRITE && op_type2 == APPEND_WRITE) { + if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) { + bool_ret = false; + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "the parallel append write operation should succeed", KR(ret), + K(op_type1), K(ret1), K(op_type2), K(ret2)); + } + } else { + if (OB_UNLIKELY((op_type1 != APPEND_WRITE && ret1 != OB_SUCCESS) || (op_type2 != APPEND_WRITE && ret2 != OB_SUCCESS))) { + bool_ret = false; + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "the write single operation or multi-part write should success", + KR(ret), KPC(storage_info_), K(op_type1), K(ret1), K(op_type2), K(ret2)); + } + } + } else { + bool_ret = false; + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "error storage type", KR(ret), KPC(storage_info_)); + } + } + return bool_ret; +} + +void OSDQTaskHandler::push_req_result_(const int ret) +{ + lib::ObMutexGuard guard(mutex_); + req_results_.push_back(ret); +} + +void OSDQTaskHandler::handle_drop(void *task) +{ + int ret = OB_SUCCESS; + OSDQTask *task_p = nullptr; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(task)); + } else if (FALSE_IT(task_p = reinterpret_cast(task))) { + } else if (OB_FAIL(metric_->sub_queued_entry())) { + OB_LOG(WARN, "failed sub queued entry in metric", KR(ret)); + } else { + free_task(task_p); + task = nullptr; + } +} + +int OSDQTaskHandler::push_task(OSDQTask *task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_UNLIKELY(is_stopped_)) { + OB_LOG(WARN, "task handler is stopped", KR(ret), K(is_stopped_)); + } else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(task)); + } else if (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) { + 
OB_LOG(WARN, "failed push task in task handler", KR(ret), KPC(task)); + } else if (OB_FAIL(metric_->add_queued_entry())){ + OB_LOG(WARN, "failed add queued entry in metric", KR(ret)); + } + return ret; +} + +int OSDQTaskHandler::set_operator_weight(const OSDQOpType op_type, const int64_t op_type_weight) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_stopped_)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "set op type weight is disable when task handler is running", KR(ret), K(is_stopped_)); + } else if (OB_UNLIKELY(op_type == MAX_OPERATE_TYPE || op_type_weight < 0)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(op_type), K(op_type_weight)); + } else { + op_type_weights_[op_type] = op_type_weight; + } + return ret; +} + +int OSDQTaskHandler::set_prob_of_writing_old_data(const int64_t prob) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(prob < 0 || prob >= 100)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(prob)); + } else { + prob_of_writing_old_data_ = prob; + } + return ret; +} + +int OSDQTaskHandler::set_prob_of_parallel(const int64_t prob) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(prob < 0 || prob >= 100)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(prob)); + } else { + prob_of_parallel_ = prob; + } + return ret; +} + +int OSDQTaskHandler::gen_task(OSDQTask *&task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task = static_cast(malloc(sizeof(OSDQTask))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for OSDQTask", KR(ret)); + } else if (FALSE_IT(task = new(task)OSDQTask())) { + } else { + int64_t weight_sum = op_type_random_boundaries_[MAX_OPERATE_TYPE - 1]; + if (file_set_->size() == 0) { + weight_sum = op_type_random_boundaries_[APPEND_WRITE]; + } + const int64_t rnd = ObRandom::rand(1, weight_sum); + if (rnd 
<= op_type_random_boundaries_[WRITE_SINGLE_FILE]) { + ret = gen_write_task(task, WRITE_SINGLE_FILE); + } else if (rnd <= op_type_random_boundaries_[MULTIPART_WRITE]) { + ret = gen_write_task(task, MULTIPART_WRITE); + } else if (rnd <= op_type_random_boundaries_[APPEND_WRITE]) { + ret = gen_write_task(task, APPEND_WRITE); + } else if (rnd <= op_type_random_boundaries_[READ_SINGLE_FILE]) { + ret = gen_read_single_task(task); + } else if (rnd <= op_type_random_boundaries_[DEL_FILE]) { + ret = gen_del_task(task); + } else { + ret = OB_ERR_UNEXPECTED; + OB_LOG(ERROR, "wrong rnd", KR(ret)); + } + if (OB_FAIL(ret)) { + OB_LOG(WARN, "failed gen task", KR(ret), KPC(task)); + } + } + + if (OB_FAIL(ret)) { + push_req_result_(ret); + free_task(task); + } + return ret; +} + +int OSDQTaskHandler::gen_write_task(OSDQTask *task, const OSDQOpType op_type) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task) || OB_UNLIKELY(op_type != WRITE_SINGLE_FILE + && op_type != MULTIPART_WRITE && op_type != APPEND_WRITE)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(task), K(op_type)); + } else { + // random choosing whether to write a new file or an old one + // 20% chance of writing old files + bool is_write_new = true; + int rnd = ObRandom::rand(0, 100); + if (rnd <= prob_of_writing_old_data_ && file_set_->size() != 0) { + is_write_new = false; + } + int64_t object_id = 0; + // when op_type is append_write, only new files can be written. 
+ if (is_write_new || op_type == APPEND_WRITE) { + object_id = OSDQIDGenerator::get_instance().get_next_id(); + char object_name[MAX_OBJECT_NAME_LENGTH] = { 0 }; + if (FAILEDx(construct_object_name(object_id, object_name, MAX_OBJECT_NAME_LENGTH))) { + OB_LOG(WARN, "failed construct object name", KR(ret), K(object_id)); + } else if (OB_ISNULL(task->uri_ = static_cast(malloc(OB_MAX_URI_LENGTH)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for file path", KR(ret), K(OB_MAX_URI_LENGTH)); + } else if (OB_FAIL(construct_file_path(base_uri_, object_name, task->uri_, OB_MAX_URI_LENGTH))) { + OB_LOG(WARN, "failed construct file path", KR(ret), K(base_uri_), K(object_name)); + } + } else { + if (FAILEDx(file_set_->fetch_and_delete_file(object_id, task->uri_))) { + OB_LOG(WARN, "failed to fetch file from file set", KR(ret), K(object_id)); + } + } + + if (OB_FAIL(ret)) { + } else if (FALSE_IT(task->buf_len_ = get_random_content_length(op_type))) { + } else if (FALSE_IT(task->buf_ = static_cast(malloc(task->buf_len_)))) { + } else if (task->buf_len_ > 0) { + if (OB_ISNULL(task->buf_)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for task buf", KR(ret), K(task->buf_len_)); + } else if (OB_FAIL(generate_content_by_object_id(task->buf_, task->buf_len_, object_id))) { + OB_LOG(WARN, "failed to generate content for task", KR(ret), K(task->buf_len_), K(object_id)); + } + } + + if (OB_SUCC(ret)) { + task->op_type_ = op_type; + task->object_id_ = object_id; + // random choosing whether to parallel or not + rnd = ObRandom::rand(1, 100); + if (rnd <= prob_of_parallel_) { + task->parallel_ = true; + static const OSDQOpType parallel_op_types[] = { + WRITE_SINGLE_FILE, + MULTIPART_WRITE, + APPEND_WRITE + }; + int parallel_op_type_nums = sizeof(parallel_op_types) / sizeof(OSDQOpType); + // if buf_len_ is less than 4, the append task cannot be executed + if (task->buf_len_ < 4) { + parallel_op_type_nums--; + } + rnd = 
ObRandom::rand(0, parallel_op_type_nums - 1); + task->parallel_op_type_ = static_cast(rnd); + } + } + } + return ret; +} + +int OSDQTaskHandler::gen_read_single_task(OSDQTask *task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(task)); + } else { + task->op_type_ = READ_SINGLE_FILE; + if (OB_FAIL(file_set_->fetch_and_delete_file(task->object_id_, task->uri_))) { + OB_LOG(WARN, "failed fetch and delete file from file set", KR(ret)); + } else { + int rnd = ObRandom::rand(1, 100); + if (rnd <= prob_of_parallel_) { + task->parallel_ = true; + task->parallel_op_type_ = task->op_type_; + } + } + } + return ret; +} + +int OSDQTaskHandler::gen_del_task(OSDQTask *task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task hanlder not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), K(task)); + } else { + task->op_type_ = DEL_FILE; + if (OB_FAIL(file_set_->fetch_and_delete_file(task->object_id_, task->uri_))) { + OB_LOG(WARN, "failed fetch and delete file from file set", KR(ret)); + } + } + return ret; +} + +std::packaged_task OSDQTaskHandler::get_packaged_task_(OSDQOpType op_type, const OSDQTask *task) +{ + if (op_type == WRITE_SINGLE_FILE) { + return std::packaged_task(std::bind(&OSDQTaskHandler::handle_write_single_task_helper_, this, task)); + } else if (op_type == MULTIPART_WRITE) { + return std::packaged_task(std::bind(&OSDQTaskHandler::handle_multipart_write_task_helper_, this, task)); + } else { + // the previous parameter checks ensure that this must be append write operation + return std::packaged_task(std::bind(&OSDQTaskHandler::handle_append_write_task_helper_, this, task)); + } +} + +int 
OSDQTaskHandler::handle_write_task(const OSDQTask *task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(task)); + } else { + if (!task->parallel_) { + // not parallel write + const int64_t op_start_time_us = ObTimeUtility::current_time(); + if (task->op_type_ == WRITE_SINGLE_FILE + && OB_FAIL(handle_write_single_task_helper_(task))) { + OB_LOG(WARN, "failed handle write single task", KR(ret), KPC(task)); + } else if (task->op_type_ == MULTIPART_WRITE + && OB_FAIL(handle_multipart_write_task_helper_(task))) { + OB_LOG(WARN, "failed handle multipart write task", KR(ret), KPC(task)); + } else if (task->op_type_ == APPEND_WRITE + && OB_FAIL(handle_append_write_task_helper_(task))) { + OB_LOG(WARN, "failed handle append write task", KR(ret), KPC(task)); + } + + if (FAILEDx(metric_->add_latency_metric(op_start_time_us, + task->op_type_, task->buf_len_))) { + OB_LOG(WARN, "failed add latency metric", KR(ret), K(op_start_time_us), KPC(task)); + } else if (OB_FAIL(file_set_->add_file(task->object_id_, task->uri_))) { + OB_LOG(WARN, "failed add object id and file path in file set", KR(ret), KPC(task)); + } + } else { + //parallel write + std::packaged_task task1 = get_packaged_task_(task->op_type_, task); + std::packaged_task task2 = get_packaged_task_(task->parallel_op_type_, task); + + std::future future1 = task1.get_future(); + std::future future2 = task2.get_future(); + std::thread th1(std::move(task1)); + std::thread th2(std::move(task2)); + + th1.join(); + th2.join(); + int ret1 = future1.get(); + int ret2 = future2.get(); + + if (OB_UNLIKELY(!check_parallel_write_result_(task->op_type_, ret1, task->parallel_op_type_, ret2))) { + OSDQLogEntry log; + log.init("PARALLEL WRITE FAILED", RED_COLOR_PREFIX); + 
log.log_entry(std::string(osdq_op_type_names[task->op_type_]) + + ": " + std::to_string(ret1) + " " + std::string(ob_error_name(ret1))); + log.log_entry(std::string(osdq_op_type_names[task->parallel_op_type_]) + + ": " + std::to_string(ret2) + " " + std::string(ob_error_name(ret2))); + log.print(); + } else { + OB_LOG(INFO, "parallel write succees info", KPC(task), K(ret1), K(ret2)); + } + } + } + return ret; +} + +int OSDQTaskHandler::handle_read_single_task(OSDQTask *task) +{ + int ret = OB_SUCCESS; + const int64_t op_start_time_us = ObTimeUtility::current_time(); + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(task)); + } else { + if (!task->parallel_) { + if (OB_FAIL(handle_read_single_task_helper_(task))) { + OB_LOG(WARN, "failed handle read single task", KR(ret)); + } else if (OB_FAIL(metric_->add_latency_metric(op_start_time_us, + task->op_type_, task->buf_len_))) { + OB_LOG(WARN, "failed add latency metric", KR(ret), K(op_start_time_us), KPC(task)); + } else if (OB_FAIL(file_set_->add_file(task->object_id_, task->uri_))) { + OB_LOG(WARN, "failed add object id and file path in file set", KR(ret), KPC(task)); + } + } else { + std::packaged_task helper1(std::bind(&OSDQTaskHandler::handle_read_single_task_helper_, this, std::placeholders::_1)); + std::packaged_task helper2(std::bind(&OSDQTaskHandler::handle_read_single_task_helper_, this, std::placeholders::_1)); + std::future future_obj1 = helper1.get_future(); + std::future future_obj2 = helper2.get_future(); + + OSDQTask *task2 = nullptr; + if (OB_ISNULL(task2 = static_cast(malloc(sizeof(OSDQTask))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for OSDQTask", KR(ret)); + } else if (FALSE_IT(task2 = new(task2)OSDQTask())) { + } else if (OB_ISNULL(task2->uri_ = 
static_cast(malloc(OB_MAX_URI_LENGTH)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for OSDQTask", KR(ret)); + } else { + memcpy(task2->uri_, task->uri_, OB_MAX_URI_LENGTH); + task2->op_type_ = READ_SINGLE_FILE; + task2->object_id_ = task->object_id_; + } + + if (OB_FAIL(ret)) { + } else { + std::thread th1(std::move(helper1), task); + std::thread th2(std::move(helper2), task2); + + th1.join(); + th2.join(); + int ret1 = future_obj1.get(); + int ret2 = future_obj2.get(); + if (OB_UNLIKELY(ret1 != OB_SUCCESS || ret2 != OB_SUCCESS)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "failed parallel read single task", KR(ret), KR(ret1), KR(ret2)); + } + } + + if (OB_NOT_NULL(task2)) { + free_task(task2); + } + } + } + return ret; +} + +int OSDQTaskHandler::handle_read_single_task_helper_(OSDQTask *task) +{ + int ret = OB_SUCCESS; + int64_t read_size = 0; + if (OB_FAIL(adapter_.adaptively_get_file_length(task->uri_, storage_info_, task->buf_len_))) { + OB_LOG(WARN, "failed get file length", KR(ret), KPC(task), KPC(storage_info_)); + } else if (OB_ISNULL(task->buf_ = static_cast(malloc(task->buf_len_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed allocate memory for task buf", KR(ret), KPC(task)); + } else if (OB_FAIL(adapter_.adaptively_read_single_file(task->uri_, storage_info_, task->buf_, + task->buf_len_, read_size, ObStorageIdMod::get_default_id_mod()))) { + OB_LOG(WARN, "failed read single file", KR(ret), KPC(task), KPC(storage_info_)); + } + return ret; +} + +int OSDQTaskHandler::handle_del_task(OSDQTask *task) +{ + int ret = OB_SUCCESS; + const int64_t op_start_time_us = ObTimeUtility::current_time(); + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "task handler not init", KR(ret), K(is_inited_)); + } else if (OB_ISNULL(task) || OB_UNLIKELY(!task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid argument", KR(ret), KPC(task)); + } else if 
(OB_FAIL(adapter_.adaptively_del_file(task->uri_, storage_info_))) { + OB_LOG(WARN, "failed del file", KR(ret), KPC(task)); + } else if (OB_FAIL(metric_->add_latency_metric(op_start_time_us, task->op_type_, 0))) { + OB_LOG(WARN, "failed add latency metric", KR(ret), K(op_start_time_us), KPC(task)); + } + + if (OB_FAIL(ret)) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(file_set_->add_file(task->object_id_, task->uri_))) { + OB_LOG(WARN, "failed add object id and file path in file set", KR(ret), KPC(task)); + } + } + return ret; +} + +} // namespace tools +} // namespace oceanbase